diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 10e0e8751173e..cd3c1208f1f37 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -143,9 +143,13 @@ func (m *JobManager) jobLoop() error { } cancel() case <-updateScanTaskStateTicker: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-m.notifyStateCh: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-jobCheckTicker: m.checkFinishedJob(se, now) m.checkNotOwnJob() @@ -263,7 +267,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w return workers, nil, nil } -func (m *JobManager) updateTaskState() { +// updateTaskState polls the result from scan worker and returns whether there are result polled +func (m *JobManager) updateTaskState() bool { results := m.pollScanWorkerResults() for _, result := range results { job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) @@ -276,6 +281,8 @@ func (m *JobManager) updateTaskState() { job.finishedScanTaskCounter += 1 job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) } + + return len(results) > 0 } func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {