Skip to content

Commit

Permalink
ttl: periodically update state for a job in heartbeat (pingcap#39939)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao committed Dec 20, 2022
1 parent 4dfc339 commit 11ceffd
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 32 deletions.
1 change: 1 addition & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//types",
"//util",
"//util/chunk",
"//util/hack",
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
Expand Down
51 changes: 44 additions & 7 deletions ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package ttlworker

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -83,11 +85,11 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status
}

func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
jsonStatistics, err := job.statistics.MarshalJSON()
summary, err := job.summary()
if err != nil {
return err
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
_, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, string(jsonStatistics), job.ownerID))
_, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, summary, job.ownerID))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -107,13 +109,13 @@ func (job *ttlJob) nextScanTask() {

// finish turns current job into last job, and update the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time) {
summary := job.statistics.String()
if job.scanTaskErr != nil {
summary = fmt.Sprintf("Scan Error: %s, Statistics: %s", job.scanTaskErr.Error(), summary)
summary, err := job.summary()
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
_, err := se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id))
_, err = se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id))
if err != nil {
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
}
Expand Down Expand Up @@ -168,3 +170,38 @@ func findJobWithTableID(jobs []*ttlJob, id int64) *ttlJob {

return nil
}

type ttlSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`

TotalScanTask int `json:"total_scan_task"`
ScheduledScanTask int `json:"scheduled_scan_task"`
FinishedScanTask int `json:"finished_scan_task"`

ScanTaskErr string `json:"scan_task_err,omitempty"`
}

func (job *ttlJob) summary() (string, error) {
summary := &ttlSummary{
TotalRows: job.statistics.TotalRows.Load(),
SuccessRows: job.statistics.SuccessRows.Load(),
ErrorRows: job.statistics.ErrorRows.Load(),

TotalScanTask: len(job.tasks),
ScheduledScanTask: job.taskIter,
FinishedScanTask: job.finishedScanTaskCounter,
}

if job.scanTaskErr != nil {
summary.ScanTaskErr = job.scanTaskErr.Error()
}

summaryJSON, err := json.Marshal(summary)
if err != nil {
return "", err
}

return string(hack.String(summaryJSON)), nil
}
5 changes: 5 additions & 0 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) er
if err != nil {
return errors.Trace(err)
}
// also updates some internal state for this job
err = job.updateState(ctx, se)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update state of the job", zap.String("jobID", job.id))
}
}
return nil
}
Expand Down
31 changes: 31 additions & 0 deletions ttl/ttlworker/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ttlworker
import (
"testing"

"github.com/pingcap/errors"
"github.com/stretchr/testify/assert"
)

Expand All @@ -35,3 +36,33 @@ func TestIterScanTask(t *testing.T) {
job.nextScanTask()
assert.True(t, job.AllSpawned())
}

func TestJobSummary(t *testing.T) {
statistics := &ttlStatistics{}
statistics.TotalRows.Store(1)
statistics.ErrorRows.Store(255)
statistics.SuccessRows.Store(128)

job := &ttlJob{
statistics: statistics,
tasks: []*ttlScanTask{{}},
}
summary, err := job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":0,"finished_scan_task":0}`, summary)

job.taskIter += 1
summary, err = job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":0}`, summary)

job.finishedScanTaskCounter += 1
summary, err = job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1}`, summary)

job.scanTaskErr = errors.New("test error")
summary, err = job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1,"scan_task_err":"test error"}`, summary)
}
15 changes: 0 additions & 15 deletions ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ttlworker

import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -72,20 +71,6 @@ func (s *ttlStatistics) String() string {
return fmt.Sprintf("Total Rows: %d, Success Rows: %d, Error Rows: %d", s.TotalRows.Load(), s.SuccessRows.Load(), s.ErrorRows.Load())
}

func (s *ttlStatistics) MarshalJSON() ([]byte, error) {
type jsonStatistics struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`
}

return json.Marshal(jsonStatistics{
TotalRows: s.TotalRows.Load(),
SuccessRows: s.SuccessRows.Load(),
ErrorRows: s.ErrorRows.Load(),
})
}

type ttlScanTask struct {
ctx context.Context

Expand Down
10 changes: 0 additions & 10 deletions ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,3 @@ func TestScanTaskDoScan(t *testing.T) {
task.schemaChangeInRetry = 2
task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist")
}

func TestTTLStatisticsMarshalJSON(t *testing.T) {
statistics := &ttlStatistics{}
statistics.TotalRows.Store(1)
statistics.ErrorRows.Store(255)
statistics.SuccessRows.Store(128)
j, err := statistics.MarshalJSON()
require.NoError(t, err)
require.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255}`, string(j))
}

0 comments on commit 11ceffd

Please sign in to comment.