From 47b19f036eee81f5ab1ab07d89d4eb4e72583a75 Mon Sep 17 00:00:00 2001 From: streppel Date: Mon, 13 Sep 2021 17:58:55 -0300 Subject: [PATCH] fix: stopping scheduler waits for all jobs to finish (#238) * fix: stopping scheduler waits for all jobs to finish * fixing race condition * removing runningJobsWg from executor's state * rearranging test assertion * Update scheduler_test.go Co-authored-by: John Roesler * test uses atomic int Co-authored-by: John Roesler --- executor.go | 20 +++++++++++++------- scheduler.go | 5 +++-- scheduler_test.go | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/executor.go b/executor.go index d815ad0a..e1f46ee8 100644 --- a/executor.go +++ b/executor.go @@ -26,7 +26,7 @@ const ( type executor struct { jobFunctions chan jobFunction - stop chan struct{} + stopCh chan struct{} limitMode limitMode maxRunningJobs *semaphore.Weighted } @@ -34,20 +34,20 @@ type executor struct { func newExecutor() executor { return executor{ jobFunctions: make(chan jobFunction, 1), - stop: make(chan struct{}, 1), + stopCh: make(chan struct{}, 1), } } func (e *executor) start() { - wg := sync.WaitGroup{} stopCtx, cancel := context.WithCancel(context.Background()) + runningJobsWg := sync.WaitGroup{} for { select { case f := <-e.jobFunctions: - wg.Add(1) + runningJobsWg.Add(1) go func() { - defer wg.Done() + defer runningJobsWg.Done() if e.maxRunningJobs != nil { if !e.maxRunningJobs.TryAcquire(1) { @@ -92,10 +92,16 @@ func (e *executor) start() { }) } }() - case <-e.stop: + case <-e.stopCh: cancel() - wg.Wait() + runningJobsWg.Wait() + e.stopCh <- struct{}{} return } } } + +func (e *executor) stop() { + e.stopCh <- struct{}{} + <-e.stopCh +} diff --git a/scheduler.go b/scheduler.go index 9ad8bb02..1df705e0 100644 --- a/scheduler.go +++ b/scheduler.go @@ -689,7 +689,8 @@ func (s *Scheduler) Clear() { } } -// Stop stops the scheduler. This is a no-op if the scheduler is already stopped . +// Stop stops the scheduler. This is a no-op if the scheduler is already stopped. +// It waits for all running jobs to finish before returning, so it is safe to assume that running jobs will finish when calling this. func (s *Scheduler) Stop() { if s.IsRunning() { s.stop() @@ -698,7 +699,7 @@ func (s *Scheduler) Stop() { func (s *Scheduler) stop() { s.setRunning(false) - s.executor.stop <- struct{}{} + s.executor.stop() } // Do specifies the jobFunc that should be called every time the Job runs diff --git a/scheduler_test.go b/scheduler_test.go index a36656a8..de7feda9 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -631,6 +631,21 @@ func TestScheduler_Stop(t *testing.T) { s.Stop() assert.False(t, s.IsRunning()) }) + t.Run("waits for jobs to finish processing before returning .Stop()", func(t *testing.T) { + t.Parallel() + i := int32(0) + + s := NewScheduler(time.UTC) + s.Every(10).Second().Do(func() { + time.Sleep(2 * time.Second) + atomic.AddInt32(&i, 1) + }) + s.StartAsync() + time.Sleep(1 * time.Second) // enough time for job to run + s.Stop() + + assert.EqualValues(t, 1, atomic.LoadInt32(&i)) + }) } func TestScheduler_StartAt(t *testing.T) {