Skip to content

Commit

Permalink
Fix data race in lastRun (#359)
Browse files Browse the repository at this point in the history
* Fix data race in lastRun

* Use getAtTime in doCommon
  • Loading branch information
seunghyupoh3517 authored Jul 25, 2022
1 parent cf9e6c3 commit 2764428
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 29 deletions.
13 changes: 2 additions & 11 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func (j *Job) getInterval() int {
}

func (j *Job) neverRan() bool {
return j.lastRun.IsZero()
jobLastRun := j.LastRun()
return jobLastRun.IsZero()
}

func (j *Job) getStartsImmediately() bool {
Expand Down Expand Up @@ -415,8 +416,6 @@ func (j *Job) LastRun() time.Time {
}

func (j *Job) setLastRun(t time.Time) {
j.mu.Lock()
defer j.mu.Unlock()
j.lastRun = t
}

Expand All @@ -435,17 +434,9 @@ func (j *Job) setNextRun(t time.Time) {

// RunCount returns the number of time the job ran so far
func (j *Job) RunCount() int {
j.mu.RLock()
defer j.mu.RUnlock()
return j.runCount
}

func (j *Job) incrementRunCount() {
j.mu.Lock()
defer j.mu.Unlock()
j.runCount++
}

func (j *Job) stop() {
j.mu.Lock()
defer j.mu.Unlock()
Expand Down
26 changes: 8 additions & 18 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func (s *Scheduler) calculateDuration(job *Job) time.Duration {
}

func shouldRunAtSpecificTime(job *Job) bool {
return job.getAtTime(job.lastRun) != 0
jobLastRun := job.LastRun()
return job.getAtTime(jobLastRun) != 0
}

func (s *Scheduler) remainingDaysToWeekday(lastRun time.Time, job *Job) int {
Expand Down Expand Up @@ -532,21 +533,6 @@ func (s *Scheduler) run(job *Job) {
return
}

job = s.addJobDetails(job)
if job.error != nil {
// delete the job from the scheduler as this job
// cannot be executed
s.RemoveByReference(job)
return
// return job.error
}

s.executor.jobFunctions <- job.jobFunction.copy()
job.setLastRun(s.now())
job.incrementRunCount()
}

func (s *Scheduler) addJobDetails(job *Job) *Job {
job.mu.Lock()
defer job.mu.Unlock()

Expand All @@ -559,10 +545,13 @@ func (s *Scheduler) addJobDetails(job *Job) *Job {
default:
// something is really wrong and we should never get here
job.error = wrapOrError(job.error, ErrInvalidFunctionParameters)
return
}
}

return job
s.executor.jobFunctions <- job.jobFunction.copy()
job.setLastRun(s.now())
job.runCount++
}

func (s *Scheduler) runContinuous(job *Job) {
Expand Down Expand Up @@ -830,7 +819,8 @@ func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, e
job := s.getCurrentJob()

jobUnit := job.getUnit()
if job.getAtTime(job.lastRun) != 0 && (jobUnit <= hours || jobUnit >= duration) {
jobLastRun := job.LastRun()
if job.getAtTime(jobLastRun) != 0 && (jobUnit <= hours || jobUnit >= duration) {
job.error = wrapOrError(job.error, ErrAtTimeNotSupported)
}

Expand Down

0 comments on commit 2764428

Please sign in to comment.