From 7236ace0d20f006b42e5b2791fddb0dae2cfe259 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 15 Dec 2022 14:08:24 +0800 Subject: [PATCH 1/8] ddl, parser: split backfill jobs and check the jobs --- ddl/backfilling.go | 554 ++++++++++++++++++++++++++++++++++----- ddl/column.go | 56 ++-- ddl/ddl.go | 22 +- ddl/ddl_worker.go | 22 +- ddl/index.go | 138 +++++++--- ddl/index_merge_tmp.go | 39 ++- ddl/ingest/engine_mgr.go | 19 +- ddl/job_table.go | 19 +- ddl/job_table_test.go | 8 +- ddl/partition.go | 2 +- ddl/reorg.go | 8 +- parser/model/ddl.go | 10 +- 12 files changed, 722 insertions(+), 175 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 7c966591016d9..e3afbb64cc46f 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -61,7 +61,13 @@ const ( typeAddIndexMergeTmpWorker backfillerType = 3 // InstanceLease is the instance lease. - InstanceLease = 1 * time.Minute + InstanceLease = 1 * time.Minute + updateInstanceLease = 40 * time.Second + genTaskBatch = 8192 + minGenTaskBatch = 1024 + minDistTaskCnt = 16 + retrySQLTimes = 3 + retrySQLInterval = 500 * time.Millisecond ) func (bT backfillerType) String() string { @@ -107,8 +113,8 @@ func (bj *BackfillJob) AbbrStr() string { bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease) } -// GetOracleTime returns the current time from TS. -func GetOracleTime(se *session) (time.Time, error) { +// GetOracleTimeWithTxn returns the current time from TS with txn. +func GetOracleTimeWithTxn(se *session) (time.Time, error) { txn, err := se.Txn(true) if err != nil { return time.Time{}, err @@ -116,6 +122,15 @@ func GetOracleTime(se *session) (time.Time, error) { return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil } +// GetOracleTime returns the current time from TS. +func GetOracleTime(store kv.Storage) (time.Time, error) { + currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) + if err != nil { + return time.Time{}, errors.Trace(err) + } + return oracle.GetTimeFromTS(currentVer.Ver).UTC(), nil +} + // GetLeaseGoTime returns a types.Time by adding a lease. func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time { leaseTime := currTime.Add(lease) @@ -174,9 +189,35 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time { // Instead, it is divided into batches, each time a kv transaction completes the backfilling // of a partial batch. +type backfillCtx struct { + *ddlCtx + reorgTp model.ReorgType + sessCtx sessionctx.Context + schemaName string + table table.Table + batchCnt int +} + +func newBackfillCtx(ctx *ddlCtx, sessCtx sessionctx.Context, reorgTp model.ReorgType, + schemaName string, tbl table.Table) *backfillCtx { + return &backfillCtx{ + ddlCtx: ctx, + sessCtx: sessCtx, + reorgTp: reorgTp, + schemaName: schemaName, + table: tbl, + batchCnt: int(variable.GetDDLReorgBatchSize()), + } +} + type backfiller interface { BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) AddMetricInfo(float64) + GetTask() (*BackfillJob, error) + UpdateTask(bJob *BackfillJob) error + FinishTask(bJob *BackfillJob) error + GetCtx() *backfillCtx + String() string } type backfillResult struct { @@ -199,11 +240,26 @@ type backfillTaskContext struct { } type reorgBackfillTask struct { + bfJob *BackfillJob + physicalTable table.PhysicalTable + + // TODO: Remove the following fields after remove the function of run. 
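+	// Until then a task carries state for both paths: bfJob identifies the
+	// row in the backfill job table on the distributed path, while the
+	// fields below still drive the local path; getJobID reconciles the two.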
id int physicalTableID int64 startKey kv.Key endKey kv.Key endInclude bool + jobID int64 + sqlQuery string + priority int +} + +func (r *reorgBackfillTask) getJobID() int64 { + jobID := r.jobID + if r.bfJob != nil { + jobID = r.bfJob.JobID + } + return jobID } func (r *reorgBackfillTask) excludedEndKey() kv.Key { @@ -232,33 +288,62 @@ func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResu } type backfillWorker struct { - id int - reorgInfo *reorgInfo - batchCnt int - sessCtx sessionctx.Context - taskCh chan *reorgBackfillTask - resultCh chan *backfillResult - table table.Table - priority int - tp backfillerType - ctx context.Context - cancel func() + id int + backfiller + taskCh chan *reorgBackfillTask + resultCh chan *backfillResult + ctx context.Context + cancel func() } -func newBackfillWorker(ctx context.Context, sessCtx sessionctx.Context, id int, t table.PhysicalTable, - reorgInfo *reorgInfo, tp backfillerType) *backfillWorker { +func newBackfillWorker(ctx context.Context, id int, bf backfiller) *backfillWorker { bfCtx, cancel := context.WithCancel(ctx) return &backfillWorker{ - id: id, - table: t, - reorgInfo: reorgInfo, - batchCnt: int(variable.GetDDLReorgBatchSize()), - sessCtx: sessCtx, - priority: reorgInfo.Job.Priority, - tp: tp, - ctx: bfCtx, - cancel: cancel, + backfiller: bf, + id: id, + taskCh: make(chan *reorgBackfillTask, 1), + resultCh: make(chan *backfillResult, 1), + ctx: bfCtx, + cancel: cancel, + } +} + +func (w *backfillWorker) updateLease(exec_id string, bJob *BackfillJob) error { + isDistReorg, err := w.GetCtx().sessCtx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBDDLEnableDistributeReorg) + if err != nil { + return err + } + if isDistReorg != variable.On { + return nil + } + + leaseTime, err := GetOracleTime(w.GetCtx().store) + if err != nil { + return err + } + bJob.InstanceID = exec_id + bJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease) + return w.backfiller.UpdateTask(bJob) +} + +func (w *backfillWorker) finishJob(bJob *BackfillJob) error { + isDistReorg, err := w.GetCtx().sessCtx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBDDLEnableDistributeReorg) + if err != nil { + return err + } + if isDistReorg != variable.On { + return nil + } + + bJob.State = model.JobStateDone + return w.backfiller.FinishTask(bJob) +} + +func (w *backfillWorker) String() string { + if w.backfiller == nil { + return fmt.Sprintf("worker %d", w.id) } + return fmt.Sprintf("worker %d, tp %s", w.id, w.backfiller.String()) } func (w *backfillWorker) Close() { @@ -286,17 +371,19 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, addedCount: 0, nextKey: handleRange.startKey, } + batchStartTime := time.Now() lastLogCount := 0 lastLogTime := time.Now() startTime := lastLogTime - rc := d.getReorgCtx(w.reorgInfo.Job) + jobID := task.getJobID() + rc := d.getReorgCtx(jobID) for { // Give job chance to be canceled, if we not check it here, // if there is panic in bf.BackfillDataInTxn we will never cancel the job. // Because reorgRecordTask may run a long time, // we should check whether this ddl job is still runnable. 
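 		// The check is keyed by the bare job ID: workers on the distributed
 		// path pick tasks out of the backfill job table and never hold a
 		// *model.Job, so getReorgCtx/jobContext/isReorgRunnable all accept
 		// the ID alone now.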
- err := d.isReorgRunnable(w.reorgInfo.Job) + err := d.isReorgRunnable(jobID) if err != nil { result.err = err return result @@ -325,9 +412,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, if num := result.scanCount - lastLogCount; num >= 90000 { lastLogCount = result.scanCount logutil.BgLogger().Info("[ddl] backfill worker back fill index", - zap.Int("worker ID", w.id), - zap.Int("added count", result.addedCount), - zap.Int("scan count", result.scanCount), + zap.Int("addedCount", result.addedCount), zap.Int("scanCount", result.scanCount), zap.String("next key", hex.EncodeToString(taskCtx.nextKey)), zap.Float64("speed(rows/s)", float64(num)/time.Since(lastLogTime).Seconds())) lastLogTime = time.Now() @@ -337,11 +422,25 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, if taskCtx.done { break } + + if task.bfJob != nil { + // TODO: Adjust the updating lease frequency by batch processing time carefully. + if time.Since(batchStartTime) < updateInstanceLease { + continue + } + batchStartTime = time.Now() + task.bfJob.CurrKey = result.nextKey + if err := w.updateLease(w.GetCtx().uuid, task.bfJob); err != nil { + logutil.BgLogger().Info("[ddl] backfill worker handle task, update lease failed", zap.Stringer("worker", w), + zap.Stringer("task", task), zap.String("bj", task.bfJob.AbbrStr()), zap.Error(err)) + result.err = err + return result + } + } } logutil.BgLogger().Info("[ddl] backfill worker finish task", - zap.Stringer("type", w.tp), - zap.Int("worker ID", w.id), - zap.String("task", task.String()), + zap.Stringer("worker", w), + zap.Stringer("task", task), zap.Int("added count", result.addedCount), zap.Int("scan count", result.scanCount), zap.String("next key", hex.EncodeToString(result.nextKey)), @@ -353,9 +452,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, } func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { - logutil.BgLogger().Info("[ddl] backfill worker start", - zap.Stringer("type", w.tp), - zap.Int("workerID", w.id)) + logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w)) var curTaskID int defer util.Recover(metrics.LabelDDL, "backfillWorker.run", func() { w.resultCh <- &backfillResult{taskID: curTaskID, err: dbterror.ErrReorgPanic} @@ -363,17 +460,17 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { for { if util.HasCancelled(w.ctx) { logutil.BgLogger().Info("[ddl] backfill worker exit on context done", - zap.Stringer("type", w.tp), zap.Int("workerID", w.id)) + zap.Stringer("worker", w), zap.Int("workerID", w.id)) return } task, more := <-w.taskCh if !more { logutil.BgLogger().Info("[ddl] backfill worker exit", - zap.Stringer("type", w.tp), zap.Int("workerID", w.id)) + zap.Stringer("worker", w), zap.Int("workerID", w.id)) return } curTaskID = task.id - d.setDDLLabelForTopSQL(job) + d.setDDLLabelForTopSQL(job.ID, job.Query) logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.id), zap.String("task", task.String())) failpoint.Inject("mockBackfillRunErr", func() { @@ -394,12 +491,12 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { }) // Change the batch size dynamically. 
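 		// batchCnt lives on the shared backfillCtx now; re-reading the reorg
 		// batch size variable before each task lets operators retune
 		// throughput while a reorg is in flight.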
- w.batchCnt = int(variable.GetDDLReorgBatchSize()) + w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize()) result := w.handleBackfillTask(d, task, bf) w.resultCh <- result if result.err != nil { logutil.BgLogger().Info("[ddl] backfill worker exit on error", - zap.Stringer("type", w.tp), zap.Int("workerID", w.id), zap.Error(result.err)) + zap.Stringer("workerr", w), zap.Int("workerID", w.id), zap.Error(result.err)) return } } @@ -499,7 +596,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount nextKey, taskAddedCount, err := waitTaskResults(scheduler, batchTasks, totalAddedCount) elapsedTime := time.Since(startTime) if err == nil { - err = dc.isReorgRunnable(reorgInfo.Job) + err = dc.isReorgRunnable(reorgInfo.Job.ID) } if err != nil { @@ -507,8 +604,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds()) logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed", - zap.ByteString("element type", reorgInfo.currElement.TypeKey), - zap.Int64("element ID", reorgInfo.currElement.ID), + zap.Int64("total added count", *totalAddedCount), zap.String("start key", hex.EncodeToString(startKey)), zap.String("next key", hex.EncodeToString(nextKey)), @@ -526,11 +622,10 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount } // nextHandle will be updated periodically in runReorgJob, so no need to update it here. - dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey) + dc.getReorgCtx(reorgInfo.Job.ID).setNextKey(nextKey) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds()) logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch", - zap.ByteString("element type", reorgInfo.currElement.TypeKey), - zap.Int64("element ID", reorgInfo.currElement.ID), + zap.Stringer("element", reorgInfo.currElement), zap.Int64("total added count", *totalAddedCount), zap.String("start key", hex.EncodeToString(startKey)), zap.String("next key", hex.EncodeToString(nextKey)), @@ -539,11 +634,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount return nil } -// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled. -func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, - totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { +func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) []*reorgBackfillTask { batchTasks := make([]*reorgBackfillTask, 0, backfillTaskChanSize) - reorgInfo := scheduler.reorgInfo physicalTableID := reorgInfo.PhysicalTableID var prefix kv.Key if tbl, ok := t.(table.PartitionedTable); ok { @@ -556,14 +648,16 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, } // Build reorg tasks. 
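 	// Each kv range becomes one task; getRangeEndKey first shrinks the end
 	// key to the last real record key, and generation stops once
 	// backfillTaskChanSize tasks are queued.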
job := reorgInfo.Job + pt := t.(table.PhysicalTable) + jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID) for i, keyRange := range kvRanges { startKey := keyRange.StartKey endKey := keyRange.EndKey - endK, err := getRangeEndKey(scheduler.jobCtx, dc.store, job.Priority, prefix, keyRange.StartKey, endKey) + endK, err := getRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, keyRange.StartKey, endKey) if err != nil { - logutil.BgLogger().Info("[ddl] send range task to workers, get reverse key failed", zap.Error(err)) + logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err)) } else { - logutil.BgLogger().Info("[ddl] send range task to workers, change end key", + logutil.BgLogger().Info("[ddl] get backfill range task, change end key", zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK))) endKey = endK } @@ -576,7 +670,10 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, task := &reorgBackfillTask{ id: i, + jobID: reorgInfo.Job.ID, physicalTableID: physicalTableID, + physicalTable: pt, + priority: reorgInfo.Priority, startKey: startKey, endKey: endKey, // If the boundaries overlap, we should ignore the preceding endKey. @@ -587,7 +684,13 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, break } } + return batchTasks +} +// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled. +func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, + totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { + batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges) if len(batchTasks) == 0 { return nil, nil } @@ -749,7 +852,9 @@ func (b *backfillScheduler) adjustWorkerSize() error { ) switch b.tp { case typeAddIndexWorker: - idxWorker, err := newAddIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc, job) + backfillCtx := newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl) + idxWorker, err := newAddIndexWorker(b.decodeColMap, i, b.tbl, backfillCtx, + jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) if err != nil { if b.canSkipError(err) { continue @@ -757,18 +862,23 @@ func (b *backfillScheduler) adjustWorkerSize() error { return err } idxWorker.copReqSenderPool = b.copReqSenderPool - worker, runner = idxWorker, idxWorker.backfillWorker + runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker) + worker = idxWorker case typeAddIndexMergeTmpWorker: - tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, b.tbl, reorgInfo, jc) - worker, runner = tmpIdxWorker, tmpIdxWorker.backfillWorker + backfillCtx := newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl) + tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, i, b.tbl, reorgInfo.currElement.ID, jc) + runner = newBackfillWorker(jc.ddlJobCtx, i, tmpIdxWorker) + worker = tmpIdxWorker case typeUpdateColumnWorker: // Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting. 
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true updateWorker := newUpdateColumnWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) - worker, runner = updateWorker, updateWorker.backfillWorker + runner = newBackfillWorker(jc.ddlJobCtx, i, updateWorker) + worker = updateWorker case typeCleanUpIndexWorker: idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) - worker, runner = idxWorker, idxWorker.backfillWorker + runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker) + worker = idxWorker default: return errors.New("unknown backfill type") } @@ -864,7 +974,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic return errors.Trace(err) } - if err := dc.isReorgRunnable(reorgInfo.Job); err != nil { + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { return errors.Trace(err) } if startKey == nil && endKey == nil { @@ -878,7 +988,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic } }) - jc := dc.jobContext(job) + jc := dc.jobContext(job.ID) scheduler := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, decodeColMap, jc) defer scheduler.Close() @@ -954,6 +1064,326 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { return nil } +func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool, + batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error { + bJobs = bJobs[:0] + instanceID := "" + if notDistTask { + instanceID = reorgInfo.d.uuid + } + // TODO: Adjust the number of ranges(region) for each task. + for _, task := range batchTasks { + bm := &model.BackfillMeta{ + PhysicalTableID: reorgInfo.PhysicalTableID, + IsUnique: isUnique, + EndInclude: task.endInclude, + ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, + SQLMode: reorgInfo.ReorgMeta.SQLMode, + Location: reorgInfo.ReorgMeta.Location, + JobMeta: &model.JobMeta{ + SchemaID: reorgInfo.Job.SchemaID, + TableID: reorgInfo.Job.TableID, + Query: reorgInfo.Job.Query, + }, + } + bj := &BackfillJob{ + ID: *id, + JobID: reorgInfo.Job.ID, + EleID: reorgInfo.currElement.ID, + EleKey: reorgInfo.currElement.TypeKey, + Tp: bfWorkerType, + State: model.JobStateNone, + InstanceID: instanceID, + CurrKey: task.startKey, + StartKey: task.startKey, + EndKey: task.endKey, + Meta: bm, + } + *id++ + bJobs = append(bJobs, bj) + } + if err := AddBackfillJobs(sess, bJobs); err != nil { + return errors.Trace(err) + } + return nil +} + +func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbls []table.PhysicalTable, isUnique bool, + bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error { + endKey := reorgInfo.EndKey + isFirstOps := true + bJobs := make([]*BackfillJob, 0, genTaskBatch) + batch := genTaskBatch + t := pTbls[0] + for { + kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey) + if err != nil { + return errors.Trace(err) + } + if batch > len(kvRanges) { + batch = len(kvRanges) + } + batchTasks := getBatchTasks(t, reorgInfo, kvRanges) + notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) + if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs[:0], isUnique, &currBackfillJobID); err != nil { + return errors.Trace(err) + } + isFirstOps = false + + remains := kvRanges[len(batchTasks):] + // TODO: After adding backfillCh do asyncNotify(dc.backfillJobCh). 
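+		// Generation is throttled below: once checkBackfillJobCount reports
+		// at least minGenTaskBatch pending backfill jobs for this element,
+		// the producer sleeps until workers drain the backlog, keeping the
+		// backfill job table bounded.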
+ logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", + zap.Int("batchTasksCnt", len(batchTasks)), + zap.Int("totalRegionCnt", len(kvRanges)), + zap.Int("remainRegionCnt", len(remains)), + zap.String("startHandle", hex.EncodeToString(startKey)), + zap.String("endHandle", hex.EncodeToString(endKey))) + + if len(remains) == 0 { + break + } + + for { + isFinished, bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if isFinished { + return nil + } + if bJobCnt < minGenTaskBatch { + break + } + time.Sleep(time.Second) + } + startKey = remains[0].StartKey + } + return nil +} + +func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { + startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey + if startKey == nil && endKey == nil { + return nil + } + + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + currBackfillJobID := int64(1) + isFinished, maxBfJob, err := getMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if isFinished { + return nil + } + if maxBfJob != nil { + startKey = maxBfJob.EndKey + currBackfillJobID = maxBfJob.ID + 1 + } + + var isUnique bool + if bfWorkerType == typeAddIndexWorker { + idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) + isUnique = idxInfo.Unique + } + err = dc.splitTableToBackfillJobs(sess, reorgInfo, []table.PhysicalTable{t}, isUnique, bfWorkerType, startKey, currBackfillJobID) + if err != nil { + return errors.Trace(err) + } + + var backfillJobFinished bool + ticker := time.NewTicker(300 * time.Millisecond) + defer ticker.Stop() + for { + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + select { + case <-ticker.C: + isFinished, err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + logutil.BgLogger().Warn("[ddl] checkAndHandleInterruptedBackfillJobs failed", zap.Error(err)) + return errors.Trace(err) + } + if isFinished { + logutil.BgLogger().Info("[ddl] finish interrupted backfill jobs") + return nil + } + + if !backfillJobFinished { + bJob, err := getBackfillJobWithRetry(sess, BackfillTable, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) + if err != nil { + logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Error(err)) + return errors.Trace(err) + } + if bJob == nil { + backfillJobFinished = true + logutil.BgLogger().Info("[ddl] finish backfill jobs") + } + } else { + isSynced, err := checkJobIsSynced(sess, reorgInfo.Job.ID) + if err != nil { + logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Error(err)) + return errors.Trace(err) + } + if isSynced { + logutil.BgLogger().Info("[ddl] sync backfill jobs") + return nil + } + } + case <-dc.ctx.Done(): + return dbterror.ErrInvalidWorker.GenWithStack("worker is closed") + } + } +} + +func checkJobIsSynced(sess *session, jobID int64) (bool, error) { + var err error + var unsyncedInstanceIDs []string + for i := 0; i < retrySQLTimes; i++ { + unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync") + if err != nil { + return false, errors.Trace(err) + } + if len(unsyncedInstanceIDs) 
== 0 { + return true, nil + } + + logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", zap.Int("tryTimes", i), zap.Error(err)) + time.Sleep(retrySQLInterval) + } + + return false, errors.Trace(err) +} + +func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (isFinished bool, err error) { + var bJobs []*BackfillJob + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey) + if err == nil { + break + } + logutil.BgLogger().Info("[ddl] getInterruptedBackfillJobsForOneEle failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + if err != nil { + return false, errors.Trace(err) + } + if len(bJobs) == 0 { + return false, nil + } + + for i := 0; i < retrySQLTimes; i++ { + err = finishFailedBackfillJobs(sess, bJobs[0]) + if err == nil { + return true, errors.Errorf(bJobs[0].Meta.ErrMsg) + } + logutil.BgLogger().Info("[ddl] finishFailedBackfillJobs failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + return false, errors.Trace(err) +} + +func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (isFinished bool, backfillJobCnt int, err error) { + isFinished, err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) + if err != nil { + return false, 0, errors.Trace(err) + } + if isFinished { + return true, 0, nil + } + + backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + jobID, currEleID, currEleKey), "check_backfill_job_count") + if err != nil { + return false, 0, errors.Trace(err) + } + + return false, backfillJobCnt, nil +} + +func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isAscend bool) (bJob *BackfillJob, err error) { + var bJobs []*BackfillJob + descStr := "" + if !isAscend { + descStr = "desc" + } + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' order by ddl_job_id, ele_id, ele_key, id %s limit 1", + jobID, currEleID, currEleKey, descStr), "check_backfill_job_state") + if err != nil { + logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) + continue + } + + if len(bJobs) != 0 { + return bJobs[0], nil + } else { + return nil, nil + } + } + return nil, errors.Trace(err) +} + +func getMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (bool, *BackfillJob, error) { + isFinished, err := checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) + if err != nil { + return false, nil, errors.Trace(err) + } + if isFinished { + return true, nil, nil + } + + bJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, false) + if err != nil { + return false, nil, errors.Trace(err) + } + hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, false) + if err != nil { + return false, nil, errors.Trace(err) + } + + if bJob == nil { + return false, hJob, nil + } + if hJob == nil { + return false, bJob, nil + } + if bJob.ID > hJob.ID { + return false, bJob, nil + } + return false, hJob, nil +} + +func finishFailedBackfillJobs(sessCtx sessionctx.Context, bJob *BackfillJob) error { + sess, ok := sessCtx.(*session) + if !ok { + return errors.Errorf("sess ctx:%#v convert session failed", sessCtx) + } + + return runInTxn(sess, func(se *session) error { + // TODO: Batch by batch 
update backfill jobs and insert backfill history jobs. + bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + bJob.JobID, bJob.EleID, bJob.EleKey), "update_backfill_job") + if err != nil { + return errors.Trace(err) + } + if len(bJobs) == 0 { + return nil + } + + err = RemoveBackfillJob(sess, true, nil) + if err == nil { + err = AddBackfillHistoryJob(sess, bJobs) + } + return errors.Trace(err) + }) +} + // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error) diff --git a/ddl/column.go b/ddl/column.go index d9425ceabac2c..7dcc32c00755f 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -811,7 +811,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) + reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1080,7 +1080,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error TestReorgGoroutineRunning <- a for { time.Sleep(30 * time.Millisecond) - if w.getReorgCtx(reorgInfo.Job).isReorgCanceled() { + if w.getReorgCtx(reorgInfo.Job.ID).isReorgCanceled() { // Job is cancelled. So it can't be done. failpoint.Return(dbterror.ErrCancelledDDLJob) } @@ -1105,7 +1105,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error if err != nil { return errors.Trace(err) } - originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority) + originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job.ID), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) } @@ -1128,11 +1128,11 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error // Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle]. if i == startElementOffsetToResetHandle+1 { reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle - w.getReorgCtx(reorgInfo.Job).setNextKey(reorgInfo.StartKey) + w.getReorgCtx(reorgInfo.Job.ID).setNextKey(reorgInfo.StartKey) } // Update the element in the reorgCtx to keep the atomic access for daemon-worker. - w.getReorgCtx(reorgInfo.Job).setCurrentElement(reorgInfo.elements[i+1]) + w.getReorgCtx(reorgInfo.Job.ID).setCurrentElement(reorgInfo.elements[i+1]) // Update the element in the reorgInfo for updating the reorg meta below. reorgInfo.currElement = reorgInfo.elements[i+1] @@ -1156,7 +1156,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error } type updateColumnWorker struct { - *backfillWorker + *backfillCtx oldColInfo *model.ColumnInfo newColInfo *model.ColumnInfo metricCounter prometheus.Counter @@ -1168,7 +1168,6 @@ type updateColumnWorker struct { rowMap map[int64]types.Datum // For SQL Mode and warnings. 
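 	// The per-worker sqlMode copy goes away: SQL mode now travels with each
 	// backfill job in BackfillMeta.SQLMode (see addBatchBackfillJobs).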
- sqlMode mysql.SQLMode
 jobContext *JobContext
}

@@ -1188,14 +1187,13 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
 }
 rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
 return &updateColumnWorker{
- backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeUpdateColumnWorker),
- oldColInfo: oldCol,
- newColInfo: newCol,
- metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
- rowDecoder: rowDecoder,
- rowMap: make(map[int64]types.Datum, len(decodeColMap)),
- sqlMode: reorgInfo.ReorgMeta.SQLMode,
- jobContext: jc,
+ backfillCtx: newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
+ oldColInfo: oldCol,
+ newColInfo: newCol,
+ metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
+ rowDecoder: rowDecoder,
+ rowMap: make(map[int64]types.Datum, len(decodeColMap)),
+ jobContext: jc,
 }
}

@@ -1203,6 +1201,26 @@ func (w *updateColumnWorker) AddMetricInfo(cnt float64) {
 w.metricCounter.Add(cnt)
}

+func (w *updateColumnWorker) String() string {
+ return typeUpdateColumnWorker.String()
+}
+
+func (w *updateColumnWorker) GetTask() (*BackfillJob, error) {
+ panic("[ddl] update column worker GetTask function is not implemented")
+}
+
+func (w *updateColumnWorker) UpdateTask(job *BackfillJob) error {
+ panic("[ddl] update column worker UpdateTask function is not implemented")
+}
+
+func (w *updateColumnWorker) FinishTask(job *BackfillJob) error {
+ panic("[ddl] update column worker FinishTask function is not implemented")
+}
+
+func (w *updateColumnWorker) GetCtx() *backfillCtx {
+ return w.backfillCtx
+}
+
type rowRecord struct {
 key []byte // It's used to lock a record. Record it to reduce the encoding time.
 vals []byte // It's the record. 
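The hunks below swap worker-held reorg state (w.reorgInfo, w.priority, w.table) for state carried on each reorgBackfillTask. The precedence rule is small enough to show standalone; the following is an illustrative, self-contained Go sketch with mirror types, not code from this patch:

package main

import "fmt"

// Mirrors of the patch's types; only the shape matters for the rule.
type backfillJob struct{ jobID int64 }

type reorgBackfillTask struct {
	bfJob *backfillJob // set when the task comes from the distributed path
	jobID int64        // stamped at task-build time on the legacy path
}

func (r *reorgBackfillTask) getJobID() int64 {
	if r.bfJob != nil {
		return r.bfJob.jobID // the BackfillJob row wins when present
	}
	return r.jobID
}

func main() {
	t := &reorgBackfillTask{jobID: 101}
	fmt.Println(t.getJobID()) // 101: legacy task
	t.bfJob = &backfillJob{jobID: 202}
	fmt.Println(t.getJobID()) // 202: distributed task
}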
@@ -1228,8 +1246,8 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg taskDone := false var lastAccessedHandle kv.Key oprStartTime := startTime - err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, w.table.RecordPrefix(), txn.StartTS(), taskRange.startKey, taskRange.endKey, - func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { + err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(), + txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in updateColumnWorker fetchRowColVals", 0) oprStartTime = oprEndTime @@ -1370,8 +1388,8 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil { + txn.SetOption(kv.Priority, handleRange.priority) + if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } diff --git a/ddl/ddl.go b/ddl/ddl.go index 8c4d5235ea7ad..ba12a620c91a4 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -422,15 +422,15 @@ func (dc *ddlCtx) isOwner() bool { return isOwner } -func (dc *ddlCtx) setDDLLabelForTopSQL(job *model.Job) { +func (dc *ddlCtx) setDDLLabelForTopSQL(jobID int64, jobQuery string) { dc.jobCtx.Lock() defer dc.jobCtx.Unlock() - ctx, exists := dc.jobCtx.jobCtxMap[job.ID] + ctx, exists := dc.jobCtx.jobCtxMap[jobID] if !exists { ctx = NewJobContext() - dc.jobCtx.jobCtxMap[job.ID] = ctx + dc.jobCtx.jobCtxMap[jobID] = ctx } - ctx.setDDLLabelForTopSQL(job) + ctx.setDDLLabelForTopSQL(jobQuery) } func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) { @@ -444,10 +444,10 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) { } } -func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger { +func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) tikvrpc.ResourceGroupTagger { dc.jobCtx.Lock() defer dc.jobCtx.Unlock() - ctx, exists := dc.jobCtx.jobCtxMap[job.ID] + ctx, exists := dc.jobCtx.jobCtxMap[jobID] if !exists { return nil } @@ -460,19 +460,19 @@ func (dc *ddlCtx) removeJobCtx(job *model.Job) { delete(dc.jobCtx.jobCtxMap, job.ID) } -func (dc *ddlCtx) jobContext(job *model.Job) *JobContext { +func (dc *ddlCtx) jobContext(jobID int64) *JobContext { dc.jobCtx.RLock() defer dc.jobCtx.RUnlock() - if jobContext, exists := dc.jobCtx.jobCtxMap[job.ID]; exists { + if jobContext, exists := dc.jobCtx.jobCtxMap[jobID]; exists { return jobContext } return NewJobContext() } -func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx { +func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx { dc.reorgCtx.RLock() defer dc.reorgCtx.RUnlock() - return dc.reorgCtx.reorgCtxMap[job.ID] + return dc.reorgCtx.reorgCtxMap[jobID] } func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx { @@ -497,7 +497,7 @@ func (dc *ddlCtx) removeReorgCtx(job *model.Job) { } func (dc *ddlCtx) notifyReorgCancel(job *model.Job) { - rc := dc.getReorgCtx(job) + rc := dc.getReorgCtx(job.ID) if rc == nil { return } 
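Every ddl.go hunk above makes the same change: the per-job context maps are keyed by the numeric job ID instead of a *model.Job, so backfill workers that only know an ID can look contexts up. A self-contained sketch of the pattern, with hypothetical names standing in for jobCtxMap/reorgCtxMap:

package main

import (
	"fmt"
	"sync"
)

// jobContext stands in for TiDB's JobContext; any per-job state works.
type jobContext struct{ query string }

// registry mirrors the RWMutex-guarded, ID-keyed maps on ddlCtx.
type registry struct {
	mu sync.RWMutex
	m  map[int64]*jobContext
}

func (r *registry) set(jobID int64, c *jobContext) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[jobID] = c
}

// get falls back to a fresh context, as ddlCtx.jobContext does, so a
// caller holding only a job ID always receives something usable.
func (r *registry) get(jobID int64) *jobContext {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if c, ok := r.m[jobID]; ok {
		return c
	}
	return &jobContext{}
}

func main() {
	r := &registry{m: make(map[int64]*jobContext)}
	r.set(7, &jobContext{query: "alter table t add index idx(a)"})
	fmt.Println(r.get(7).query)       // found by bare ID, no *model.Job needed
	fmt.Println(r.get(8).query == "") // true: unknown ID still yields a context
}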
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 8621dcb08361c..e87a7823b2771 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -717,14 +717,14 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta { return meta.NewMeta(txn) } -func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) { - if !topsqlstate.TopSQLEnabled() || job == nil { +func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) { + if !topsqlstate.TopSQLEnabled() || jobQuery == "" { return } - if job.Query != w.cacheSQL || w.cacheDigest == nil { - w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query) - w.cacheSQL = job.Query + if jobQuery != w.cacheSQL || w.cacheDigest == nil { + w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(jobQuery) + w.cacheSQL = jobQuery w.ddlJobCtx = topsql.AttachAndRegisterSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, false) } else { topsql.AttachAndRegisterSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, false) @@ -808,10 +808,10 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { if w.tp == addIdxWorker && job.IsRunning() { txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull) } - w.setDDLLabelForTopSQL(job) + w.setDDLLabelForTopSQL(job.ID, job.Query) w.setDDLSourceForDiagnosis(job) - jobContext := w.jobContext(job) - if tagger := w.getResourceGroupTaggerForTopSQL(job); tagger != nil { + jobContext := w.jobContext(job.ID) + if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } t := meta.NewMeta(txn) @@ -947,10 +947,10 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull) } - w.setDDLLabelForTopSQL(job) + w.setDDLLabelForTopSQL(job.ID, job.Query) w.setDDLSourceForDiagnosis(job) - jobContext := w.jobContext(job) - if tagger := w.getResourceGroupTaggerForTopSQL(job); tagger != nil { + jobContext := w.jobContext(job.ID) + if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } if isDone, err1 := isDependencyJobDone(t, job); err1 != nil || !isDone { diff --git a/ddl/index.go b/ddl/index.go index 0f70b73b61046..158c949451abb 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -874,7 +875,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) + reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1179,9 +1180,10 @@ type indexRecord struct { } type baseIndexWorker struct { - *backfillWorker + *backfillCtx indexes []table.Index + tp backfillerType metricCounter prometheus.Counter // The following attributes are used to reduce memory allocation. 
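The index.go hunks that follow move baseIndexWorker onto backfillCtx and give it the task-lifecycle half of the backfiller interface (GetTask/UpdateTask/FinishTask/GetCtx/String); the update-column and merge-tmp-index workers stub those methods out instead. For orientation, a minimal runnable sketch of the contract, using abbreviated mirror types rather than the real ones:

package main

import "fmt"

// Abbreviated mirrors of the patch's types.
type BackfillJob struct{ ID int64 }
type backfillCtx struct{ batchCnt int }
type reorgBackfillTask struct{}
type backfillTaskContext struct{ done bool }

// backfiller mirrors the contract from ddl/backfilling.go: data movement
// plus the lifecycle hooks the distributed path relies on.
type backfiller interface {
	BackfillDataInTxn(reorgBackfillTask) (backfillTaskContext, error)
	AddMetricInfo(float64)
	GetTask() (*BackfillJob, error)
	UpdateTask(*BackfillJob) error
	FinishTask(*BackfillJob) error
	GetCtx() *backfillCtx
	String() string
}

// noopBackfiller is a hypothetical minimal implementation.
type noopBackfiller struct{ ctx *backfillCtx }

func (n *noopBackfiller) BackfillDataInTxn(reorgBackfillTask) (backfillTaskContext, error) {
	return backfillTaskContext{done: true}, nil
}
func (n *noopBackfiller) AddMetricInfo(float64)          {}
func (n *noopBackfiller) GetTask() (*BackfillJob, error) { return nil, nil }
func (n *noopBackfiller) UpdateTask(*BackfillJob) error  { return nil }
func (n *noopBackfiller) FinishTask(*BackfillJob) error  { return nil }
func (n *noopBackfiller) GetCtx() *backfillCtx           { return n.ctx }
func (n *noopBackfiller) String() string                 { return "noop backfiller" }

func main() {
	var b backfiller = &noopBackfiller{ctx: &backfillCtx{batchCnt: 256}}
	fmt.Println(b, b.GetCtx().batchCnt) // noop backfiller 256
}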
@@ -1190,7 +1192,6 @@ type baseIndexWorker struct { rowMap map[int64]types.Datum rowDecoder *decoder.RowDecoder - sqlMode mysql.SQLMode jobContext *JobContext } @@ -1206,24 +1207,23 @@ type addIndexWorker struct { distinctCheckFlags []bool } -func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, - reorgInfo *reorgInfo, jc *JobContext, job *model.Job) (*addIndexWorker, error) { - if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) { - logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", reorgInfo.Query), - zap.String("reorgInfo", reorgInfo.String())) - return nil, errors.Errorf("element type is not index, typeKey: %v", reorgInfo.currElement.TypeKey) +func newAddIndexWorker(decodeColMap map[int64]decoder.Column, id int, t table.PhysicalTable, bfCtx *backfillCtx, jc *JobContext, jobID, eleID int64, eleTypeKey []byte) (*addIndexWorker, error) { + if !bytes.Equal(eleTypeKey, meta.IndexElementKey) { + logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", jc.cacheSQL), + zap.Int64("job ID", jobID), zap.ByteString("element type", eleTypeKey), zap.Int64("element ID", eleID)) + return nil, errors.Errorf("element type is not index, typeKey: %v", eleTypeKey) } - indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) + indexInfo := model.FindIndexInfoByID(t.Meta().Indices, eleID) index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo) rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) var lwCtx *ingest.WriterContext - if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { - bc, ok := ingest.LitBackCtxMgr.Load(job.ID) + if bfCtx.reorgTp == model.ReorgTypeLitMerge { + bc, ok := ingest.LitBackCtxMgr.Load(jobID) if !ok { return nil, errors.Trace(errors.New(ingest.LitErrGetBackendFail)) } - ei, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID) + ei, err := bc.EngMgr.Register(bc, jobID, eleID, bfCtx.schemaName, t.Meta().Name.O) if err != nil { return nil, errors.Trace(err) } @@ -1235,14 +1235,13 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable return &addIndexWorker{ baseIndexWorker: baseIndexWorker{ - backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeAddIndexWorker), - indexes: []table.Index{index}, - rowDecoder: rowDecoder, - defaultVals: make([]types.Datum, len(t.WritableCols())), - rowMap: make(map[int64]types.Datum, len(decodeColMap)), - metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("add_idx_rate", reorgInfo.SchemaName, t.Meta().Name.String())), - sqlMode: reorgInfo.ReorgMeta.SQLMode, - jobContext: jc, + backfillCtx: bfCtx, + indexes: []table.Index{index}, + rowDecoder: rowDecoder, + defaultVals: make([]types.Datum, len(t.WritableCols())), + rowMap: make(map[int64]types.Datum, len(decodeColMap)), + metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("add_idx_rate", bfCtx.schemaName, t.Meta().Name.String())), + jobContext: jc, }, index: index, writerCtx: lwCtx, @@ -1253,6 +1252,55 @@ func (w *baseIndexWorker) AddMetricInfo(cnt float64) { w.metricCounter.Add(cnt) } +func (w *baseIndexWorker) GetTask() (*BackfillJob, error) { + return nil, nil +} + +func (w *baseIndexWorker) String() string { + return w.tp.String() +} + +func (w *baseIndexWorker) UpdateTask(bJob *BackfillJob) error { + sess, ok := w.backfillCtx.sessCtx.(*session) + if !ok { + 
return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx) + } + + return runInTxn(sess, func(se *session) error { + jobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and id = %d and ele_key = '%s'", + bJob.JobID, bJob.EleID, bJob.ID, bJob.EleKey), "update_backfill_task") + if err != nil { + return err + } + + if len(jobs) == 0 { + return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill bJob, lease is timeout") + } + if jobs[0].InstanceID != bJob.InstanceID { + return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill bJob %v, want instance ID %s", jobs[0], bJob.InstanceID)) + } + return updateBackfillJob(sess, BackfillTable, bJob, "update_backfill_task") + }) +} + +func (w *baseIndexWorker) FinishTask(bJob *BackfillJob) error { + sess, ok := w.backfillCtx.sessCtx.(*session) + if !ok { + return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx) + } + return runInTxn(sess, func(se *session) error { + err := RemoveBackfillJob(sess, false, bJob) + if err != nil { + return err + } + return AddBackfillHistoryJob(sess, []*BackfillJob{bJob}) + }) +} + +func (w *baseIndexWorker) GetCtx() *backfillCtx { + return w.backfillCtx +} + // mockNotOwnerErrOnce uses to make sure `notOwnerErr` only mock error once. var mockNotOwnerErrOnce uint32 @@ -1343,8 +1391,9 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac // taskDone means that the reorged handle is out of taskRange.endHandle. taskDone := false oprStartTime := startTime - err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, w.table.RecordPrefix(), txn.StartTS(), taskRange.startKey, taskRange.endKey, - func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { + jobID := taskRange.getJobID() + err := iterateSnapshotKeys(w.GetCtx().jobContext(jobID), w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(), txn.StartTS(), + taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in baseIndexWorker fetchRowColVals", 0) oprStartTime = oprEndTime @@ -1504,12 +1553,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC needMergeTmpIdx := w.index.Meta().BackfillState != model.BackfillStateInapplicable oprStartTime := time.Now() + jobID := handleRange.getJobID() ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType()) errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil { + txn.SetOption(kv.Priority, handleRange.priority) + if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(jobID); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } @@ -1622,6 +1672,19 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { } } } else { + // TODO: Support typeAddIndexMergeTmpWorker and partitionTable. 
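+			// Only a plain ADD INDEX reorg can take the distributed path
+			// below; merging a temporary index (and, per the TODO above,
+			// partitioned tables) still falls through to
+			// addPhysicalTableIndex.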
+ isDistReorg, err := w.sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBDDLEnableDistributeReorg) + if err != nil { + return err + } + if isDistReorg == variable.On && !reorgInfo.mergingTmpIdx { + sCtx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(sCtx) + return w.controlWritePhysicalTableRecord(newSession(sCtx), t.(table.PhysicalTable), typeAddIndexWorker, reorgInfo) + } //nolint:forcetypeassert err = w.addPhysicalTableIndex(t.(table.PhysicalTable), reorgInfo) } @@ -1666,7 +1729,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo if err != nil { return false, errors.Trace(err) } - start, end, err := getTableRange(reorg.d.jobContext(reorg.Job), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + start, end, err := getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { return false, errors.Trace(err) } @@ -1747,14 +1810,13 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT } return &cleanUpIndexWorker{ baseIndexWorker: baseIndexWorker{ - backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeCleanUpIndexWorker), - indexes: indexes, - rowDecoder: rowDecoder, - defaultVals: make([]types.Datum, len(t.WritableCols())), - rowMap: make(map[int64]types.Datum, len(decodeColMap)), - metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("cleanup_idx_rate", reorgInfo.SchemaName, t.Meta().Name.String())), - sqlMode: reorgInfo.ReorgMeta.SQLMode, - jobContext: jc, + backfillCtx: newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t), + indexes: indexes, + rowDecoder: rowDecoder, + defaultVals: make([]types.Datum, len(t.WritableCols())), + rowMap: make(map[int64]types.Datum, len(decodeColMap)), + metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("cleanup_idx_rate", reorgInfo.SchemaName, t.Meta().Name.String())), + jobContext: jc, }, } } @@ -1772,8 +1834,8 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil { + txn.SetOption(kv.Priority, handleRange.priority) + if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } @@ -1856,7 +1918,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r if err != nil { return false, errors.Trace(err) } - start, end, err := getTableRange(reorg.d.jobContext(reorg.Job), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + start, end, err := getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { return false, errors.Trace(err) } @@ -1865,7 +1927,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r // Write the reorg info to store so the whole reorganize process can recover from panic. 
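 	// UpdateReorgMeta persists the next start key in the job's reorg meta,
 	// so a restarted DDL owner resumes this partition from where it left off.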
err = reorg.UpdateReorgMeta(reorg.StartKey, w.sessPool)
 logutil.BgLogger().Info("[ddl] job update reorg info", zap.Int64("jobID", reorg.Job.ID),
- zap.ByteString("element type", reorg.currElement.TypeKey), zap.Int64("element ID", reorg.currElement.ID),
+ zap.Stringer("element", reorg.currElement),
 zap.Int64("partition table ID", pid), zap.String("start key", hex.EncodeToString(start)),
 zap.String("end key", hex.EncodeToString(end)), zap.Error(err))
 return false, errors.Trace(err)
diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go
index 5f699b3507e6f..7d4bc8b5e23fa 100644
--- a/ddl/index_merge_tmp.go
+++ b/ddl/index_merge_tmp.go
@@ -22,7 +22,6 @@ import (
 "github.com/pingcap/errors"
 "github.com/pingcap/tidb/kv"
 "github.com/pingcap/tidb/parser/model"
- "github.com/pingcap/tidb/sessionctx"
 "github.com/pingcap/tidb/sessionctx/variable"
 "github.com/pingcap/tidb/table"
 "github.com/pingcap/tidb/table/tables"
@@ -80,7 +79,7 @@ type temporaryIndexRecord struct {
 }

 type mergeIndexWorker struct {
- *backfillWorker
+ *backfillCtx

 index table.Index

@@ -90,15 +89,15 @@ type mergeIndexWorker struct {
 jobContext *JobContext
 }

-func newMergeTempIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, reorgInfo *reorgInfo, jc *JobContext) *mergeIndexWorker {
- indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID)
+func newMergeTempIndexWorker(bfCtx *backfillCtx, id int, t table.PhysicalTable, eleID int64, jc *JobContext) *mergeIndexWorker {
+ indexInfo := model.FindIndexInfoByID(t.Meta().Indices, eleID)
 index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)

 return &mergeIndexWorker{
- backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeAddIndexMergeTmpWorker),
- index: index,
- jobContext: jc,
+ backfillCtx: bfCtx,
+ index: index,
+ jobContext: jc,
 }
}

@@ -109,8 +108,8 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
 errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
 taskCtx.addedCount = 0
 taskCtx.scanCount = 0
- txn.SetOption(kv.Priority, w.priority)
- if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil {
+ txn.SetOption(kv.Priority, taskRange.priority)
+ if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
 txn.SetOption(kv.ResourceGroupTagger, tagger)
 }

@@ -166,6 +165,26 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
 func (w *mergeIndexWorker) AddMetricInfo(cnt float64) {
 }

+func (w *mergeIndexWorker) String() string {
+ return typeAddIndexMergeTmpWorker.String()
+}
+
+func (w *mergeIndexWorker) GetTask() (*BackfillJob, error) {
+ panic("[ddl] merge index worker GetTask function is not implemented")
+}
+
+func (w *mergeIndexWorker) UpdateTask(job *BackfillJob) error {
+ panic("[ddl] merge index worker UpdateTask function is not implemented")
+}
+
+func (w *mergeIndexWorker) FinishTask(job *BackfillJob) error {
+ panic("[ddl] merge index worker FinishTask function is not implemented")
+}
+
+func (w *mergeIndexWorker) GetCtx() *backfillCtx {
+ return w.backfillCtx
+}
+
 func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*temporaryIndexRecord, kv.Key, bool, error) {
 startTime := time.Now()
 w.tmpIdxRecords = w.tmpIdxRecords[:0]
@@ -177,7 +196,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
 idxPrefix := 
w.table.IndexPrefix() var lastKey kv.Key isCommonHandle := w.table.Meta().IsCommonHandle - err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(), + err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, idxPrefix, txn.StartTS(), taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterate temporary index in merge process", 0) diff --git a/ddl/ingest/engine_mgr.go b/ddl/ingest/engine_mgr.go index 565d0b30d1ab8..f9b006ec9e369 100644 --- a/ddl/ingest/engine_mgr.go +++ b/ddl/ingest/engine_mgr.go @@ -18,7 +18,6 @@ import ( "fmt" "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" @@ -38,7 +37,7 @@ func (m *engineManager) init(memRoot MemRoot, diskRoot DiskRoot) { } // Register create a new engineInfo and register it to the engineManager. -func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int64) (*engineInfo, error) { +func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schemaName, tableName string) (*engineInfo, error) { // Calculate lightning concurrency degree and set memory usage // and pre-allocate memory usage for worker. m.MemRoot.RefreshConsumption() @@ -56,22 +55,22 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int return nil, genEngineAllocMemFailedErr(m.MemRoot, bc.jobID, indexID) } - cfg := generateLocalEngineConfig(job.ID, job.SchemaName, job.TableName) - openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID)) + cfg := generateLocalEngineConfig(jobID, schemaName, tableName) + openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, tableName, int32(indexID)) if err != nil { - logutil.BgLogger().Warn(LitErrCreateEngineFail, zap.Int64("job ID", job.ID), + logutil.BgLogger().Warn(LitErrCreateEngineFail, zap.Int64("job ID", jobID), zap.Int64("index ID", indexID), zap.Error(err)) return nil, errors.Trace(err) } id := openedEn.GetEngineUUID() - en = NewEngineInfo(bc.ctx, job.ID, indexID, cfg, openedEn, id, 1, m.MemRoot, m.DiskRoot) + en = NewEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, 1, m.MemRoot, m.DiskRoot) m.Store(indexID, en) m.MemRoot.Consume(StructSizeEngineInfo) - m.MemRoot.ConsumeWithTag(encodeEngineTag(job.ID, indexID), engineCacheSize) + m.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize) info = LitInfoOpenEngine } else { if en.writerCount+1 > bc.cfg.TikvImporter.RangeConcurrency { - logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", job.ID), + logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", jobID), zap.Int64("index ID", indexID), zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency)) return nil, dbterror.ErrIngestFailed.FastGenByArgs("concurrency quota exceeded") @@ -79,8 +78,8 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int en.writerCount++ info = LitInfoAddWriter } - m.MemRoot.ConsumeWithTag(encodeEngineTag(job.ID, indexID), int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)) - logutil.BgLogger().Info(info, zap.Int64("job ID", job.ID), + m.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), 
int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)) + logutil.BgLogger().Info(info, zap.Int64("job ID", jobID), zap.Int64("index ID", indexID), zap.Int64("current memory usage", m.MemRoot.CurrentUsage()), zap.Int64("memory limitation", m.MemRoot.MaxMemoryQuota()), diff --git a/ddl/job_table.go b/ddl/job_table.go index a6e19b7f7edf0..d15285ea2da99 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -567,7 +567,7 @@ func GetBackfillJobsForOneEle(sess *session, batch int, excludedJobIDs []int64, var bJobs []*BackfillJob s := newSession(sess) err = runInTxn(s, func(se *session) error { - currTime, err := GetOracleTime(s) + currTime, err := GetOracleTimeWithTxn(s) if err != nil { return err } @@ -600,7 +600,7 @@ func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, jobID int64, uuid var bJobs []*BackfillJob s := newSession(sess) err := runInTxn(s, func(se *session) error { - currTime, err := GetOracleTime(se) + currTime, err := GetOracleTimeWithTxn(se) if err != nil { return err } @@ -662,6 +662,21 @@ func GetBackfillJobCount(sess *session, tblName, condition string, label string) return int(rows[0].GetInt64(0)), nil } +func getUnsyncedInstanceIDs(sess *session, jobID int64, label string) ([]string, error) { + sql := fmt.Sprintf("select sum(state = %d) as tmp, exec_id from mysql.tidb_ddl_backfill_history where ddl_job_id = %d group by exec_id having tmp = 0;", + model.JobStateSynced, jobID) + rows, err := sess.execute(context.Background(), sql, label) + if err != nil { + return nil, errors.Trace(err) + } + InstanceIDs := make([]string, 0, len(rows)) + for _, row := range rows { + InstanceID := row.GetString(1) + InstanceIDs = append(InstanceIDs, InstanceID) + } + return InstanceIDs, nil +} + // GetBackfillJobs gets the backfill jobs in the tblName table according to condition. 
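 // The condition argument is spliced into the SQL verbatim (see
 // getBackfillJobWithRetry and checkBackfillJobCount), so it must only be
 // built from trusted, internally formatted fragments.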
func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]*BackfillJob, error) { rows, err := sess.execute(context.Background(), fmt.Sprintf("select * from mysql.%s where %s", tblName, condition), label) diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index ca30cf903107d..ccac8705ad4b5 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -293,10 +293,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { expectJob = bjTestCases[5] } require.Equal(t, expectJob, bJobs[0]) - previousTime, err := ddl.GetOracleTime(se) + previousTime, err := ddl.GetOracleTimeWithTxn(se) require.EqualError(t, err, "[kv:8024]invalid transaction") readInTxn(se, func(sessionctx.Context) { - previousTime, err = ddl.GetOracleTime(se) + previousTime, err = ddl.GetOracleTimeWithTxn(se) require.NoError(t, err) }) @@ -311,7 +311,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { equalBackfillJob(t, expectJob, bJobs[0], ddl.GetLeaseGoTime(previousTime, instanceLease)) var currTime time.Time readInTxn(se, func(sessionctx.Context) { - currTime, err = ddl.GetOracleTime(se) + currTime, err = ddl.GetOracleTimeWithTxn(se) require.NoError(t, err) }) currGoTime := ddl.GetLeaseGoTime(currTime, instanceLease) @@ -365,7 +365,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // ------------------------ // 0 jobID2 eleID2 readInTxn(se, func(sessionctx.Context) { - currTime, err = ddl.GetOracleTime(se) + currTime, err = ddl.GetOracleTimeWithTxn(se) require.NoError(t, err) }) condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID1) diff --git a/ddl/partition.go b/ddl/partition.go index 0a1ea4e6fbe66..3e5ff3eb71648 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1757,7 +1757,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( } } rh := newReorgHandler(t, w.sess, w.concurrentDDL) - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, physicalTableIDs, elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version diff --git a/ddl/reorg.go b/ddl/reorg.go index d7671031f64d1..ae561696c4afe 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -197,7 +197,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo } } - rc := w.getReorgCtx(job) + rc := w.getReorgCtx(job.ID) if rc == nil { // This job is cancelling, we should return ErrCancelledDDLJob directly. // Q: Is there any possibility that the job is cancelling and has no reorgCtx? @@ -291,7 +291,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo } func (w *worker) mergeWarningsIntoJob(job *model.Job) { - rc := w.getReorgCtx(job) + rc := w.getReorgCtx(job.ID) rc.mu.Lock() defer rc.mu.Unlock() partWarnings := rc.mu.warnings @@ -354,13 +354,13 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 { return rows[0].GetInt64(0) } -func (dc *ddlCtx) isReorgRunnable(job *model.Job) error { +func (dc *ddlCtx) isReorgRunnable(jobID int64) error { if isChanClosed(dc.ctx.Done()) { // Worker is closed. So it can't do the reorganization. return dbterror.ErrInvalidWorker.GenWithStack("worker is closed") } - if dc.getReorgCtx(job).isReorgCanceled() { + if dc.getReorgCtx(jobID).isReorgCanceled() { // Job is cancelled. So it can't be done. 
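The reorg.go changes above move every reorgCtx lookup from a *model.Job to a bare job ID, which lets code paths that only carry the ID (such as the new backfill tasks) reach the same context. The registry shape this implies is roughly the following sketch; field and type names are illustrative, not the actual ddlCtx internals:

package sketch

import "sync"

type reorgCtx struct {
	canceled bool // stands in for the real atomic cancel flag
}

// reorgContexts is a jobID-keyed registry guarded by an RWMutex, so hot
// paths like isReorgRunnable can do a cheap read-locked lookup.
type reorgContexts struct {
	sync.RWMutex
	ctxs map[int64]*reorgCtx
}

func (r *reorgContexts) get(jobID int64) *reorgCtx {
	r.RLock()
	defer r.RUnlock()
	return r.ctxs[jobID] // nil means no reorg is currently running for this job
}

func (r *reorgContexts) isReorgCanceled(jobID int64) bool {
	rc := r.get(jobID)
	return rc != nil && rc.canceled
}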
return dbterror.ErrCancelledDDLJob } diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 7bb9eaef01e6f..f89901335031e 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -419,14 +419,18 @@ type JobMeta struct { // BackfillMeta is meta info of the backfill job. type BackfillMeta struct { - EndInclude bool `json:"end_include"` - ErrMsg string `json:"err_msg"` + PhysicalTableID int64 `json:"physical_table_id"` + IsUnique bool `json:"is_unique"` + EndInclude bool `json:"end_include"` + ErrMsg string `json:"err_msg"` SQLMode mysql.SQLMode `json:"sql_mode"` Warnings map[errors.ErrorID]*terror.Error `json:"warnings"` WarningsCount map[errors.ErrorID]int64 `json:"warnings_count"` Location *TimeZoneLocation `json:"location"` - *JobMeta `json:"job_meta"` + ReorgTp ReorgType `json:"reorg_tp"` + + *JobMeta `json:"job_meta"` } // Encode encodes BackfillMeta with json format. From 30e73c4740bb1d718eeeaa9c50bf0148e8159410 Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 16 Dec 2022 12:07:31 +0800 Subject: [PATCH 2/8] ddl: add tests --- ddl/backfilling.go | 177 +++++++++++++++++------------------ ddl/column.go | 10 +- ddl/ddl_workerpool_test.go | 4 +- ddl/index.go | 19 ++-- ddl/index_merge_tmp.go | 10 +- ddl/job_table_test.go | 184 +++++++++++++++++++++++++++++++------ ddl/util/util.go | 5 + executor/ddl_test.go | 21 +++++ 8 files changed, 290 insertions(+), 140 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index e3afbb64cc46f..b2d49b2769e16 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -62,8 +62,8 @@ const ( // InstanceLease is the instance lease. InstanceLease = 1 * time.Minute - updateInstanceLease = 40 * time.Second - genTaskBatch = 8192 + updateInstanceLease = 25 * time.Second + genTaskBatch = 4096 minGenTaskBatch = 1024 minDistTaskCnt = 16 retrySQLTimes = 3 @@ -308,7 +308,7 @@ func newBackfillWorker(ctx context.Context, id int, bf backfiller) *backfillWork } } -func (w *backfillWorker) updateLease(exec_id string, bJob *BackfillJob) error { +func (w *backfillWorker) updateLease(execID string, bJob *BackfillJob) error { isDistReorg, err := w.GetCtx().sessCtx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBDDLEnableDistributeReorg) if err != nil { return err @@ -321,7 +321,7 @@ func (w *backfillWorker) updateLease(exec_id string, bJob *BackfillJob) error { if err != nil { return err } - bJob.InstanceID = exec_id + bJob.InstanceID = execID bJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease) return w.backfiller.UpdateTask(bJob) } @@ -634,8 +634,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount return nil } -func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) []*reorgBackfillTask { - batchTasks := make([]*reorgBackfillTask, 0, backfillTaskChanSize) +func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask { + batchTasks := make([]*reorgBackfillTask, 0, batch) physicalTableID := reorgInfo.PhysicalTableID var prefix kv.Key if tbl, ok := t.(table.PartitionedTable); ok { @@ -648,7 +648,6 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) } // Build reorg tasks. 
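The retuned constants above tighten the renewal cadence: with InstanceLease at one minute and updateInstanceLease at 25 seconds, a healthy worker refreshes its lease twice per window, so a single slow refresh does not immediately let another instance claim the job. A small self-contained check of that arithmetic, with the constants copied from the patch:

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		instanceLease       = 1 * time.Minute  // InstanceLease
		updateInstanceLease = 25 * time.Second // renewal period
	)
	// Renewals that fit inside one lease window; prints 2, so one missed
	// renewal still leaves the lease valid.
	fmt.Println(int(instanceLease / updateInstanceLease))
}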
job := reorgInfo.Job - pt := t.(table.PhysicalTable) jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID) for i, keyRange := range kvRanges { startKey := keyRange.StartKey @@ -668,11 +667,13 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) endKey = prefix.PrefixNext() } + //nolint:forcetypeassert + phyTbl := t.(table.PhysicalTable) task := &reorgBackfillTask{ id: i, jobID: reorgInfo.Job.ID, physicalTableID: physicalTableID, - physicalTable: pt, + physicalTable: phyTbl, priority: reorgInfo.Priority, startKey: startKey, endKey: endKey, @@ -680,7 +681,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1} batchTasks = append(batchTasks, task) - if len(batchTasks) >= backfillTaskChanSize { + if len(batchTasks) >= batch { break } } @@ -690,7 +691,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) // handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled. func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { - batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges) + batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize) if len(batchTasks) == 0 { return nil, nil } @@ -729,6 +730,16 @@ func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error { return ddlutil.LoadDDLReorgVars(ctx, sCtx) } +func loadDDLDistributeVars(ctx context.Context, sessPool *sessionPool) error { + // Get sessionctx from context resource pool. + sCtx, err := sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer sessPool.put(sCtx) + return ddlutil.LoadDDLDistributeVars(ctx, sCtx) +} + func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) { writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols())) for _, col := range t.WritableCols() { @@ -872,11 +883,11 @@ func (b *backfillScheduler) adjustWorkerSize() error { case typeUpdateColumnWorker: // Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting. 
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true - updateWorker := newUpdateColumnWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) + updateWorker := newUpdateColumnWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc) runner = newBackfillWorker(jc.ddlJobCtx, i, updateWorker) worker = updateWorker case typeCleanUpIndexWorker: - idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) + idxWorker := newCleanUpIndexWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc) runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker) worker = idxWorker default: @@ -1108,24 +1119,22 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo return nil } -func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbls []table.PhysicalTable, isUnique bool, +func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool, bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error { endKey := reorgInfo.EndKey isFirstOps := true bJobs := make([]*BackfillJob, 0, genTaskBatch) - batch := genTaskBatch - t := pTbls[0] for { - kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey) + kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey) if err != nil { return errors.Trace(err) } - if batch > len(kvRanges) { - batch = len(kvRanges) + batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch) + if len(batchTasks) == 0 { + break } - batchTasks := getBatchTasks(t, reorgInfo, kvRanges) notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) - if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs[:0], isUnique, &currBackfillJobID); err != nil { + if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil { return errors.Trace(err) } isFirstOps = false @@ -1144,17 +1153,14 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, } for { - isFinished, bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) if err != nil { return errors.Trace(err) } - if isFinished { - return nil - } if bJobCnt < minGenTaskBatch { break } - time.Sleep(time.Second) + time.Sleep(retrySQLInterval) } startKey = remains[0].StartKey } @@ -1172,12 +1178,13 @@ func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.Physica } currBackfillJobID := int64(1) - isFinished, maxBfJob, err := getMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) if err != nil { return errors.Trace(err) } - if isFinished { - return nil + maxBfJob, err := GetMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) } if maxBfJob != nil { startKey = maxBfJob.EndKey @@ -1189,12 +1196,13 @@ func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.Physica idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) isUnique = idxInfo.Unique } - err = dc.splitTableToBackfillJobs(sess, reorgInfo, []table.PhysicalTable{t}, 
isUnique, bfWorkerType, startKey, currBackfillJobID) + err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID) if err != nil { return errors.Trace(err) } var backfillJobFinished bool + jobID := reorgInfo.Job.ID ticker := time.NewTicker(300 * time.Millisecond) defer ticker.Stop() for { @@ -1204,39 +1212,37 @@ func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.Physica select { case <-ticker.C: - isFinished, err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) - if err != nil { - logutil.BgLogger().Warn("[ddl] checkAndHandleInterruptedBackfillJobs failed", zap.Error(err)) - return errors.Trace(err) - } - if isFinished { - logutil.BgLogger().Info("[ddl] finish interrupted backfill jobs") - return nil - } - if !backfillJobFinished { - bJob, err := getBackfillJobWithRetry(sess, BackfillTable, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) + err := checkAndHandleInterruptedBackfillJobs(sess, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) if err != nil { - logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Error(err)) + logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + + bJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) + if err != nil { + logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err)) return errors.Trace(err) } if bJob == nil { backfillJobFinished = true - logutil.BgLogger().Info("[ddl] finish backfill jobs") + logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID)) } - } else { - isSynced, err := checkJobIsSynced(sess, reorgInfo.Job.ID) + } + if backfillJobFinished { + // TODO: Consider whether these backfill jobs are always out of sync. 
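This rewritten ticker loop is a two-phase poll: each tick it first handles interrupted jobs, then checks whether mysql.tidb_ddl_backfill has drained, and only after that waits for every executor to report synced in the history table. Stripped of the session plumbing, the control flow is roughly the sketch below; the two checkers are assumed callbacks, not the real functions:

package sketch

import (
	"context"
	"time"
)

// pollBackfillDone mirrors the loop's shape: drained reports whether the
// backfill table is empty, synced whether all executors finished syncing.
func pollBackfillDone(ctx context.Context, drained, synced func() (bool, error)) error {
	ticker := time.NewTicker(300 * time.Millisecond)
	defer ticker.Stop()
	finished := false
	for {
		select {
		case <-ticker.C:
			if !finished {
				done, err := drained()
				if err != nil {
					return err
				}
				finished = done
			}
			if finished {
				ok, err := synced()
				if err != nil {
					return err
				}
				if ok {
					return nil
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}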
+ isSynced, err := checkJobIsSynced(sess, jobID) if err != nil { - logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Error(err)) + logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Int64("job ID", jobID), zap.Error(err)) return errors.Trace(err) } if isSynced { - logutil.BgLogger().Info("[ddl] sync backfill jobs") + logutil.BgLogger().Info("[ddl] sync backfill jobs", zap.Int64("job ID", jobID)) return nil } } case <-dc.ctx.Done(): - return dbterror.ErrInvalidWorker.GenWithStack("worker is closed") + return dc.ctx.Err() } } } @@ -1260,7 +1266,7 @@ func checkJobIsSynced(sess *session, jobID int64) (bool, error) { return false, errors.Trace(err) } -func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (isFinished bool, err error) { +func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (err error) { var bJobs []*BackfillJob for i := 0; i < retrySQLTimes; i++ { bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey) @@ -1271,49 +1277,47 @@ func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64 time.Sleep(retrySQLInterval) } if err != nil { - return false, errors.Trace(err) + return errors.Trace(err) } if len(bJobs) == 0 { - return false, nil + return nil } for i := 0; i < retrySQLTimes; i++ { - err = finishFailedBackfillJobs(sess, bJobs[0]) + err = MoveBackfillJobsToHistoryTable(sess, bJobs[0]) if err == nil { - return true, errors.Errorf(bJobs[0].Meta.ErrMsg) + return errors.Errorf(bJobs[0].Meta.ErrMsg) } - logutil.BgLogger().Info("[ddl] finishFailedBackfillJobs failed", zap.Error(err)) + logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err)) time.Sleep(retrySQLInterval) } - return false, errors.Trace(err) + return errors.Trace(err) } -func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (isFinished bool, backfillJobCnt int, err error) { - isFinished, err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) +func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) { + err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) if err != nil { - return false, 0, errors.Trace(err) - } - if isFinished { - return true, 0, nil + return 0, errors.Trace(err) } backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", jobID, currEleID, currEleKey), "check_backfill_job_count") if err != nil { - return false, 0, errors.Trace(err) + return 0, errors.Trace(err) } - return false, backfillJobCnt, nil + return backfillJobCnt, nil } -func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isAscend bool) (bJob *BackfillJob, err error) { +func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) { + var err error var bJobs []*BackfillJob descStr := "" - if !isAscend { - descStr = "desc" + if isDesc { + descStr = "order by id desc" } for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' order by ddl_job_id, ele_id, ele_key, id %s limit 1", + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' %s limit 1", jobID, currEleID, 
currEleKey, descStr), "check_backfill_job_state") if err != nil { logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) @@ -1322,51 +1326,44 @@ func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID i if len(bJobs) != 0 { return bJobs[0], nil - } else { - return nil, nil } + break } return nil, errors.Trace(err) } -func getMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (bool, *BackfillJob, error) { - isFinished, err := checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) +// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable. +func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) { + bJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true) if err != nil { - return false, nil, errors.Trace(err) - } - if isFinished { - return true, nil, nil - } - - bJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, false) - if err != nil { - return false, nil, errors.Trace(err) + return nil, errors.Trace(err) } - hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, false) + hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, true) if err != nil { - return false, nil, errors.Trace(err) + return nil, errors.Trace(err) } if bJob == nil { - return false, hJob, nil + return hJob, nil } if hJob == nil { - return false, bJob, nil + return bJob, nil } if bJob.ID > hJob.ID { - return false, bJob, nil + return bJob, nil } - return false, hJob, nil + return hJob, nil } -func finishFailedBackfillJobs(sessCtx sessionctx.Context, bJob *BackfillJob) error { +// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table. +func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bJob *BackfillJob) error { sess, ok := sessCtx.(*session) if !ok { return errors.Errorf("sess ctx:%#v convert session failed", sessCtx) } return runInTxn(sess, func(se *session) error { - // TODO: Batch by batch update backfill jobs and insert backfill history jobs. + // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. 
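MoveBackfillJobsToHistoryTable keeps the move atomic: read the matching rows, delete them from the active table, flip their state to cancelled, and insert them into the history table, all inside one transaction. An equivalent set-based sketch with plain database/sql (one INSERT ... SELECT plus a DELETE; the real code goes row-wise through GetBackfillJobs/RemoveBackfillJob/AddBackfillHistoryJob, and the cancelled-state parameter here is an assumption):

package sketch

import "database/sql"

// moveJobsToHistory copies matching rows into the history table with a
// cancelled state, then deletes them from the active table, in one txn.
// Column order follows the tidb_ddl_backfill schema used in this patch.
func moveJobsToHistory(db *sql.DB, jobID, eleID int64, eleKey []byte, cancelledState int) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() //nolint:errcheck // no-op after a successful Commit

	_, err = tx.Exec(`insert into mysql.tidb_ddl_backfill_history
		select id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease,
		       ?, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta
		from mysql.tidb_ddl_backfill
		where ddl_job_id = ? and ele_id = ? and ele_key = ?`,
		cancelledState, jobID, eleID, eleKey)
	if err != nil {
		return err
	}
	_, err = tx.Exec(`delete from mysql.tidb_ddl_backfill
		where ddl_job_id = ? and ele_id = ? and ele_key = ?`,
		jobID, eleID, eleKey)
	if err != nil {
		return err
	}
	return tx.Commit()
}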
bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", bJob.JobID, bJob.EleID, bJob.EleKey), "update_backfill_job") if err != nil { @@ -1376,10 +1373,14 @@ func finishFailedBackfillJobs(sessCtx sessionctx.Context, bJob *BackfillJob) err return nil } - err = RemoveBackfillJob(sess, true, nil) + err = RemoveBackfillJob(sess, true, bJobs[0]) if err == nil { + for _, bj := range bJobs { + bj.State = model.JobStateCancelled + } err = AddBackfillHistoryJob(sess, bJobs) } + logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs))) return errors.Trace(err) }) } diff --git a/ddl/column.go b/ddl/column.go index 7dcc32c00755f..95c150a529495 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1171,7 +1171,7 @@ type updateColumnWorker struct { jobContext *JobContext } -func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker { +func newUpdateColumnWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker { if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { logutil.BgLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query), zap.String("reorgInfo", reorgInfo.String())) @@ -1201,19 +1201,19 @@ func (w *updateColumnWorker) AddMetricInfo(cnt float64) { w.metricCounter.Add(cnt) } -func (w *updateColumnWorker) String() string { +func (*updateColumnWorker) String() string { return typeUpdateColumnWorker.String() } -func (w *updateColumnWorker) GetTask() (*BackfillJob, error) { +func (*updateColumnWorker) GetTask() (*BackfillJob, error) { panic("[ddl] update column worker GetTask function doesn't implement") } -func (w *updateColumnWorker) UpdateTask(job *BackfillJob) error { +func (*updateColumnWorker) UpdateTask(*BackfillJob) error { panic("[ddl] update column worker UpdateTask function doesn't implement") } -func (w *updateColumnWorker) FinishTask(job *BackfillJob) error { +func (*updateColumnWorker) FinishTask(*BackfillJob) error { panic("[ddl] update column worker FinishTask function doesn't implement") } diff --git a/ddl/ddl_workerpool_test.go b/ddl/ddl_workerpool_test.go index e9f324ce9dff8..ccee23b12716f 100644 --- a/ddl/ddl_workerpool_test.go +++ b/ddl/ddl_workerpool_test.go @@ -19,7 +19,6 @@ import ( "testing" "github.com/ngaut/pools" - "github.com/pingcap/tidb/parser/model" "github.com/stretchr/testify/require" ) @@ -36,10 +35,9 @@ func TestDDLWorkerPool(t *testing.T) { } func TestBackfillWorkerPool(t *testing.T) { - reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}} f := func() func() (pools.Resource, error) { return func() (pools.Resource, error) { - wk := newBackfillWorker(context.Background(), nil, 1, nil, reorgInfo, typeAddIndexWorker) + wk := newBackfillWorker(context.Background(), 1, nil) return wk, nil } } diff --git a/ddl/index.go b/ddl/index.go index 158c949451abb..fdc3d8a097e66 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1252,7 +1252,7 @@ func (w *baseIndexWorker) AddMetricInfo(cnt float64) { w.metricCounter.Add(cnt) } -func (w *baseIndexWorker) GetTask() (*BackfillJob, error) { +func (*baseIndexWorker) GetTask() (*BackfillJob, error) { return nil, nil } @@ -1672,21 +1672,20 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { } } } else { + 
//nolint:forcetypeassert + phyTbl := t.(table.PhysicalTable) // TODO: Support typeAddIndexMergeTmpWorker and partitionTable. - isDistReorg, err := w.sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBDDLEnableDistributeReorg) - if err != nil { - return err - } - if isDistReorg == variable.On && !reorgInfo.mergingTmpIdx { + loadDDLDistributeVars(w.ctx, w.sessPool) + isDistReorg := variable.DDLEnableDistributeReorg.Load() + if isDistReorg && !reorgInfo.mergingTmpIdx { sCtx, err := w.sessPool.get() if err != nil { return errors.Trace(err) } defer w.sessPool.put(sCtx) - return w.controlWritePhysicalTableRecord(newSession(sCtx), t.(table.PhysicalTable), typeAddIndexWorker, reorgInfo) + return w.controlWritePhysicalTableRecord(newSession(sCtx), phyTbl, typeAddIndexWorker, reorgInfo) } - //nolint:forcetypeassert - err = w.addPhysicalTableIndex(t.(table.PhysicalTable), reorgInfo) + err = w.addPhysicalTableIndex(phyTbl, reorgInfo) } return errors.Trace(err) } @@ -1800,7 +1799,7 @@ type cleanUpIndexWorker struct { baseIndexWorker } -func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *cleanUpIndexWorker { +func newCleanUpIndexWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *cleanUpIndexWorker { indexes := make([]table.Index, 0, len(t.Indices())) rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) for _, index := range t.Indices() { diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 7d4bc8b5e23fa..737ed84d33872 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -162,22 +162,22 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC return } -func (w *mergeIndexWorker) AddMetricInfo(cnt float64) { +func (*mergeIndexWorker) AddMetricInfo(float64) { } -func (w *mergeIndexWorker) String() string { +func (*mergeIndexWorker) String() string { return typeAddIndexMergeTmpWorker.String() } -func (w *mergeIndexWorker) GetTask() (*BackfillJob, error) { +func (*mergeIndexWorker) GetTask() (*BackfillJob, error) { panic("[ddl] merge index worker GetTask function doesn't implement") } -func (w *mergeIndexWorker) UpdateTask(job *BackfillJob) error { +func (*mergeIndexWorker) UpdateTask(*BackfillJob) error { panic("[ddl] merge index worker UpdateTask function doesn't implement") } -func (w *mergeIndexWorker) FinishTask(job *BackfillJob) error { +func (*mergeIndexWorker) FinishTask(*BackfillJob) error { panic("[ddl] merge index worker FinishTask function doesn't implement") } diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index ccac8705ad4b5..5cd92a0f48e20 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -248,11 +248,13 @@ func TestSimpleExecBackfillJobs(t *testing.T) { d := dom.DDL() se := ddl.NewSession(tk.Session()) - jobID1 := int64(2) - jobID2 := int64(3) - eleID1 := int64(4) - eleID2 := int64(5) + jobID1 := int64(1) + jobID2 := int64(2) + eleID1 := int64(11) + eleID2 := int64(22) + eleID3 := int64(33) uuid := d.GetID() + eleKey := meta.IndexElementKey instanceLease := ddl.InstanceLease // test no backfill job bJobs, err := ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID1, jobID2}, instanceLease) @@ -267,10 +269,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.Equal(t, allCnt, 0) // Test some backfill jobs, add backfill jobs to the table. 
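The worker-pool test above can construct a backfillWorker with a nil backfiller because the refactor composes the worker around an embedded interface: behavior arrives at construction time and may legitimately be absent for pool placeholders. The pattern in miniature, with illustrative names rather than the real types:

package main

import "fmt"

// backfiller is a trimmed stand-in for the per-type behavior the worker embeds.
type backfiller interface {
	Kind() string
}

type addIndexBackfiller struct{}

func (addIndexBackfiller) Kind() string { return "add index" }

// worker embeds the interface value; calls like w.Kind() delegate to it.
type worker struct {
	id int
	backfiller
}

func main() {
	w := &worker{id: 1}                    // placeholder with nil backfiller
	fmt.Println(w.id, w.backfiller == nil) // 1 true
	w.backfiller = addIndexBackfiller{}    // wire in the real behavior later
	fmt.Println(w.Kind())                  // add index
}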
cnt := 2 - bjTestCases := make([]*ddl.BackfillJob, 0, cnt*2) + bjTestCases := make([]*ddl.BackfillJob, 0, cnt*3) bJobs1 := makeAddIdxBackfillJobs(1, 2, jobID1, eleID1, cnt, "alter table add index idx(a)") bJobs2 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID2, cnt, "alter table add index idx(b)") - bJobs3 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID1, cnt, "alter table add index idx(c)") + bJobs3 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID3, cnt, "alter table add index idx(c)") bjTestCases = append(bjTestCases, bJobs1...) bjTestCases = append(bjTestCases, bJobs2...) bjTestCases = append(bjTestCases, bJobs3...) @@ -281,16 +283,16 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 1 jobID1 eleID1 "" // 0 jobID2 eleID2 "" // 1 jobID2 eleID2 "" - // 0 jobID2 eleID1 "" - // 1 jobID2 eleID1 "" + // 0 jobID2 eleID3 "" + // 1 jobID2 eleID3 "" require.NoError(t, err) // test get some backfill jobs bJobs, err = ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID2 - 1, jobID2 + 1}, instanceLease) require.NoError(t, err) require.Len(t, bJobs, 1) - expectJob := bjTestCases[4] + expectJob := bjTestCases[2] if expectJob.ID != bJobs[0].ID { - expectJob = bjTestCases[5] + expectJob = bjTestCases[3] } require.Equal(t, expectJob, bJobs[0]) previousTime, err := ddl.GetOracleTimeWithTxn(se) @@ -303,9 +305,9 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, instanceLease) require.NoError(t, err) require.Len(t, bJobs, 1) - expectJob = bjTestCases[4] + expectJob = bjTestCases[2] if expectJob.ID != bJobs[0].ID { - expectJob = bjTestCases[5] + expectJob = bjTestCases[3] } expectJob.InstanceID = uuid equalBackfillJob(t, expectJob, bJobs[0], ddl.GetLeaseGoTime(previousTime, instanceLease)) @@ -327,8 +329,8 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 1 jobID1 eleID1 // 0 jobID2 eleID2 // 1 jobID2 eleID2 - // 0 jobID2 eleID1 - // 1 jobID2 eleID1 + // 0 jobID2 eleID3 + // 1 jobID2 eleID3 require.NoError(t, err) allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) @@ -341,8 +343,8 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // ID jobID eleID // ------------------------ // 1 jobID1 eleID1 - // 0 jobID2 eleID1 - // 1 jobID2 eleID1 + // 0 jobID2 eleID3 + // 1 jobID2 eleID3 require.NoError(t, err) allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) @@ -368,25 +370,33 @@ func TestSimpleExecBackfillJobs(t *testing.T) { currTime, err = ddl.GetOracleTimeWithTxn(se) require.NoError(t, err) }) - condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID1) + condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID2) bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj") require.NoError(t, err) require.Len(t, bJobs, 1) require.Greater(t, bJobs[0].FinishTS, uint64(0)) - // test GetInterruptedBackfillJobsForOneEle - bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey) + // test GetMaxBackfillJob and GetInterruptedBackfillJobsForOneEle + bjob, err := ddl.GetMaxBackfillJob(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey) + require.NoError(t, err) + require.Nil(t, bjob) + bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey) 
require.NoError(t, err) require.Nil(t, bJobs) + err = ddl.AddBackfillJobs(se, bjTestCases) + require.NoError(t, err) // ID jobID eleID // ------------------------ // 0 jobID1 eleID1 // 1 jobID1 eleID1 // 0 jobID2 eleID2 // 1 jobID2 eleID2 - err = ddl.AddBackfillJobs(se, bjTestCases) + // 0 jobID2 eleID3 + // 1 jobID2 eleID3 + bjob, err = ddl.GetMaxBackfillJob(se, jobID2, eleID2, eleKey) require.NoError(t, err) - bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey) + require.Equal(t, bJobs2[1], bjob) + bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey) require.NoError(t, err) require.Nil(t, bJobs) bJobs1[0].State = model.JobStateRollingback @@ -394,6 +404,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs1[0].InstanceID = uuid bJobs1[1].State = model.JobStateCancelling bJobs1[1].ID = 3 + bJobs1[1].Meta.ErrMsg = "errMsg" err = ddl.AddBackfillJobs(se, bJobs1) require.NoError(t, err) // ID jobID eleID state @@ -402,19 +413,134 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 1 jobID1 eleID1 JobStateNone // 0 jobID2 eleID2 JobStateNone // 1 jobID2 eleID2 JobStateNone - // 0 jobID2 eleID1 JobStateNone - // 1 jobID2 eleID1 JobStateNone + // 0 jobID2 eleID3 JobStateNone + // 1 jobID2 eleID3 JobStateNone // 2 jobID1 eleID1 JobStateRollingback // 3 jobID1 eleID1 JobStateCancelling - bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey) + bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey) + require.NoError(t, err) + require.Equal(t, bJobs1[1], bjob) + bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey) require.NoError(t, err) require.Len(t, bJobs, 2) equalBackfillJob(t, bJobs1[0], bJobs[0], types.ZeroTime) equalBackfillJob(t, bJobs1[1], bJobs[1], types.ZeroTime) - // test the BackfillJob's AbbrStr - require.Equal(t, fmt.Sprintf("ID:2, JobID:2, EleID:4, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr()) - require.Equal(t, "ID:3, JobID:2, EleID:4, Type:add index, State:cancelling, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) - require.Equal(t, "ID:0, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[0].AbbrStr()) - require.Equal(t, "ID:1, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[1].AbbrStr()) + require.Equal(t, fmt.Sprintf("ID:2, JobID:1, EleID:11, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr()) + require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelling, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) + require.Equal(t, "ID:0, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[0].AbbrStr()) + require.Equal(t, "ID:1, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[1].AbbrStr()) + + bJobs1[0].State = model.JobStateNone + bJobs1[0].ID = 5 + bJobs1[1].State = model.JobStateNone + bJobs1[1].ID = 4 + err = ddl.AddBackfillHistoryJob(se, bJobs1) + // BackfillTable + // ID jobID eleID state + // -------------------------------- + // 0 jobID1 eleID1 JobStateNone + // 1 jobID1 eleID1 JobStateNone + // 0 jobID2 eleID2 JobStateNone + // 1 jobID2 eleID2 JobStateNone + // 0 jobID2 eleID3 JobStateNone + // 1 jobID2 eleID3 JobStateNone + // 2 jobID1 
eleID1 JobStateRollingback + // 3 jobID1 eleID1 JobStateCancelling + // + // BackfillHistoryTable + // ID jobID eleID state + // -------------------------------- + // 5 jobID1 eleID1 JobStateNone + // 4 jobID1 eleID1 JobStateNone + bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey) + require.NoError(t, err) + require.Equal(t, bJobs1[0], bjob) + bJobs1[0].ID = 6 + bJobs1[1].ID = 7 + err = ddl.AddBackfillJobs(se, bJobs1) + // BackfillTable + // ID jobID eleID state + // -------------------------------- + // 0 jobID1 eleID1 JobStateNone + // 1 jobID1 eleID1 JobStateNone + // 0 jobID2 eleID2 JobStateNone + // 1 jobID2 eleID2 JobStateNone + // 0 jobID2 eleID3 JobStateNone + // 1 jobID2 eleID3 JobStateNone + // 2 jobID1 eleID1 JobStateRollingback + // 3 jobID1 eleID1 JobStateCancelling + // 6 jobID1 eleID1 JobStateNone + // 7 jobID1 eleID1 JobStateNone + // + // BackfillHistoryTable + // ID jobID eleID state + // -------------------------------- + // 5 jobID1 eleID1 JobStateNone + // 4 jobID1 eleID1 JobStateNone + bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey) + require.NoError(t, err) + require.Equal(t, bJobs1[1], bjob) + + // test MoveBackfillJobsToHistoryTable + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + require.NoError(t, err) + require.Equal(t, allCnt, 2) + err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0]) + require.NoError(t, err) + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + require.NoError(t, err) + require.Equal(t, allCnt, 0) + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + require.NoError(t, err) + require.Equal(t, allCnt, 2) + // BackfillTable + // ID jobID eleID state + // -------------------------------- + // 0 jobID1 eleID1 JobStateNone + // 1 jobID1 eleID1 JobStateNone + // 0 jobID2 eleID2 JobStateNone + // 1 jobID2 eleID2 JobStateNone + // 2 jobID1 eleID1 JobStateRollingback + // 3 jobID1 eleID1 JobStateCancelling + // 6 jobID1 eleID1 JobStateNone + // 7 jobID1 eleID1 JobStateNone + // + // BackfillHistoryTable + // ID jobID eleID state + // -------------------------------- + // 5 jobID1 eleID1 JobStateNone + // 4 jobID1 eleID1 JobStateNone + // 0 jobID2 eleID3 JobStateNone + // 1 jobID2 eleID3 JobStateNone + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + require.NoError(t, err) + require.Equal(t, allCnt, 6) + err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0]) + require.NoError(t, err) + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + require.NoError(t, err) + require.Equal(t, allCnt, 0) + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + require.NoError(t, err) + require.Equal(t, allCnt, 8) + // BackfillTable + // ID jobID eleID state + // -------------------------------- + // 0 jobID2 eleID2 JobStateNone + // 1 jobID2 eleID2 JobStateNone + // + // BackfillHistoryTable + // ID jobID eleID state + // -------------------------------- + // 5 jobID1 eleID1 JobStateNone + // 4 jobID1 eleID1 JobStateNone + // 0 jobID2 eleID3 JobStateNone + // 1 jobID2 eleID3 JobStateNone + // 0 jobID1 eleID1 JobStateNone + // 1 jobID1 eleID1 JobStateNone + // 2 jobID1 eleID1 JobStateRollingback + // 3 jobID1 eleID1 JobStateCancelling + // 6 jobID1 eleID1 
JobStateNone + // 7 jobID1 eleID1 JobStateNone } diff --git a/ddl/util/util.go b/ddl/util/util.go index 33ada320b1b14..7bb813ec8feab 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -172,6 +172,11 @@ func UpdateDeleteRange(sctx sessionctx.Context, dr DelRangeTask, newStartKey, ol return errors.Trace(err) } +// LoadDDLDistributeVars loads ddl distribute reorg variable from mysql.global_variables. +func LoadDDLDistributeVars(ctx context.Context, sctx sessionctx.Context) error { + return LoadGlobalVars(ctx, sctx, []string{variable.TiDBDDLEnableDistributeReorg}) +} + // LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables. func LoadDDLReorgVars(ctx context.Context, sctx sessionctx.Context) error { // close issue #21391 diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 6f4badaa475ed..5e6037556e198 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/parser/terror" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessionctx/variable/featuretag/distributereorg" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/table" @@ -1306,6 +1307,26 @@ func TestSetDDLErrorCountLimit(t *testing.T) { res.Check(testkit.Rows("100")) } +func TestLoadDDLDistributeVars(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + err := ddlutil.LoadDDLDistributeVars(context.Background(), tk.Session()) + require.NoError(t, err) + require.Equal(t, variable.DefTiDBDDLEnableDistributeReorg, distributereorg.TiDBEnableDistributeReorg) + + tk.MustGetDBError("set @@global.tidb_ddl_distribute_reorg = invalid_val", variable.ErrWrongValueForVar) + require.Equal(t, distributereorg.TiDBEnableDistributeReorg, variable.DDLEnableDistributeReorg.Load()) + tk.MustExec("set @@global.tidb_ddl_distribute_reorg = 'on'") + err = ddlutil.LoadDDLDistributeVars(context.Background(), tk.Session()) + require.NoError(t, err) + require.Equal(t, true, variable.DDLEnableDistributeReorg.Load()) + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_distribute_reorg = %v", distributereorg.TiDBEnableDistributeReorg)) + err = ddlutil.LoadDDLDistributeVars(context.Background(), tk.Session()) + require.NoError(t, err) + require.Equal(t, distributereorg.TiDBEnableDistributeReorg, variable.DDLEnableDistributeReorg.Load()) +} + // Test issue #9205, fix the precision problem for time type default values // See https://github.com/pingcap/tidb/issues/9205 for details func TestIssue9205(t *testing.T) { From ca973b33ab3230ca33b28a95fb28f15557d4b84a Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 19 Dec 2022 14:38:24 +0800 Subject: [PATCH 3/8] ddl: tiny update --- ddl/index.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index fdc3d8a097e66..d4b258def6d43 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1675,7 +1675,9 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { //nolint:forcetypeassert phyTbl := t.(table.PhysicalTable) // TODO: Support typeAddIndexMergeTmpWorker and partitionTable. 
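TestLoadDDLDistributeVars above exercises the switch as a process-wide atomic rather than a per-session lookup, which is what lets hot DDL paths read it without holding a session. The underlying pattern, as a self-contained sketch (the wiring is simplified; TiDB routes the store through its sysvar framework):

package main

import (
	"fmt"
	"sync/atomic"
)

// distReorgEnabled plays the role of variable.DDLEnableDistributeReorg:
// the sysvar SetGlobal hook stores into it, hot paths just Load it.
var distReorgEnabled atomic.Bool

func setGlobalDistReorg(normalizedVal string) {
	// The real sysvar layer validates and normalizes "ON"/"OFF" first;
	// this plain string compare is a simplification.
	distReorgEnabled.Store(normalizedVal == "ON")
}

func main() {
	setGlobalDistReorg("ON")
	if distReorgEnabled.Load() {
		fmt.Println("taking the distributed reorg path")
	}
}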
- loadDDLDistributeVars(w.ctx, w.sessPool) + if err := loadDDLDistributeVars(w.ctx, w.sessPool); err != nil { + logutil.BgLogger().Error("[ddl] load DDL distribute variable failed", zap.Error(err)) + } isDistReorg := variable.DDLEnableDistributeReorg.Load() if isDistReorg && !reorgInfo.mergingTmpIdx { sCtx, err := w.sessPool.get() From 506a15663ff385ccaa22a3770f23555d09ef86ae Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 20 Dec 2022 17:37:00 +0800 Subject: [PATCH 4/8] ddl, executor: address comments and tiny update --- ddl/backfilling.go | 26 +++++--------------------- ddl/job_table.go | 4 ++-- ddl/job_table_test.go | 8 ++++---- executor/BUILD.bazel | 1 + 4 files changed, 12 insertions(+), 27 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 566fe0cbba9b0..06662aaf1719c 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -113,8 +113,8 @@ func (bj *BackfillJob) AbbrStr() string { bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease) } -// GetOracleTimeWithTxn returns the current time from TS with txn. -func GetOracleTimeWithTxn(se *session) (time.Time, error) { +// GetOracleTimeWithStartTS returns the current time with txn's startTS. +func GetOracleTimeWithStartTS(se *session) (time.Time, error) { txn, err := se.Txn(true) if err != nil { return time.Time{}, err @@ -308,33 +308,18 @@ func newBackfillWorker(ctx context.Context, id int, bf backfiller) *backfillWork } } -func (w *backfillWorker) updateLease(execID string, bJob *BackfillJob) error { - isDistReorg, err := w.GetCtx().sessCtx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBDDLEnableDistributeReorg) - if err != nil { - return err - } - if isDistReorg != variable.On { - return nil - } - +func (w *backfillWorker) updateLease(execID string, bJob *BackfillJob, nextKey kv.Key) error { leaseTime, err := GetOracleTime(w.GetCtx().store) if err != nil { return err } + bJob.CurrKey = nextKey bJob.InstanceID = execID bJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease) return w.backfiller.UpdateTask(bJob) } func (w *backfillWorker) finishJob(bJob *BackfillJob) error { - isDistReorg, err := w.GetCtx().sessCtx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBDDLEnableDistributeReorg) - if err != nil { - return err - } - if isDistReorg != variable.On { - return nil - } - bJob.State = model.JobStateDone return w.backfiller.FinishTask(bJob) } @@ -429,8 +414,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, continue } batchStartTime = time.Now() - task.bfJob.CurrKey = result.nextKey - if err := w.updateLease(w.GetCtx().uuid, task.bfJob); err != nil { + if err := w.updateLease(w.GetCtx().uuid, task.bfJob, result.nextKey); err != nil { logutil.BgLogger().Info("[ddl] backfill worker handle task, update lease failed", zap.Stringer("worker", w), zap.Stringer("task", task), zap.String("bj", task.bfJob.AbbrStr()), zap.Error(err)) result.err = err diff --git a/ddl/job_table.go b/ddl/job_table.go index d15285ea2da99..96a96e9c63fa1 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -567,7 +567,7 @@ func GetBackfillJobsForOneEle(sess *session, batch int, excludedJobIDs []int64, var bJobs []*BackfillJob s := newSession(sess) err = runInTxn(s, func(se *session) error { - currTime, err := GetOracleTimeWithTxn(s) + currTime, err := GetOracleTimeWithStartTS(s) if err != nil { return err } @@ -600,7 +600,7 @@ func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, jobID int64, uuid var bJobs []*BackfillJob s := 
newSession(sess) err := runInTxn(s, func(se *session) error { - currTime, err := GetOracleTimeWithTxn(se) + currTime, err := GetOracleTimeWithStartTS(se) if err != nil { return err } diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index 5cd92a0f48e20..5a9df95d24759 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -295,10 +295,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { expectJob = bjTestCases[3] } require.Equal(t, expectJob, bJobs[0]) - previousTime, err := ddl.GetOracleTimeWithTxn(se) + previousTime, err := ddl.GetOracleTimeWithStartTS(se) require.EqualError(t, err, "[kv:8024]invalid transaction") readInTxn(se, func(sessionctx.Context) { - previousTime, err = ddl.GetOracleTimeWithTxn(se) + previousTime, err = ddl.GetOracleTimeWithStartTS(se) require.NoError(t, err) }) @@ -313,7 +313,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { equalBackfillJob(t, expectJob, bJobs[0], ddl.GetLeaseGoTime(previousTime, instanceLease)) var currTime time.Time readInTxn(se, func(sessionctx.Context) { - currTime, err = ddl.GetOracleTimeWithTxn(se) + currTime, err = ddl.GetOracleTimeWithStartTS(se) require.NoError(t, err) }) currGoTime := ddl.GetLeaseGoTime(currTime, instanceLease) @@ -367,7 +367,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // ------------------------ // 0 jobID2 eleID2 readInTxn(se, func(sessionctx.Context) { - currTime, err = ddl.GetOracleTimeWithTxn(se) + currTime, err = ddl.GetOracleTimeWithStartTS(se) require.NoError(t, err) }) condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID2) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 46ae254888db6..71e5419ba5e41 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -373,6 +373,7 @@ go_test( "//sessionctx/binloginfo", "//sessionctx/stmtctx", "//sessionctx/variable", + "//sessionctx/variable/featuretag/distributereorg", "//sessiontxn", "//sessiontxn/staleread", "//statistics", From 6c1103d59776a7865f3cf710303bc3a99667219a Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 26 Dec 2022 10:54:38 +0800 Subject: [PATCH 5/8] ddl: address comments --- ddl/backfilling.go | 40 ++++++++++++++++++++-------------------- ddl/index.go | 18 +++++++++--------- ddl/job_table.go | 10 +++++----- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 06662aaf1719c..eb16d503dc818 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -214,8 +214,8 @@ type backfiller interface { BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) AddMetricInfo(float64) GetTask() (*BackfillJob, error) - UpdateTask(bJob *BackfillJob) error - FinishTask(bJob *BackfillJob) error + UpdateTask(bfJob *BackfillJob) error + FinishTask(bfJob *BackfillJob) error GetCtx() *backfillCtx String() string } @@ -308,20 +308,20 @@ func newBackfillWorker(ctx context.Context, id int, bf backfiller) *backfillWork } } -func (w *backfillWorker) updateLease(execID string, bJob *BackfillJob, nextKey kv.Key) error { +func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey kv.Key) error { leaseTime, err := GetOracleTime(w.GetCtx().store) if err != nil { return err } - bJob.CurrKey = nextKey - bJob.InstanceID = execID - bJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease) - return w.backfiller.UpdateTask(bJob) + bfJob.CurrKey = nextKey + bfJob.InstanceID = execID + bfJob.InstanceLease = GetLeaseGoTime(leaseTime, 
InstanceLease) + return w.backfiller.UpdateTask(bfJob) } -func (w *backfillWorker) finishJob(bJob *BackfillJob) error { - bJob.State = model.JobStateDone - return w.backfiller.FinishTask(bJob) +func (w *backfillWorker) finishJob(bfJob *BackfillJob) error { + bfJob.State = model.JobStateDone + return w.backfiller.FinishTask(bfJob) } func (w *backfillWorker) String() string { @@ -480,7 +480,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { w.resultCh <- result if result.err != nil { logutil.BgLogger().Info("[ddl] backfill worker exit on error", - zap.Stringer("workerr", w), zap.Int("workerID", w.id), zap.Error(result.err)) + zap.Stringer("worker", w), zap.Int("workerID", w.id), zap.Error(result.err)) return } } @@ -1200,12 +1200,12 @@ func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.Physica return errors.Trace(err) } - bJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) if err != nil { logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err)) return errors.Trace(err) } - if bJob == nil { + if bfJob == nil { backfillJobFinished = true logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID)) } @@ -1315,7 +1315,7 @@ func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID i // GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable. func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) { - bJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true) + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true) if err != nil { return nil, errors.Trace(err) } @@ -1324,20 +1324,20 @@ func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) return nil, errors.Trace(err) } - if bJob == nil { + if bfJob == nil { return hJob, nil } if hJob == nil { - return bJob, nil + return bfJob, nil } - if bJob.ID > hJob.ID { - return bJob, nil + if bfJob.ID > hJob.ID { + return bfJob, nil } return hJob, nil } // MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table. -func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bJob *BackfillJob) error { +func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bfJob *BackfillJob) error { sess, ok := sessCtx.(*session) if !ok { return errors.Errorf("sess ctx:%#v convert session failed", sessCtx) @@ -1346,7 +1346,7 @@ func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bJob *BackfillJo return runInTxn(sess, func(se *session) error { // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. 
bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", - bJob.JobID, bJob.EleID, bJob.EleKey), "update_backfill_job") + bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job") if err != nil { return errors.Trace(err) } diff --git a/ddl/index.go b/ddl/index.go index d4b258def6d43..343871abf6456 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1260,7 +1260,7 @@ func (w *baseIndexWorker) String() string { return w.tp.String() } -func (w *baseIndexWorker) UpdateTask(bJob *BackfillJob) error { +func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { sess, ok := w.backfillCtx.sessCtx.(*session) if !ok { return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx) @@ -1268,32 +1268,32 @@ func (w *baseIndexWorker) UpdateTask(bJob *BackfillJob) error { return runInTxn(sess, func(se *session) error { jobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and id = %d and ele_key = '%s'", - bJob.JobID, bJob.EleID, bJob.ID, bJob.EleKey), "update_backfill_task") + bfJob.JobID, bfJob.EleID, bfJob.ID, bfJob.EleKey), "update_backfill_task") if err != nil { return err } if len(jobs) == 0 { - return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill bJob, lease is timeout") + return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job") } - if jobs[0].InstanceID != bJob.InstanceID { - return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill bJob %v, want instance ID %s", jobs[0], bJob.InstanceID)) + if jobs[0].InstanceID != bfJob.InstanceID { + return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill job %v, want instance ID %s", jobs[0], bfJob.InstanceID)) } - return updateBackfillJob(sess, BackfillTable, bJob, "update_backfill_task") + return updateBackfillJob(sess, BackfillTable, bfJob, "update_backfill_task") }) } -func (w *baseIndexWorker) FinishTask(bJob *BackfillJob) error { +func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error { sess, ok := w.backfillCtx.sessCtx.(*session) if !ok { return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx) } return runInTxn(sess, func(se *session) error { - err := RemoveBackfillJob(sess, false, bJob) + err := RemoveBackfillJob(sess, false, bfJob) if err != nil { return err } - return AddBackfillHistoryJob(sess, []*BackfillJob{bJob}) + return AddBackfillHistoryJob(sess, []*BackfillJob{bfJob}) }) } diff --git a/ddl/job_table.go b/ddl/job_table.go index 96a96e9c63fa1..d5739209bf38a 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -612,7 +612,7 @@ func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, jobID int64, uuid return err } if len(bJobs) == 0 { - return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job, lease is timeout") + return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job") } validLen = 0 @@ -685,7 +685,7 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([] } bJobs := make([]*BackfillJob, 0, len(rows)) for _, row := range rows { - bJob := BackfillJob{ + bfJob := BackfillJob{ ID: row.GetInt64(0), JobID: row.GetInt64(1), EleID: row.GetInt64(2), @@ -702,12 +702,12 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([] FinishTS: row.GetUint64(13), RowCount: row.GetInt64(14), } - bJob.Meta = &model.BackfillMeta{} - err = bJob.Meta.Decode(row.GetBytes(15)) + bfJob.Meta = &model.BackfillMeta{} + err = bfJob.Meta.Decode(row.GetBytes(15)) if err != nil { 
return nil, errors.Trace(err)
 }
- bJobs = append(bJobs, &bJob)
+ bJobs = append(bJobs, &bfJob)
 }
 return bJobs, nil
}

From a59ca492ac92387317b95b9c3425c2eb2e325e7b Mon Sep 17 00:00:00 2001
From: Lynn
Date: Tue, 27 Dec 2022 16:16:08 +0800
Subject: [PATCH 6/8] ddl, executor: address comments and tiny update

---
 ddl/backfilling.go | 10 ----------
 ddl/index.go | 14 ++++++++------
 ddl/util/util.go | 5 -----
 executor/ddl_test.go | 6 ------
 4 files changed, 8 insertions(+), 27 deletions(-)

diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index eb16d503dc818..66798676973ae 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -711,16 +711,6 @@ func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error {
 return ddlutil.LoadDDLReorgVars(ctx, sCtx)
 }
-func loadDDLDistributeVars(ctx context.Context, sessPool *sessionPool) error {
- // Get sessionctx from context resource pool.
- sCtx, err := sessPool.get()
- if err != nil {
- return errors.Trace(err)
- }
- defer sessPool.put(sCtx)
- return ddlutil.LoadDDLDistributeVars(ctx, sCtx)
-}
-
 func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) {
 writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols()))
 for _, col := range t.WritableCols() {
diff --git a/ddl/index.go b/ddl/index.go
index 343871abf6456..f531a16c00093 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1267,18 +1267,23 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
 }
 return runInTxn(sess, func(se *session) error {
- jobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and id = %d and ele_key = '%s'",
- bfJob.JobID, bfJob.EleID, bfJob.ID, bfJob.EleKey), "update_backfill_task")
+ jobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d",
+ bfJob.JobID, bfJob.EleID, bfJob.EleKey, bfJob.ID), "update_backfill_task")
 if err != nil {
 return err
 }
-
 if len(jobs) == 0 {
 return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job")
 }
 if jobs[0].InstanceID != bfJob.InstanceID {
 return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill job %v, want instance ID %s", jobs[0], bfJob.InstanceID))
 }
+
+ currTime, err := GetOracleTimeWithStartTS(se)
+ if err != nil {
+ return err
+ }
+ bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease)
 return updateBackfillJob(sess, BackfillTable, bfJob, "update_backfill_task")
 })
 }
@@ -1675,9 +1680,6 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
 //nolint:forcetypeassert
 phyTbl := t.(table.PhysicalTable)
 // TODO: Support typeAddIndexMergeTmpWorker and partitionTable.
- if err := loadDDLDistributeVars(w.ctx, w.sessPool); err != nil {
- logutil.BgLogger().Error("[ddl] load DDL distribute variable failed", zap.Error(err))
- }
 isDistReorg := variable.DDLEnableDistributeReorg.Load()
 if isDistReorg && !reorgInfo.mergingTmpIdx {
 sCtx, err := w.sessPool.get()
diff --git a/ddl/util/util.go b/ddl/util/util.go
index 7bb813ec8feab..33ada320b1b14 100644
--- a/ddl/util/util.go
+++ b/ddl/util/util.go
@@ -172,11 +172,6 @@ func UpdateDeleteRange(sctx sessionctx.Context, dr DelRangeTask, newStartKey, ol
 return errors.Trace(err)
 }
-// LoadDDLDistributeVars loads ddl distribute reorg variable from mysql.global_variables.
-func LoadDDLDistributeVars(ctx context.Context, sctx sessionctx.Context) error { - return LoadGlobalVars(ctx, sctx, []string{variable.TiDBDDLEnableDistributeReorg}) -} - // LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables. func LoadDDLReorgVars(ctx context.Context, sctx sessionctx.Context) error { // close issue #21391 diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 5e6037556e198..bb8775a013a30 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -1311,19 +1311,13 @@ func TestLoadDDLDistributeVars(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - err := ddlutil.LoadDDLDistributeVars(context.Background(), tk.Session()) - require.NoError(t, err) require.Equal(t, variable.DefTiDBDDLEnableDistributeReorg, distributereorg.TiDBEnableDistributeReorg) tk.MustGetDBError("set @@global.tidb_ddl_distribute_reorg = invalid_val", variable.ErrWrongValueForVar) require.Equal(t, distributereorg.TiDBEnableDistributeReorg, variable.DDLEnableDistributeReorg.Load()) tk.MustExec("set @@global.tidb_ddl_distribute_reorg = 'on'") - err = ddlutil.LoadDDLDistributeVars(context.Background(), tk.Session()) - require.NoError(t, err) require.Equal(t, true, variable.DDLEnableDistributeReorg.Load()) tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_distribute_reorg = %v", distributereorg.TiDBEnableDistributeReorg)) - err = ddlutil.LoadDDLDistributeVars(context.Background(), tk.Session()) - require.NoError(t, err) require.Equal(t, distributereorg.TiDBEnableDistributeReorg, variable.DDLEnableDistributeReorg.Load()) } From c41b136da59eb2e76be4861a4c1e40e69fbaf82d Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 30 Dec 2022 15:44:33 +0800 Subject: [PATCH 7/8] ddl: address comments and tiny update --- ddl/backfilling.go | 14 +++++---- ddl/index.go | 7 ++++- ddl/job_table.go | 67 +++++++++++++++++++++++-------------------- ddl/job_table_test.go | 4 +-- 4 files changed, 53 insertions(+), 39 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index dbd301fd6b9a1..5e9bbd069173d 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -1218,14 +1218,12 @@ func checkJobIsSynced(sess *session, jobID int64) (bool, error) { var unsyncedInstanceIDs []string for i := 0; i < retrySQLTimes; i++ { unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync") - if err != nil { - return false, errors.Trace(err) - } - if len(unsyncedInstanceIDs) == 0 { + if err == nil && len(unsyncedInstanceIDs) == 0 { return true, nil } - logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", zap.Int("tryTimes", i), zap.Error(err)) + logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", + zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err)) time.Sleep(retrySQLInterval) } @@ -1339,10 +1337,16 @@ func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bfJob *BackfillJ return nil } + txn, err := se.txn() + if err != nil { + return errors.Trace(err) + } + startTS := txn.StartTS() err = RemoveBackfillJob(sess, true, bJobs[0]) if err == nil { for _, bj := range bJobs { bj.State = model.JobStateCancelled + bj.FinishTS = startTS } err = AddBackfillHistoryJob(sess, bJobs) } diff --git a/ddl/index.go b/ddl/index.go index d90263aec8ea2..c8860face180a 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1305,7 +1305,12 @@ func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error { return errors.Errorf("sess ctx:%#v convert session failed", 
w.backfillCtx.sessCtx)
 	}
 	return runInTxn(sess, func(se *session) error {
-		err := RemoveBackfillJob(sess, false, bfJob)
+		txn, err := se.txn()
+		if err != nil {
+			return errors.Trace(err)
+		}
+		bfJob.FinishTS = txn.StartTS()
+		err = RemoveBackfillJob(sess, false, bfJob)
 		if err != nil {
 			return err
 		}
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 753218121d6db..5b760d5f509f3 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -496,51 +496,56 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {
 	return jobs, nil
 }
 
-// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
-func AddBackfillJobs(sess *session, backfillJobs []*BackfillJob) error {
-	return addBackfillJobs(sess, BackfillTable, backfillJobs)
+func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) (string, error) {
+	sqlPrefix := fmt.Sprintf("insert into mysql.%s(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values", tableName)
+	var sql string
+	for i, bj := range backfillJobs {
+		metaByte, err := bj.Meta.Encode()
+		if err != nil {
+			return "", errors.Trace(err)
+		}
+
+		if i == 0 {
+			sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
+				bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
+				bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, metaByte)
+			continue
+		}
+		sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
+			bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
+			bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, metaByte)
+	}
+	return sql, nil
+}
 
 // AddBackfillHistoryJob adds the backfill jobs to the tidb_ddl_backfill_history table.
 func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error {
-	return addBackfillJobs(sess, BackfillHistoryTable, backfillJobs)
+	label := fmt.Sprintf("add_%s_job", BackfillHistoryTable)
+	sql, err := generateInsertBackfillJobSQL(BackfillHistoryTable, backfillJobs)
+	if err != nil {
+		return err
+	}
+	_, err = sess.execute(context.Background(), sql, label)
+	return errors.Trace(err)
 }
 
-// addBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
-func addBackfillJobs(sess *session, tableName string, backfillJobs []*BackfillJob) error {
-	sqlPrefix := fmt.Sprintf(
-		"insert into mysql.%s(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values", tableName)
-	var sql string
-	label := fmt.Sprintf("add_%s_job", tableName)
+// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
+func AddBackfillJobs(sess *session, backfillJobs []*BackfillJob) error {
+	label := fmt.Sprintf("add_%s_job", BackfillTable)
 	// Do runInTxn to get StartTS.
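+	// Every job in the batch gets the same StartTS, taken from this
+	// transaction, so the inserted rows can be correlated with the txn
+	// that created them. FinishTS is left to the callers that move jobs
+	// into the history table (FinishTask, MoveBackfillJobsToHistoryTable).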
 	return runInTxn(newSession(sess), func(se *session) error {
 		txn, err := se.txn()
 		if err != nil {
 			return errors.Trace(err)
 		}
 		startTS := txn.StartTS()
-		for i, bj := range backfillJobs {
-			if tableName == BackfillTable {
-				bj.StartTS = startTS
-			}
-			if tableName == BackfillHistoryTable {
-				bj.FinishTS = startTS
-			}
-			mateByte, err := bj.Meta.Encode()
-			if err != nil {
-				return errors.Trace(err)
-			}
+		for _, bj := range backfillJobs {
+			bj.StartTS = startTS
+		}
 
-			if i == 0 {
-				sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
-					bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
-					bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, mateByte)
-				continue
-			}
-			sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
-				bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
-				bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, mateByte)
+		sql, err := generateInsertBackfillJobSQL(BackfillTable, backfillJobs)
+		if err != nil {
+			return err
 		}
 		_, err = sess.execute(context.Background(), sql, label)
 		return errors.Trace(err)
diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go
index 5a9df95d24759..ca4d240f161e0 100644
--- a/ddl/job_table_test.go
+++ b/ddl/job_table_test.go
@@ -261,7 +261,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, bJobs)
 	bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease)
-	require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job, lease is timeout").Error())
+	require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error())
 	require.Nil(t, bJobs)
 	allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
 		jobID1, eleID2, meta.IndexElementKey), "check_backfill_job_count")
@@ -374,7 +374,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj")
 	require.NoError(t, err)
 	require.Len(t, bJobs, 1)
-	require.Greater(t, bJobs[0].FinishTS, uint64(0))
+	require.Equal(t, bJobs[0].FinishTS, uint64(0))
 
 	// test GetMaxBackfillJob and GetInterruptedBackfillJobsForOneEle
 	bjob, err := ddl.GetMaxBackfillJob(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey)

From b661cf7060a1582ef9b3c556cd033fe488d7fd6b Mon Sep 17 00:00:00 2001
From: Lynn 
Date: Wed, 4 Jan 2023 14:01:54 +0800
Subject: [PATCH 8/8] Update ddl/backfilling.go

Co-authored-by: tangenta 

---
 ddl/backfilling.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 5e9bbd069173d..a7c23a545208e 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -416,7 +416,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 			batchStartTime = time.Now()
 			if err := w.updateLease(w.GetCtx().uuid, task.bfJob, result.nextKey); err != nil {
 				logutil.BgLogger().Info("[ddl] backfill worker handle task, update lease failed", zap.Stringer("worker", w),
-					zap.Stringer("task", task), zap.String("bj", task.bfJob.AbbrStr()), zap.Error(err))
+					zap.Stringer("task", task), zap.String("backfill job", task.bfJob.AbbrStr()), zap.Error(err))
 				result.err = err
 				return result
 			}
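
A note on the batching pattern behind generateInsertBackfillJobSQL: the whole
batch of backfill jobs is folded into a single multi-row INSERT instead of one
statement per job, so the number of round trips to the storage layer stays
constant as genTaskBatch grows. Below is a minimal, self-contained sketch of
the same construction; the row type and buildInsertSQL are illustrative
stand-ins (not code from this series), and only two of the sixteen columns
are modeled.

	package main

	import (
		"fmt"
		"strings"
	)

	// row stands in for *BackfillJob; only two columns are modeled.
	type row struct {
		ID    int64
		JobID int64
	}

	// buildInsertSQL emits one INSERT with a "(...)" values group per row,
	// mirroring how generateInsertBackfillJobSQL appends groups in a loop.
	func buildInsertSQL(table string, rows []row) string {
		var b strings.Builder
		fmt.Fprintf(&b, "insert into mysql.%s(id, ddl_job_id) values", table)
		for i, r := range rows {
			if i > 0 {
				b.WriteString(",")
			}
			fmt.Fprintf(&b, " (%d, %d)", r.ID, r.JobID)
		}
		return b.String()
	}

	func main() {
		rows := []row{{ID: 1, JobID: 100}, {ID: 2, JobID: 100}}
		fmt.Println(buildInsertSQL("tidb_ddl_backfill", rows))
		// insert into mysql.tidb_ddl_backfill(id, ddl_job_id) values (1, 100), (2, 100)
	}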