From a22d690fdab47b2b1abb3557d337a43dc7dc2b38 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 9 Feb 2023 23:54:08 +0800 Subject: [PATCH] ddl, model: support for dist-reorg on partitioned tables (#41145) close pingcap/tidb#41144 --- ddl/backfilling.go | 140 +++++++++-------- ddl/constant.go | 4 +- ddl/ddl_test.go | 3 + ddl/dist_backfilling.go | 45 ++---- ddl/dist_owner.go | 339 ++++++++++++++++++++++++++++++++-------- ddl/export_test.go | 10 +- ddl/index.go | 67 ++++---- ddl/index_cop_test.go | 3 +- ddl/job_table.go | 103 ++++++++---- ddl/job_table_test.go | 244 ++++++++++++++++++++++++----- metrics/metrics.go | 18 +-- parser/model/ddl.go | 7 +- sessionctx/BUILD.bazel | 2 +- 13 files changed, 715 insertions(+), 270 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 6abaad9396e6c..c6d19fcc0827e 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/store/copr" "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" @@ -61,12 +62,14 @@ const ( typeAddIndexMergeTmpWorker backfillerType = 3 // InstanceLease is the instance lease. - InstanceLease = 1 * time.Minute - updateInstanceLease = 25 * time.Second - genTaskBatch = 4096 - minGenTaskBatch = 1024 - minDistTaskCnt = 32 - retrySQLTimes = 10 + InstanceLease = 1 * time.Minute + updateInstanceLease = 25 * time.Second + genTaskBatch = 4096 + genPhysicalTableTaskBatch = 256 + minGenTaskBatch = 1024 + minGenPhysicalTableTaskBatch = 64 + minDistTaskCnt = 64 + retrySQLTimes = 10 ) // RetrySQLInterval is export for test. @@ -89,15 +92,15 @@ func (bT backfillerType) String() string { // BackfillJob is for a tidb_ddl_backfill table's record. type BackfillJob struct { - ID int64 - JobID int64 - EleID int64 - EleKey []byte - Tp backfillerType - State model.JobState - StoreID int64 - InstanceID string - InstanceLease types.Time + ID int64 + JobID int64 + EleID int64 + EleKey []byte + PhysicalTableID int64 + Tp backfillerType + State model.JobState + InstanceID string + InstanceLease types.Time // range info CurrKey []byte StartKey []byte @@ -252,14 +255,13 @@ type reorgBackfillTask struct { physicalTable table.PhysicalTable // TODO: Remove the following fields after remove the function of run. - id int - physicalTableID int64 - startKey kv.Key - endKey kv.Key - endInclude bool - jobID int64 - sqlQuery string - priority int + id int + startKey kv.Key + endKey kv.Key + endInclude bool + jobID int64 + sqlQuery string + priority int } func (r *reorgBackfillTask) getJobID() int64 { @@ -278,7 +280,7 @@ func (r *reorgBackfillTask) excludedEndKey() kv.Key { } func (r *reorgBackfillTask) String() string { - physicalID := strconv.FormatInt(r.physicalTableID, 10) + physicalID := strconv.FormatInt(r.physicalTable.GetPhysicalID(), 10) startKey := hex.EncodeToString(r.startKey) endKey := hex.EncodeToString(r.endKey) rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey @@ -366,6 +368,9 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, rc := d.getReorgCtx(jobID) isDistReorg := task.bfJob != nil + if isDistReorg { + w.initPartitionIndexInfo(task) + } for { // Give job chance to be canceled, if we not check it here, // if there is panic in bf.BackfillDataInTxn we will never cancel the job. @@ -437,6 +442,15 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, return result } +func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) { + if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok { + if addIdxWorker, ok := w.backfiller.(*addIndexWorker); ok { + indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID) + addIdxWorker.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo) + } + } +} + func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResult) { logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w), zap.String("task", task.String())) defer util.Recover(metrics.LabelDDL, "backfillWorker.runTask", func() { @@ -668,7 +682,6 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask { batchTasks := make([]*reorgBackfillTask, 0, batch) - physicalTableID := reorgInfo.PhysicalTableID var prefix kv.Key if reorgInfo.mergingTmpIdx { prefix = t.IndexPrefix() @@ -679,7 +692,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, job := reorgInfo.Job //nolint:forcetypeassert phyTbl := t.(table.PhysicalTable) - jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID) + jobCtx := reorgInfo.d.jobContext(job.ID) for i, keyRange := range kvRanges { startKey := keyRange.StartKey endKey := keyRange.EndKey @@ -687,7 +700,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, if err != nil { logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err)) } else { - logutil.BgLogger().Info("[ddl] get backfill range task, change end key", + logutil.BgLogger().Info("[ddl] get backfill range task, change end key", zap.Int64("pTbl", phyTbl.GetPhysicalID()), zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK))) endKey = endK } @@ -699,13 +712,12 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, } task := &reorgBackfillTask{ - id: i, - jobID: reorgInfo.Job.ID, - physicalTableID: physicalTableID, - physicalTable: phyTbl, - priority: reorgInfo.Priority, - startKey: startKey, - endKey: endKey, + id: i, + jobID: job.ID, + physicalTable: phyTbl, + priority: reorgInfo.Priority, + startKey: startKey, + endKey: endKey, // If the boundaries overlap, we should ignore the preceding endKey. endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1} batchTasks = append(batchTasks, task) @@ -1108,8 +1120,8 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { return nil } -func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool, - batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error { +func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool, + batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error { bJobs = bJobs[:0] instanceID := "" if notDistTask { @@ -1119,12 +1131,11 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo // TODO: Adjust the number of ranges(region) for each task. for _, task := range batchTasks { bm := &model.BackfillMeta{ - PhysicalTableID: reorgInfo.PhysicalTableID, - IsUnique: isUnique, - EndInclude: task.endInclude, - ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, - SQLMode: reorgInfo.ReorgMeta.SQLMode, - Location: reorgInfo.ReorgMeta.Location, + IsUnique: sJobCtx.isUnique, + EndInclude: task.endInclude, + ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, + SQLMode: reorgInfo.ReorgMeta.SQLMode, + Location: reorgInfo.ReorgMeta.Location, JobMeta: &model.JobMeta{ SchemaID: reorgInfo.Job.SchemaID, TableID: reorgInfo.Job.TableID, @@ -1133,19 +1144,19 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo }, } bj := &BackfillJob{ - ID: *id, - JobID: reorgInfo.Job.ID, - EleID: reorgInfo.currElement.ID, - EleKey: reorgInfo.currElement.TypeKey, - Tp: bfWorkerType, - State: model.JobStateNone, - InstanceID: instanceID, - CurrKey: task.startKey, - StartKey: task.startKey, - EndKey: task.endKey, - Meta: bm, + ID: sJobCtx.currBackfillJobID.Add(1), + JobID: reorgInfo.Job.ID, + EleID: reorgInfo.currElement.ID, + EleKey: reorgInfo.currElement.TypeKey, + PhysicalTableID: phyTblID, + Tp: sJobCtx.bfWorkerType, + State: model.JobStateNone, + InstanceID: instanceID, + CurrKey: task.startKey, + StartKey: task.startKey, + EndKey: task.endKey, + Meta: bm, } - *id++ bJobs = append(bJobs, bj) } if err := AddBackfillJobs(sess, bJobs); err != nil { @@ -1154,22 +1165,22 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo return nil } -func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool, - bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error { - endKey := reorgInfo.EndKey - isFirstOps := true - bJobs := make([]*BackfillJob, 0, genTaskBatch) +func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error { + isFirstOps := !sJobCtx.isMultiPhyTbl + batchSize := sJobCtx.batchSize + startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey + bJobs := make([]*BackfillJob, 0, batchSize) for { - kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch) + kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize) if err != nil { return errors.Trace(err) } - batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch) + batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize) if len(batchTasks) == 0 { break } notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) - if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil { + if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil { return errors.Trace(err) } isFirstOps = false @@ -1177,6 +1188,7 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, remains := kvRanges[len(batchTasks):] dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job") logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", + zap.Int64("physicalID", pTblMeta.PhyTblID), zap.Int("batchTasksCnt", len(batchTasks)), zap.Int("totalRegionCnt", len(kvRanges)), zap.Int("remainRegionCnt", len(remains)), @@ -1188,11 +1200,11 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, } for { - bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, pTblMeta.PhyTblID) if err != nil { return errors.Trace(err) } - if bJobCnt < minGenTaskBatch { + if bJobCnt < sJobCtx.minBatchSize { break } time.Sleep(RetrySQLInterval) diff --git a/ddl/constant.go b/ddl/constant.go index f9de82e2e6dad..8e276e7205e56 100644 --- a/ddl/constant.go +++ b/ddl/constant.go @@ -55,7 +55,7 @@ const ( ddl_job_id bigint not null, ele_id bigint not null, ele_key blob, - store_id bigint, + ddl_physical_tid bigint, type int, exec_id blob default null, exec_lease timestamp, @@ -74,7 +74,7 @@ const ( ddl_job_id bigint not null, ele_id bigint not null, ele_key blob, - store_id bigint, + ddl_physical_tid bigint, type int, exec_id blob default null, exec_lease timestamp, diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index dc19436afd9b9..dd0505b58e4db 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -58,6 +58,9 @@ var JobNeedGCForTest = jobNeedGC // NewSession is only used for test. var NewSession = newSession +// GetJobWithoutPartition is only used for test. +const GetJobWithoutPartition = getJobWithoutPartition + // GetDDLCtx returns ddlCtx for test. func GetDDLCtx(d DDL) *ddlCtx { return d.(*ddl).ddlCtx diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go index 53d1241444209..59f9b17506543 100644 --- a/ddl/dist_backfilling.go +++ b/ddl/dist_backfilling.go @@ -15,7 +15,6 @@ package ddl import ( - "encoding/hex" "sync" "time" @@ -35,6 +34,8 @@ import ( "go.uber.org/zap" ) +const getJobWithoutPartition = -1 + type backfillWorkerContext struct { currID int mu sync.Mutex @@ -108,19 +109,12 @@ func (bwCtx *backfillWorkerContext) GetContext() *backfillWorker { return bw } -func runBackfillJobs(d *ddl, ingestBackendCtx *ingest.BackendContext, bJob *BackfillJob, jobCtx *JobContext) (table.Table, error) { +func runBackfillJobs(d *ddl, sess *session, ingestBackendCtx *ingest.BackendContext, bJob *BackfillJob, jobCtx *JobContext) (table.Table, error) { dbInfo, tbl, err := d.getTableByTxn(d.store, bJob.Meta.SchemaID, bJob.Meta.TableID) if err != nil { logutil.BgLogger().Warn("[ddl] runBackfillJobs gets table failed", zap.String("bfJob", bJob.AbbrStr()), zap.Error(err)) return nil, err } - se, err := d.sessPool.get() - if err != nil { - logutil.BgLogger().Warn("[ddl] run backfill jobs get session failed", zap.Error(err)) - return nil, err - } - defer d.sessPool.put(se) - sess := newSession(se) workerCnt := int(variable.GetDDLReorgWorkerCounter()) // TODO: Different worker using different newBackfillerFunc. @@ -135,9 +129,14 @@ func runBackfillJobs(d *ddl, ingestBackendCtx *ingest.BackendContext, bJob *Back return bfWorker.runTask(task) }) + runningPID := int64(0) + // If txn-merge we needn't to claim the backfill job through the partition table + if ingestBackendCtx == nil { + runningPID = getJobWithoutPartition + } proFunc := func() ([]*reorgBackfillTask, error) { // TODO: After BackfillJob replaces reorgBackfillTask, use backfiller's GetTasks instead of it. - return GetTasks(d.ddlCtx, sess, tbl, bJob.JobID, workerCnt+5) + return GetTasks(d.ddlCtx, sess, tbl, bJob.JobID, &runningPID, workerCnt+5) } // add new task resultCh, control := d.backfillWorkerPool.AddProduceBySlice(proFunc, 0, workerCtx, spmc.WithConcurrency(workerCnt)) @@ -220,41 +219,28 @@ func (bwm *backfilWorkerManager) close(d *ddl) error { func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBackfillTask, error) { pt := t.(table.PhysicalTable) if tbl, ok := t.(table.PartitionedTable); ok { - pt = tbl.GetPartition(bfJob.Meta.PhysicalTableID) + pt = tbl.GetPartition(bfJob.PhysicalTableID) if pt == nil { - return nil, dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", bfJob.Meta.PhysicalTableID, t.Meta().ID) + return nil, dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", bfJob.PhysicalTableID, t.Meta().ID) } } - endKey := bfJob.EndKey - // TODO: Check reorgInfo.mergingTmpIdx - endK, err := getRangeEndKey(dc.jobContext(bfJob.JobID), dc.store, bfJob.Meta.Priority, pt.RecordPrefix(), bfJob.StartKey, endKey) - if err != nil { - logutil.BgLogger().Info("[ddl] convert backfill job to task, get reverse key failed", zap.String("backfill job", bfJob.AbbrStr()), zap.Error(err)) - } else { - logutil.BgLogger().Info("[ddl] convert backfill job to task, change end key", zap.String("backfill job", - bfJob.AbbrStr()), zap.String("current key", hex.EncodeToString(bfJob.StartKey)), zap.Bool("end include", bfJob.Meta.EndInclude), - zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK))) - endKey = endK - } - return &reorgBackfillTask{ bfJob: bfJob, physicalTable: pt, // TODO: Remove these fields after remove the old logic. sqlQuery: bfJob.Meta.Query, startKey: bfJob.StartKey, - endKey: endKey, + endKey: bfJob.EndKey, endInclude: bfJob.Meta.EndInclude, priority: bfJob.Meta.Priority}, nil } // GetTasks gets the backfill tasks associated with the non-runningJobID. -func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, concurrency int) ([]*reorgBackfillTask, error) { +func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, runningPID *int64, concurrency int) ([]*reorgBackfillTask, error) { // TODO: At present, only add index is processed. In the future, different elements need to be distinguished. var err error - var bJobs []*BackfillJob for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetAndMarkBackfillJobsForOneEle(sess, concurrency, runningJobID, d.uuid, InstanceLease) + bJobs, err := GetAndMarkBackfillJobsForOneEle(sess, concurrency, runningJobID, d.uuid, *runningPID, InstanceLease) if err != nil { // TODO: add test: if all tidbs can't get the unmark backfill job(a tidb mark a backfill job, other tidbs returned, then the tidb can't handle this job.) if dbterror.ErrDDLJobNotFound.Equal(err) { @@ -268,6 +254,9 @@ func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, con } } + if *runningPID != getJobWithoutPartition { + *runningPID = bJobs[0].PhysicalTableID + } tasks := make([]*reorgBackfillTask, 0, len(bJobs)) for _, bJ := range bJobs { task, err := d.backfillJob2Task(tbl, bJ) diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index d10491fd0da3e..94337a02ad809 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -16,84 +16,300 @@ package ddl import ( "context" + "encoding/hex" "fmt" + "strconv" "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" + tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" + atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) // CheckBackfillJobFinishInterval is export for test. var CheckBackfillJobFinishInterval = 300 * time.Millisecond -func initDistReorg(reorgMeta *model.DDLReorgMeta, store kv.Storage, schemaID int64, tblInfo *model.TableInfo) error { - tbl, err := getTable(store, schemaID, tblInfo) +const ( + distPhysicalTableConcurrency = 16 +) + +func initDistReorg(reorgMeta *model.DDLReorgMeta) { + isDistReorg := variable.DDLEnableDistributeReorg.Load() + reorgMeta.IsDistReorg = isDistReorg +} + +// BackfillJobRangeMeta is export for test. +type BackfillJobRangeMeta struct { + ID int64 + PhyTblID int64 + PhyTbl table.PhysicalTable + StartKey []byte + EndKey []byte +} + +func (m *BackfillJobRangeMeta) String() string { + physicalID := strconv.FormatInt(m.PhyTblID, 10) + startKey := hex.EncodeToString(m.StartKey) + endKey := hex.EncodeToString(m.EndKey) + rangeStr := "taskID_" + strconv.Itoa(int(m.ID)) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey + ")" + return rangeStr +} + +type splitJobContext struct { + ctx context.Context + cancel context.CancelFunc + isMultiPhyTbl bool + bfWorkerType backfillerType + isUnique bool + batchSize int + minBatchSize int + currBackfillJobID *atomicutil.Int64 + currPhysicalID int64 + phyTblMetaCh chan *BackfillJobRangeMeta + resultCh chan error +} + +func getRunningPhysicalTableMetas(sess *session, sJobCtx *splitJobContext, reorgInfo *reorgInfo) ([]*BackfillJobRangeMeta, error) { + ddlJobID, eleID, eleKey, currPID := reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, reorgInfo.PhysicalTableID + pTblMetas, err := GetPhysicalTableMetas(sess, ddlJobID, eleID, eleKey) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } - isDistReorg := variable.DDLEnableDistributeReorg.Load() - // TODO: Support partitionTable. - if _, ok := tbl.(table.PartitionedTable); ok { - isDistReorg = false + currBfJobID := int64(1) + physicalTIDs := make([]int64, 0, len(pTblMetas)) + phyTblMetas := make([]*BackfillJobRangeMeta, 0, len(pTblMetas)) + if len(pTblMetas) == 0 { + bfJM := &BackfillJobRangeMeta{PhyTblID: currPID, StartKey: reorgInfo.StartKey, EndKey: reorgInfo.EndKey} + phyTblMetas = append(phyTblMetas, bfJM) + physicalTIDs = append(physicalTIDs, bfJM.PhyTblID) + } else { + for _, pMeta := range pTblMetas { + phyTblMetas = append(phyTblMetas, pMeta) + currPID = mathutil.Max(pMeta.PhyTblID, currPID) + currBfJobID = mathutil.Max(pMeta.ID, currBfJobID) + physicalTIDs = append(physicalTIDs, pMeta.PhyTblID) + } } - reorgMeta.IsDistReorg = isDistReorg - return nil + sJobCtx.currPhysicalID = currPID + sJobCtx.currBackfillJobID = atomicutil.NewInt64(currBfJobID) + logutil.BgLogger().Info("[ddl] unprocessed physical table ranges get from table", zap.Int64("jobID", ddlJobID), + zap.Int64("eleID", eleID), zap.ByteString("eleKey", eleKey), + zap.Int64("currPID", sJobCtx.currPhysicalID), zap.Int64s("phyTblIDs", physicalTIDs)) + return phyTblMetas, nil } -func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { +func (dc *ddlCtx) sendPhysicalTableMetas(reorgInfo *reorgInfo, t table.Table, sJobCtx *splitJobContext, runningPTblMetas []*BackfillJobRangeMeta) { + var err error + physicalTIDs := make([]int64, 0, distPhysicalTableConcurrency) + defer func() { + logutil.BgLogger().Info("[ddl] send physical table ranges to split finished", zap.Int64("jobID", reorgInfo.Job.ID), + zap.Stringer("ele", reorgInfo.currElement), zap.Int64s("phyTblIDs", physicalTIDs), zap.Error(err)) + if err != nil { + sJobCtx.cancel() + } else { + close(sJobCtx.phyTblMetaCh) + } + }() + + for _, pTblM := range runningPTblMetas { + err = dc.isReorgRunnable(reorgInfo.Job.ID, false) + if err != nil { + return + } + + if tbl, ok := t.(table.PartitionedTable); ok { + pTblM.PhyTbl = tbl.GetPartition(pTblM.PhyTblID) + sJobCtx.phyTblMetaCh <- pTblM + } else { + //nolint:forcetypeassert + phyTbl := t.(table.PhysicalTable) + pTblM.PhyTbl = phyTbl + sJobCtx.phyTblMetaCh <- pTblM + } + physicalTIDs = append(physicalTIDs, pTblM.PhyTblID) + } + + if tbl, ok := t.(table.PartitionedTable); ok { + currPhysicalID := sJobCtx.currPhysicalID + for { + err = dc.isReorgRunnable(reorgInfo.Job.ID, false) + if err != nil { + return + } + select { + case <-sJobCtx.ctx.Done(): + err = sJobCtx.ctx.Err() + return + default: + } + + pID, startKey, endKey, err1 := getNextPartitionInfo(reorgInfo, tbl, currPhysicalID) + if err1 != nil { + err = err1 + return + } + if pID == 0 { + // Next partition does not exist, all the job done. + return + } + pTbl := tbl.GetPartition(pID) + if pTbl == nil { + err = dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", pID, t.Meta().ID) + return + } + bfJM := &BackfillJobRangeMeta{PhyTblID: pID, PhyTbl: pTbl, StartKey: startKey, EndKey: endKey} + sJobCtx.phyTblMetaCh <- bfJM + currPhysicalID = pID + + physicalTIDs = append(physicalTIDs, pID) + } + } +} + +func (dc *ddlCtx) controlWriteTableRecord(sessPool *sessionPool, t table.Table, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey if startKey == nil && endKey == nil { return nil } ddlJobID := reorgInfo.Job.ID - if err := dc.isReorgRunnable(ddlJobID, true); err != nil { - return errors.Trace(err) - } - currEle := reorgInfo.currElement - defaultSQLMode := sess.GetSessionVars().SQLMode - defer func() { - sess.GetSessionVars().SQLMode = defaultSQLMode - }() - // Make timestamp type can be inserted ZeroTimestamp. - sess.GetSessionVars().SQLMode = mysql.ModeNone - currBackfillJobID := int64(1) - err := checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEle.ID, currEle.TypeKey) + logutil.BgLogger().Info("[ddl] control write table record start", + zap.Int64("jobID", ddlJobID), zap.Stringer("ele", currEle), + zap.Int64("tblID", t.Meta().ID), zap.Int64("currPID", reorgInfo.PhysicalTableID)) + sCtx, err := sessPool.get() if err != nil { return errors.Trace(err) } - maxBfJob, err := GetMaxBackfillJob(sess, ddlJobID, currEle.ID, currEle.TypeKey) - if err != nil { + defer sessPool.put(sCtx) + sess := newSession(sCtx) + + if err := dc.isReorgRunnable(ddlJobID, true); err != nil { return errors.Trace(err) } - if maxBfJob != nil { - startKey = maxBfJob.EndKey - currBackfillJobID = maxBfJob.ID + 1 - } - var isUnique bool if bfWorkerType == typeAddIndexWorker { idxInfo := model.FindIndexInfoByID(t.Meta().Indices, currEle.ID) isUnique = idxInfo.Unique } - err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID) + + wg := tidbutil.WaitGroupWrapper{} + sJobCtx := &splitJobContext{ + bfWorkerType: bfWorkerType, + isUnique: isUnique, + batchSize: genTaskBatch, + minBatchSize: minGenTaskBatch, + phyTblMetaCh: make(chan *BackfillJobRangeMeta, 1), + resultCh: make(chan error, distPhysicalTableConcurrency), + } + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) + sJobCtx.ctx, sJobCtx.cancel = context.WithCancel(ctx) + concurrency := 1 + if tbl, ok := t.(table.PartitionedTable); ok { + ids := len(tbl.GetAllPartitionIDs()) + if ids > 1 { + sJobCtx.isMultiPhyTbl = true + concurrency = ids + } + if ids > distPhysicalTableConcurrency { + concurrency = distPhysicalTableConcurrency + } + sJobCtx.batchSize = genPhysicalTableTaskBatch + sJobCtx.minBatchSize = minGenPhysicalTableTaskBatch + } + + err = checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEle.ID, currEle.TypeKey) if err != nil { return errors.Trace(err) } + phyTblMetas, err := getRunningPhysicalTableMetas(sess, sJobCtx, reorgInfo) + if err != nil { + return err + } + + sCtxs := make([]sessionctx.Context, 0, concurrency) + for i := 0; i < concurrency; i++ { + sCtx, err := sessPool.get() + if err != nil { + return err + } + sCtxs = append(sCtxs, sCtx) + } + + wg.Run(func() { + defer tidbutil.Recover(metrics.LabelDistReorg, "sendPhysicalTableMeta", nil, false) + dc.sendPhysicalTableMetas(reorgInfo, t, sJobCtx, phyTblMetas) + }) + for _, sCtx := range sCtxs { + func(ctx sessionctx.Context) { + wg.Run(func() { + defer func() { + tidbutil.Recover(metrics.LabelDistReorg, "splitTableToBackfillJobs", nil, false) + }() + se := newSession(ctx) + dc.splitPhysicalTableToBackfillJobs(se, reorgInfo, sJobCtx) + }) + }(sCtx) + } + wg.Wait() + for _, sCtx := range sCtxs { + sessPool.put(sCtx) + } return checkReorgJobFinished(dc.ctx, sess, &dc.reorgCtx, ddlJobID, currEle) } +func (dc *ddlCtx) splitPhysicalTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext) { + defaultSQLMode := sess.GetSessionVars().SQLMode + defer func() { sess.GetSessionVars().SQLMode = defaultSQLMode }() + // Make timestamp type can be inserted ZeroTimestamp. + sess.GetSessionVars().SQLMode = mysql.ModeNone + + var err error + var pTblMetaCnt int + var pTblMeta *BackfillJobRangeMeta + defer func() { + if err != nil { + sJobCtx.cancel() + } + logutil.BgLogger().Info("[ddl] split backfill jobs to table finish", zap.Int64("jobID", reorgInfo.Job.ID), + zap.Stringer("ele", reorgInfo.currElement), zap.Int("donePTbls", pTblMetaCnt), zap.Stringer("physical_tbl", pTblMeta), zap.Error(err)) + }() + + var ok bool + for { + select { + case <-sJobCtx.ctx.Done(): + err = sJobCtx.ctx.Err() + case pTblMeta, ok = <-sJobCtx.phyTblMetaCh: + if !ok { + return + } + if err = dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil { + return + } + + err = dc.splitTableToBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta) + if err != nil { + return + } + pTblMetaCnt++ + } + } +} + func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgContexts, ddlJobID int64, currEle *meta.Element) error { var times int64 var bfJob *BackfillJob @@ -117,29 +333,29 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC if !backfillJobFinished { err := checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEle.ID, currEle.TypeKey) if err != nil { - logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", ddlJobID), zap.Error(err)) + logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err)) return errors.Trace(err) } - bfJob, err = getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEle.ID, currEle.TypeKey, false) + bfJobs, err := getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEle.ID, currEle.TypeKey) if err != nil { - logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Error(err)) + logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err)) return errors.Trace(err) } - if bfJob == nil { + if len(bfJobs) == 0 { backfillJobFinished = true - logutil.BgLogger().Info("[ddl] finish all backfill jobs", zap.Int64("job ID", ddlJobID)) + logutil.BgLogger().Info("[ddl] finish all backfill jobs", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle)) } } if backfillJobFinished { // TODO: Consider whether these backfill jobs are always out of sync. isSynced, err := checkJobIsFinished(sess, ddlJobID) if err != nil { - logutil.BgLogger().Warn("[ddl] checkJobIsFinished failed", zap.Int64("job ID", ddlJobID), zap.Error(err)) + logutil.BgLogger().Warn("[ddl] checkJobIsFinished failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err)) return errors.Trace(err) } if isSynced { - logutil.BgLogger().Info("[ddl] finish all backfill jobs and put them to history", zap.Int64("job ID", ddlJobID)) + logutil.BgLogger().Info("[ddl] finish all backfill jobs and put them to history", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle)) return GetBackfillErr(sess, ddlJobID, currEle.ID, currEle.TypeKey) } } @@ -217,14 +433,15 @@ func checkAndHandleInterruptedBackfillJobs(sess *session, ddlJobID, currEleID in return errors.Trace(err) } -func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) { +func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte, pTblID int64) (backfillJobCnt int, err error) { err = checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEleID, currEleKey) if err != nil { return 0, errors.Trace(err) } - backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", - ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_count") + backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, + fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and ddl_physical_tid = %d", + ddlJobID, currEleID, wrapKey2String(currEleKey), pTblID), "check_backfill_job_count") if err != nil { return 0, errors.Trace(err) } @@ -232,50 +449,46 @@ func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey return backfillJobCnt, nil } -func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) { +func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleID int64, currEleKey []byte) ([]*BackfillJob, error) { var err error var bJobs []*BackfillJob - descStr := "" - if isDesc { - descStr = "order by id desc" - } for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s %s limit 1", - ddlJobID, currEleID, wrapKey2String(currEleKey), descStr), "check_backfill_job_state") + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s limit 1", + ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_state") if err != nil { logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) + time.Sleep(RetrySQLInterval) continue } - if len(bJobs) != 0 { - return bJobs[0], nil - } - break + return bJobs, nil } return nil, errors.Trace(err) } -// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable. -func GetMaxBackfillJob(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) { - bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEleID, currEleKey, true) +// GetPhysicalTableMetas gets the max backfill metas per physical table in BackfillTable and BackfillHistoryTable. +func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (map[int64]*BackfillJobRangeMeta, error) { + condition := fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", ddlJobID, currEleID, wrapKey2String(currEleKey)) + pTblMs, err := GetBackfillIDAndMetas(sess, BackfillTable, condition, "get_ptbl_metas") if err != nil { return nil, errors.Trace(err) } - hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, ddlJobID, currEleID, currEleKey, true) + hPTblMs, err := GetBackfillIDAndMetas(sess, BackfillHistoryTable, condition, "get_ptbl_metas") if err != nil { return nil, errors.Trace(err) } - if bfJob == nil { - return hJob, nil + metaMap := make(map[int64]*BackfillJobRangeMeta, len(pTblMs)+len(hPTblMs)) + for _, m := range pTblMs { + metaMap[m.PhyTblID] = m } - if hJob == nil { - return bfJob, nil - } - if bfJob.ID > hJob.ID { - return bfJob, nil + for _, m := range hPTblMs { + val, ok := metaMap[m.PhyTblID] + if !ok || (ok && m.ID > val.ID) { + metaMap[m.PhyTblID] = m + } } - return hJob, nil + return metaMap, nil } // MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table. diff --git a/ddl/export_test.go b/ddl/export_test.go index 3ea26fb04290c..d83549609c890 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" ) @@ -28,13 +29,14 @@ func SetBatchInsertDeleteRangeSize(i int) { var NewCopContext4Test = newCopContext -func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, store kv.Storage, +func FetchRowsFromCop4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endKey kv.Key, store kv.Storage, batchSize int) ([]*indexRecord, bool, error) { variable.SetDDLReorgBatchSize(int32(batchSize)) task := &reorgBackfillTask{ - id: 1, - startKey: startKey, - endKey: endKey, + id: 1, + startKey: startKey, + endKey: endKey, + physicalTable: tbl, } pool := newCopReqSenderPool(context.Background(), copCtx, store) pool.adjustSize(1) diff --git a/ddl/index.go b/ddl/index.go index 312d7fe2cf305..4785b12e41090 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -657,9 +657,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SchemaState = model.StateWriteReorganization if job.MultiSchemaInfo == nil { - if err := initDistReorg(job.ReorgMeta, d.store, schemaID, tblInfo); err != nil { - return ver, errors.Trace(err) - } + initDistReorg(job.ReorgMeta) } case model.StateWriteReorganization: // reorganization -> public @@ -1802,6 +1800,11 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgIn // addTableIndex handles the add index reorganization state for a table. func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { + // TODO: Support typeAddIndexMergeTmpWorker. + if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx { + return w.controlWriteTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo) + } + var err error if tbl, ok := t.(table.PartitionedTable); ok { var finish bool @@ -1814,7 +1817,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { if err != nil { break } - finish, err = w.updateReorgInfo(tbl, reorgInfo) + finish, err = updateReorgInfo(w.sessPool, tbl, reorgInfo) if err != nil { return errors.Trace(err) } @@ -1822,38 +1825,26 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { } else { //nolint:forcetypeassert phyTbl := t.(table.PhysicalTable) - // TODO: Support typeAddIndexMergeTmpWorker. - if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx { - sCtx, err := w.sessPool.get() - if err != nil { - return errors.Trace(err) - } - defer w.sessPool.put(sCtx) - return w.controlWritePhysicalTableRecord(newSession(sCtx), phyTbl, typeAddIndexWorker, reorgInfo) - } err = w.addPhysicalTableIndex(phyTbl, reorgInfo) } return errors.Trace(err) } -// updateReorgInfo will find the next partition according to current reorgInfo. -// If no more partitions, or table t is not a partitioned table, returns true to -// indicate that the reorganize work is finished. -func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bool, error) { +func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysicalTableID int64) (int64, kv.Key, kv.Key, error) { pi := t.Meta().GetPartitionInfo() if pi == nil { - return true, nil + return 0, nil, nil, nil } - pid, err := findNextPartitionID(reorg.PhysicalTableID, pi.Definitions) + pid, err := findNextPartitionID(currPhysicalTableID, pi.Definitions) if err != nil { // Fatal error, should not run here. logutil.BgLogger().Error("[ddl] find next partition ID failed", zap.Reflect("table", t), zap.Error(err)) - return false, errors.Trace(err) + return 0, nil, nil, errors.Trace(err) } if pid == 0 { // Next partition does not exist, all the job done. - return true, nil + return 0, nil, nil, nil } failpoint.Inject("mockUpdateCachedSafePoint", func(val failpoint.Value) { @@ -1866,24 +1857,40 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo time.Sleep(time.Second * 3) } }) + + var startKey, endKey kv.Key if reorg.mergingTmpIdx { indexID := reorg.currElement.ID - reorg.StartKey, reorg.EndKey = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|indexID) + startKey, endKey = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|indexID) } else { currentVer, err := getValidCurrentVersion(reorg.d.store) if err != nil { - return false, errors.Trace(err) + return 0, nil, nil, errors.Trace(err) } - start, end, err := getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + startKey, endKey, err = getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { - return false, errors.Trace(err) + return 0, nil, nil, errors.Trace(err) } - reorg.StartKey, reorg.EndKey = start, end } - reorg.PhysicalTableID = pid + return pid, startKey, endKey, nil +} + +// updateReorgInfo will find the next partition according to current reorgInfo. +// If no more partitions, or table t is not a partitioned table, returns true to +// indicate that the reorganize work is finished. +func updateReorgInfo(sessPool *sessionPool, t table.PartitionedTable, reorg *reorgInfo) (bool, error) { + pid, startKey, endKey, err := getNextPartitionInfo(reorg, t, reorg.PhysicalTableID) + if err != nil { + return false, errors.Trace(err) + } + if pid == 0 { + // Next partition does not exist, all the job done. + return true, nil + } + reorg.PhysicalTableID, reorg.StartKey, reorg.EndKey = pid, startKey, endKey // Write the reorg info to store so the whole reorganize process can recover from panic. - err = reorg.UpdateReorgMeta(reorg.StartKey, w.sessPool) + err = reorg.UpdateReorgMeta(reorg.StartKey, sessPool) logutil.BgLogger().Info("[ddl] job update reorgInfo", zap.Int64("jobID", reorg.Job.ID), zap.Stringer("element", reorg.currElement), @@ -2077,14 +2084,14 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r return false, errors.Trace(err) } -func runBackfillJobsWithLightning(d *ddl, bfJob *BackfillJob, jobCtx *JobContext) error { +func runBackfillJobsWithLightning(d *ddl, sess *session, bfJob *BackfillJob, jobCtx *JobContext) error { bc, err := ingest.LitBackCtxMgr.Register(d.ctx, bfJob.Meta.IsUnique, bfJob.JobID, bfJob.Meta.SQLMode) if err != nil { logutil.BgLogger().Warn("[ddl] lightning register error", zap.Error(err)) return err } - tbl, err := runBackfillJobs(d, bc, bfJob, jobCtx) + tbl, err := runBackfillJobs(d, sess, bc, bfJob, jobCtx) if err != nil { logutil.BgLogger().Warn("[ddl] runBackfillJobs error", zap.Error(err)) ingest.LitBackCtxMgr.Unregister(bfJob.JobID) diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go index 38bced0b6678d..5edc1680b2308 100644 --- a/ddl/index_cop_test.go +++ b/ddl/index_cop_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" "github.com/stretchr/testify/require" @@ -43,7 +44,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { endKey := startKey.PrefixNext() txn, err := store.Begin() require.NoError(t, err) - idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, store, 10) + idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10) require.NoError(t, err) require.False(t, done) require.NoError(t, txn.Rollback()) diff --git a/ddl/job_table.go b/ddl/job_table.go index d6927be673336..f36e5dad2e601 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -373,11 +373,11 @@ func (d *ddl) loadBackfillJobAndRun() { if err != nil { logutil.BgLogger().Fatal("dispatch backfill jobs loop get session failed, it should not happen, please try restart TiDB", zap.Error(err)) } - defer d.sessPool.put(se) sess := newSession(se) runningJobIDs := d.backfillCtxJobIDs() if len(runningJobIDs) >= reorgWorkerCnt { + d.sessPool.put(se) return } @@ -390,20 +390,23 @@ func (d *ddl) loadBackfillJobAndRun() { } else { logutil.BgLogger().Debug("[ddl] get no backfill job in this instance") } + d.sessPool.put(se) return } jobCtx, existent := d.setBackfillCtxJobContext(bfJob.JobID, bfJob.Meta.Query, bfJob.Meta.Type) if existent { logutil.BgLogger().Warn("[ddl] get the type of backfill job is running in this instance", zap.String("backfill job", bfJob.AbbrStr())) + d.sessPool.put(se) return } // TODO: Adjust how the non-owner uses ReorgCtx. d.setReorgCtxForBackfill(bfJob) d.wg.Run(func() { defer func() { + tidbutil.Recover(metrics.LabelDistReorg, "runBackfillJobs", nil, false) d.removeBackfillCtxJobCtx(bfJob.JobID) - tidbutil.Recover(metrics.LabelBackfillWorker, "runBackfillJobs", nil, false) + d.sessPool.put(se) }() if bfJob.Meta.ReorgTp == model.ReorgTypeLitMerge { @@ -413,10 +416,10 @@ func (d *ddl) loadBackfillJobAndRun() { return } logutil.BgLogger().Info("[ddl] run backfill jobs with ingest in this instance", zap.String("bfJob", bfJob.AbbrStr())) - err = runBackfillJobsWithLightning(d, bfJob, jobCtx) + err = runBackfillJobsWithLightning(d, sess, bfJob, jobCtx) } else { logutil.BgLogger().Info("[ddl] run backfill jobs with txn-merge in this instance", zap.String("bfJob", bfJob.AbbrStr())) - _, err = runBackfillJobs(d, nil, bfJob, jobCtx) + _, err = runBackfillJobs(d, sess, nil, bfJob, jobCtx) } if err == nil { @@ -639,8 +642,7 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) sqlBuilder := strings.Builder{} sqlBuilder.WriteString("insert into mysql.") sqlBuilder.WriteString(tableName) - sqlBuilder.WriteString("(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values") - jobs := "" + sqlBuilder.WriteString("(id, ddl_job_id, ele_id, ele_key, ddl_physical_tid, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values") for i, bj := range backfillJobs { mateByte, err := bj.Meta.Encode() if err != nil { @@ -650,10 +652,9 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) if i != 0 { sqlBuilder.WriteString(", ") } - sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)", - bj.ID, bj.JobID, bj.EleID, wrapKey2String(bj.EleKey), bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey), + sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)", bj.ID, bj.JobID, bj.EleID, + wrapKey2String(bj.EleKey), bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey), wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte))) - jobs += fmt.Sprintf("job:%#v; ", bj.AbbrStr()) } return sqlBuilder.String(), nil } @@ -730,7 +731,7 @@ func GetBackfillJobForOneEle(s *session, excludedJobIDs []int64, lease time.Dura // GetAndMarkBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element, // and update these jobs with instance ID and lease. -func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid string, lease time.Duration) ([]*BackfillJob, error) { +func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid string, pTblID int64, lease time.Duration) ([]*BackfillJob, error) { var validLen int var bJobs []*BackfillJob err := s.runInTxn(func(se *session) error { @@ -740,9 +741,28 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st } leaseStr := currTime.Add(-lease).Format(types.TimeFormat) - bJobs, err = GetBackfillJobs(se, BackfillTable, - fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d", - leaseStr, jobID, batch), "get_mark_backfill_job") + getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_id, ele_key limit %d", + leaseStr, jobID, batch) + if pTblID != getJobWithoutPartition { + if pTblID == 0 { + rows, err := s.execute(context.Background(), + fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having max(length(exec_id)) = 0 or max(exec_lease) < '%s' order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1", + BackfillTable, leaseStr), "get_mark_backfill_job") + if err != nil { + return errors.Trace(err) + } + + if len(rows) == 0 { + return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job") + } + + pTblID = rows[0].GetInt64(0) + } + getJobsSQL = fmt.Sprintf("(exec_ID = '' or exec_lease < '%s') and ddl_job_id = %d and ddl_physical_tid = %d order by ddl_job_id, ele_key, ele_id limit %d", + leaseStr, jobID, pTblID, batch) + } + + bJobs, err = GetBackfillJobs(se, BackfillTable, getJobsSQL, "get_mark_backfill_job") if err != nil { return err } @@ -820,6 +840,33 @@ func GetBackfillMetas(sess *session, tblName, condition string, label string) ([ return metas, nil } +// GetBackfillIDAndMetas gets the backfill IDs and metas in the tblName table according to condition. +func GetBackfillIDAndMetas(sess *session, tblName, condition string, label string) ([]*BackfillJobRangeMeta, error) { + sql := "select tbl.id, tbl.curr_key, tbl.end_key, tbl.ddl_physical_tid from (select max(id) max_id, ddl_physical_tid " + + fmt.Sprintf(" from mysql.%s tbl where %s group by ddl_physical_tid) tmp join mysql.%s tbl", + tblName, condition, tblName) + " on tbl.id=tmp.max_id and tbl.ddl_physical_tid=tmp.ddl_physical_tid;" + rows, err := sess.execute(context.Background(), sql, label) + if err != nil { + return nil, errors.Trace(err) + } + if len(rows) == 0 { + return nil, nil + } + + pTblMetas := make([]*BackfillJobRangeMeta, 0, len(rows)) + for _, r := range rows { + pTblMeta := BackfillJobRangeMeta{ + ID: r.GetInt64(0), + StartKey: r.GetBytes(1), + EndKey: r.GetBytes(2), + PhyTblID: r.GetInt64(3), + } + pTblMetas = append(pTblMetas, &pTblMeta) + } + + return pTblMetas, nil +} + func getUnsyncedInstanceIDs(sess *session, jobID int64, label string) ([]string, error) { sql := fmt.Sprintf("select sum((state=%d) + (state=%d)) as tmp, exec_id from mysql.tidb_ddl_backfill_history where ddl_job_id = %d group by exec_id having tmp = 0;", model.JobStateSynced, model.JobStateCancelled, jobID) @@ -844,21 +891,21 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([] bJobs := make([]*BackfillJob, 0, len(rows)) for _, row := range rows { bfJob := BackfillJob{ - ID: row.GetInt64(0), - JobID: row.GetInt64(1), - EleID: row.GetInt64(2), - EleKey: row.GetBytes(3), - StoreID: row.GetInt64(4), - Tp: backfillerType(row.GetInt64(5)), - InstanceID: row.GetString(6), - InstanceLease: row.GetTime(7), - State: model.JobState(row.GetInt64(8)), - CurrKey: row.GetBytes(9), - StartKey: row.GetBytes(10), - EndKey: row.GetBytes(11), - StartTS: row.GetUint64(12), - FinishTS: row.GetUint64(13), - RowCount: row.GetInt64(14), + ID: row.GetInt64(0), + JobID: row.GetInt64(1), + EleID: row.GetInt64(2), + EleKey: row.GetBytes(3), + PhysicalTableID: row.GetInt64(4), + Tp: backfillerType(row.GetInt64(5)), + InstanceID: row.GetString(6), + InstanceLease: row.GetTime(7), + State: model.JobState(row.GetInt64(8)), + CurrKey: row.GetBytes(9), + StartKey: row.GetBytes(10), + EndKey: row.GetBytes(11), + StartTS: row.GetUint64(12), + FinishTS: row.GetUint64(13), + RowCount: row.GetInt64(14), } bfJob.Meta = &model.BackfillMeta{} err = bfJob.Meta.Decode(row.GetBytes(15)) diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index 38515f71f07d2..ae04c0eb6dd85 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -200,16 +200,17 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query }, } bj := &ddl.BackfillJob{ - ID: int64(i), - JobID: jobID, - EleID: eleID, - EleKey: meta.IndexElementKey, - State: model.JobStateNone, - InstanceLease: types.ZeroTimestamp, - CurrKey: sKey, - StartKey: sKey, - EndKey: eKey, - Meta: bm, + ID: int64(i), + JobID: jobID, + EleID: eleID, + EleKey: meta.IndexElementKey, + State: model.JobStateNone, + PhysicalTableID: 1, + InstanceLease: types.ZeroTimestamp, + CurrKey: sKey, + StartKey: sKey, + EndKey: eKey, + Meta: bm, } bJobs = append(bJobs, bj) } @@ -221,7 +222,7 @@ func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time) require.Equal(t, a.JobID, b.JobID) require.Equal(t, a.EleID, b.EleID) require.Equal(t, a.EleKey, b.EleKey) - require.Equal(t, a.StoreID, b.StoreID) + require.Equal(t, a.PhysicalTableID, b.PhysicalTableID) require.Equal(t, a.InstanceID, b.InstanceID) require.GreaterOrEqual(t, b.InstanceLease.Compare(lessTime), 0) require.Equal(t, a.State, b.State) @@ -243,6 +244,18 @@ func readInTxn(se sessionctx.Context, f func(sessionctx.Context)) (err error) { return nil } +func backfillJob2PTblMetaMap(bJob *ddl.BackfillJob) map[int64]*ddl.BackfillJobRangeMeta { + m := &ddl.BackfillJobRangeMeta{ + ID: bJob.ID, + PhyTblID: bJob.PhysicalTableID, + StartKey: bJob.StartKey, + EndKey: bJob.EndKey, + } + mMap := make(map[int64]*ddl.BackfillJobRangeMeta) + mMap[m.PhyTblID] = m + return mMap +} + func TestSimpleExecBackfillJobs(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -255,6 +268,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { eleID1 := int64(11) eleID2 := int64(22) eleID3 := int64(33) + noPID := int64(0) uuid := d.GetID() eleKey := meta.IndexElementKey instanceLease := ddl.InstanceLease @@ -263,7 +277,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJob, err := ddl.GetBackfillJobForOneEle(se, []int64{jobID1, jobID2}, instanceLease) require.NoError(t, err) require.Nil(t, bJob) - bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease) + bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, noPID, instanceLease) require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error()) require.Nil(t, bJobs) allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") @@ -282,14 +296,14 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_lease' at row 1") tk.Session().GetSessionVars().SQLMode = mysql.ModeNone err = ddl.AddBackfillJobs(se, bjTestCases) - // ID jobID eleID InstanceID - // ------------------------------------- - // 0 jobID1 eleID1 uuid - // 1 jobID1 eleID1 "" - // 0 jobID2 eleID2 "" - // 1 jobID2 eleID2 "" - // 0 jobID2 eleID3 "" - // 1 jobID2 eleID3 "" + // ID jobID eleID InstanceID PhysicalTableID + // -------------------------------------------------- + // 0 jobID1 eleID1 uuid 1 + // 1 jobID1 eleID1 "" 1 + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 require.NoError(t, err) // test get some backfill jobs bJob, err = ddl.GetBackfillJobForOneEle(se, []int64{jobID2 - 1, jobID2 + 1}, instanceLease) @@ -306,7 +320,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.NoError(t, err) }) - bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, instanceLease) + bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, noPID, instanceLease) require.NoError(t, err) require.Len(t, bJobs, 1) expectJob = bjTestCases[2] @@ -325,7 +339,160 @@ func TestSimpleExecBackfillJobs(t *testing.T) { allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, cnt) + // test physical table + err = ddl.RemoveBackfillJob(se, true, bJobs1[0]) + require.NoError(t, err) + // ID jobID eleID InstanceID PhysicalTableID + // -------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + bPhyJobs := makeAddIdxBackfillJobs(1, 2, jobID1, eleID1, 10, "alter table t add index idx(a)") + bPhyJobs[1].InstanceID = "uuid_1" + bPhyJobs[2].PhysicalTableID = 2 + bPhyJobs[6].PhysicalTableID = 2 + bPhyJobs[4].PhysicalTableID = 3 + bPhyJobs[5].PhysicalTableID = 3 + bPhyJobs[8].PhysicalTableID = 3 + bPhyJobs[7].PhysicalTableID = 4 + bPhyJobs[9].PhysicalTableID = 4 + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + // 0 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" 1 + // 4 jobID1 eleID1 "" 3 + // 5 jobID1 eleID1 "" 3 + // 6 jobID1 eleID1 "" 2 + // 7 jobID1 eleID1 "" 4 + // 8 jobID1 eleID1 "" 3 + // 9 jobID1 eleID1 "" 4 + simpleCheck := func(batch, jobCnt int, bfJobIDs []int64, pID int64) { + err = ddl.AddBackfillJobs(se, bPhyJobs) + require.NoError(t, err) + bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, batch, jobID1, uuid, pID, instanceLease) + require.NoError(t, err) + require.Len(t, bJobs, jobCnt) + isExist := false + for _, id := range bfJobIDs { + if id == bJobs[0].ID { + isExist = true + } + } + require.True(t, isExist, fmt.Sprintf("expected ids:%v, actual id:%d", bfJobIDs, bJobs[0].ID)) + err = ddl.RemoveBackfillJob(se, true, bJobs1[0]) + require.NoError(t, err) + } + type cntAndID struct { + batch int + bfJobCnt int + bfJobID []int64 + } + checkAndClean := func(expectRet1, expectRet2 cntAndID) { + simpleCheck(expectRet1.batch, expectRet1.bfJobCnt, expectRet1.bfJobID, noPID) + simpleCheck(expectRet2.batch, expectRet2.bfJobCnt, expectRet2.bfJobID, ddl.GetJobWithoutPartition) + } + checkAndClean(cntAndID{3, 3, []int64{0, 1, 3}}, + cntAndID{3, 3, []int64{0, 1, 3}}) + bPhyJobs[1].InstanceLease = types.NewTime(types.FromGoTime(time.Now().Add(-time.Hour).UTC()), 0, 0) + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + // 0 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" 1 + // 4 jobID1 eleID1 "" 3 + // 5 jobID1 eleID1 "" 3 + // 6 jobID1 eleID1 "" 2 + // 7 jobID1 eleID1 "" 4 + // 8 jobID1 eleID1 "" 3 + // 9 jobID1 eleID1 "" 4 + checkAndClean(cntAndID{3, 3, []int64{0, 1, 3}}, + cntAndID{3, 3, []int64{0, 1, 3}}) + bPhyJobs[3].InstanceLease = types.NewTime(types.FromGoTime(time.Now().UTC()), 0, 0) + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + // 0 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" currentTime 1 // should not exist + // 4 jobID1 eleID1 "" 3 + // 5 jobID1 eleID1 "" 3 + // 6 jobID1 eleID1 "" 2 + // 7 jobID1 eleID1 "" 4 + // 8 jobID1 eleID1 "" 3 + // 9 jobID1 eleID1 "" 4 + checkAndClean(cntAndID{3, 2, []int64{2, 6}}, + cntAndID{3, 3, []int64{0, 1, 3}}) + bPhyJobs[6].InstanceLease = types.NewTime(types.FromGoTime(time.Now().UTC()), 0, 0) + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + // 0 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" currentTime 1 // should not exist + // 4 jobID1 eleID1 "" 3 + // 5 jobID1 eleID1 "" 3 + // 6 jobID1 eleID1 "" currentTime 2 // should not exist + // 7 jobID1 eleID1 "" 4 + // 8 jobID1 eleID1 "" 3 + // 9 jobID1 eleID1 "" 4 + checkAndClean(cntAndID{3, 2, []int64{2, 6}}, + cntAndID{10, 10, []int64{0, 1, 3}}) + bPhyJobs[6].InstanceID = "uuid_2" + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + // 0 jobID1 eleID1 "" 1 + // 1 jobID1 eleID1 "uuid_1" currentTime-hour 1 + // 2 jobID1 eleID1 "" 2 + // 3 jobID1 eleID1 "" currentTime 1 // should not exist + // 4 jobID1 eleID1 "" 3 + // 5 jobID1 eleID1 "" 3 + // 6 jobID1 eleID1 "uuid_2" currentTime 2 // should not exist + // 7 jobID1 eleID1 "" 4 + // 8 jobID1 eleID1 "" 3 + // 9 jobID1 eleID1 "" 4 + checkAndClean(cntAndID{3, 3, []int64{4, 5, 8}}, + cntAndID{10, 9, []int64{0, 1, 3}}) + // ID jobID eleID InstanceID InstanceLease PhysicalTableID + // ----------------------------------------------------------------------- + // 0 jobID2 eleID2 "" 1 + // 1 jobID2 eleID2 "" 1 + // 0 jobID2 eleID3 "" 1 + // 1 jobID2 eleID3 "" 1 + err = ddl.AddBackfillJobs(se, bJobs1) + require.NoError(t, err) + // ID jobID eleID + // ------------------------ + // 0 jobID1 eleID1 + // 1 jobID1 eleID1 + // 0 jobID2 eleID2 + // 1 jobID2 eleID2 + // 0 jobID2 eleID3 + // 1 jobID2 eleID3 // remove a backfill job err = ddl.RemoveBackfillJob(se, false, bJobs1[0]) // ID jobID eleID @@ -374,16 +541,16 @@ func TestSimpleExecBackfillJobs(t *testing.T) { currTime, err = ddl.GetOracleTimeWithStartTS(se) require.NoError(t, err) }) - condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID2) + condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d", currTime.Add(-instanceLease), jobID2) bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj") require.NoError(t, err) require.Len(t, bJobs, 1) require.Equal(t, bJobs[0].FinishTS, uint64(0)) // test GetMaxBackfillJob - bjob, err := ddl.GetMaxBackfillJob(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey) + pTblMeta, err := ddl.GetPhysicalTableMetas(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey) require.NoError(t, err) - require.Nil(t, bjob) + require.Len(t, pTblMeta, 0) err = ddl.AddBackfillJobs(se, bjTestCases) require.NoError(t, err) // ID jobID eleID @@ -394,9 +561,9 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 1 jobID2 eleID2 // 0 jobID2 eleID3 // 1 jobID2 eleID3 - bjob, err = ddl.GetMaxBackfillJob(se, jobID2, eleID2, eleKey) + pTblMeta, err = ddl.GetPhysicalTableMetas(se, jobID2, eleID2, eleKey) require.NoError(t, err) - require.Equal(t, bJobs2[1], bjob) + require.Equal(t, backfillJob2PTblMetaMap(bJobs2[1]), pTblMeta) bJobs1[0].State = model.JobStateRollingback bJobs1[0].ID = 2 bJobs1[0].InstanceID = uuid @@ -415,9 +582,9 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 1 jobID2 eleID3 JobStateNone // 2 jobID1 eleID1 JobStateRollingback // 3 jobID1 eleID1 JobStateCancelled - bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey) + pTblMeta, err = ddl.GetPhysicalTableMetas(se, jobID1, eleID1, eleKey) require.NoError(t, err) - require.Equal(t, bJobs1[1], bjob) + require.Equal(t, backfillJob2PTblMetaMap(bJobs1[1]), pTblMeta) // test the BackfillJob's AbbrStr require.Equal(t, fmt.Sprintf("ID:2, JobID:1, EleID:11, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr()) require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelled, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) @@ -456,9 +623,9 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // -------------------------------- // 5 jobID1 eleID1 JobStateNone // 4 jobID1 eleID1 JobStateNone - bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey) + pTblMeta, err = ddl.GetPhysicalTableMetas(se, jobID1, eleID1, eleKey) require.NoError(t, err) - require.Equal(t, bJobs1[0], bjob) + require.Equal(t, backfillJob2PTblMetaMap(bJobs1[0]), pTblMeta) bJobs1[0].ID = 6 bJobs1[1].ID = 7 err = ddl.AddBackfillJobs(se, bJobs1) @@ -481,9 +648,9 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // -------------------------------- // 5 jobID1 eleID1 JobStateNone // 4 jobID1 eleID1 JobStateNone - bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey) + pTblMeta, err = ddl.GetPhysicalTableMetas(se, jobID1, eleID1, eleKey) require.NoError(t, err) - require.Equal(t, bJobs1[1], bjob) + require.Equal(t, backfillJob2PTblMetaMap(bJobs1[1]), pTblMeta) // test MoveBackfillJobsToHistoryTable and GetInterruptedBackfillJobForOneEle allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") @@ -593,7 +760,7 @@ func TestGetTasks(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(1)`)) ch <- struct{}{} var bJobs []*ddl.BackfillJob - bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease) + bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, 1, instanceLease) require.Len(t, bJobs, 1) }) <-ch @@ -604,7 +771,7 @@ func TestGetTasks(t *testing.T) { se1 := ddl.NewSession(tk1.Session()) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(2)`)) var bJobs1 []*ddl.BackfillJob - bJobs1, err1 = ddl.GetAndMarkBackfillJobsForOneEle(se1, 1, jobID1, uuid, instanceLease) + bJobs1, err1 = ddl.GetAndMarkBackfillJobsForOneEle(se1, 1, jobID1, uuid, 1, instanceLease) require.Len(t, bJobs1, 1) }) wg.Wait() @@ -616,6 +783,10 @@ func TestGetTasks(t *testing.T) { require.True(t, strings.Contains(err.Error(), "[kv:9007]Write conflict")) } + err = ddl.RemoveBackfillJob(se, true, bJobsTestCases[0]) + require.NoError(t, err) + err = ddl.AddBackfillJobs(se, bJobsTestCases) + require.NoError(t, err) // get tbl tk.MustExec("create table t(a int, b int)") var tableID int64 @@ -624,6 +795,7 @@ func TestGetTasks(t *testing.T) { require.Nil(t, err) tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) + pID := int64(0) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(0)`)) // Mock GetAndMarkBackfillJobsForOneEle gets a writing conflict error, but getTasks is successful. // Step 1: se1 begins txn1. @@ -634,7 +806,7 @@ func TestGetTasks(t *testing.T) { wg.Run(func() { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(1)`)) ch <- struct{}{} - bJobs, err := ddl.GetTasks(ddl.GetDDLCtx(d), se, tbl, jobID1, 1) + bJobs, err := ddl.GetTasks(ddl.GetDDLCtx(d), se, tbl, jobID1, &pID, 1) require.Nil(t, err) require.Len(t, bJobs, 1) }) @@ -644,7 +816,7 @@ func TestGetTasks(t *testing.T) { tk1.MustExec("use test") se1 := ddl.NewSession(tk1.Session()) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(2)`)) - bJobs1, err1 := ddl.GetTasks(ddl.GetDDLCtx(d), se1, tbl, jobID1, 1) + bJobs1, err1 := ddl.GetTasks(ddl.GetDDLCtx(d), se1, tbl, jobID1, &pID, 1) require.Nil(t, err1) require.Len(t, bJobs1, 1) }) diff --git a/metrics/metrics.go b/metrics/metrics.go index 62b6973816424..f63af624a3a0c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -46,15 +46,15 @@ var ( // metrics labels. const ( - LabelSession = "session" - LabelDomain = "domain" - LabelDDLOwner = "ddl-owner" - LabelDDL = "ddl" - LabelDDLWorker = "ddl-worker" - LabelBackfillWorker = "backfill-worker" - LabelDDLSyncer = "ddl-syncer" - LabelGCWorker = "gcworker" - LabelAnalyze = "analyze" + LabelSession = "session" + LabelDomain = "domain" + LabelDDLOwner = "ddl-owner" + LabelDDL = "ddl" + LabelDDLWorker = "ddl-worker" + LabelDistReorg = "dist-reorg" + LabelDDLSyncer = "ddl-syncer" + LabelGCWorker = "gcworker" + LabelAnalyze = "analyze" LabelBatchRecvLoop = "batch-recv-loop" LabelBatchSendLoop = "batch-send-loop" diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 574e8d932193c..3eec005372384 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -437,10 +437,9 @@ type JobMeta struct { // BackfillMeta is meta info of the backfill job. type BackfillMeta struct { - PhysicalTableID int64 `json:"physical_table_id"` - IsUnique bool `json:"is_unique"` - EndInclude bool `json:"end_include"` - Error *terror.Error `json:"err"` + IsUnique bool `json:"is_unique"` + EndInclude bool `json:"end_include"` + Error *terror.Error `json:"err"` SQLMode mysql.SQLMode `json:"sql_mode"` Warnings map[errors.ErrorID]*terror.Error `json:"warnings"` diff --git a/sessionctx/BUILD.bazel b/sessionctx/BUILD.bazel index 800001fd426b3..f1987e5ddb063 100644 --- a/sessionctx/BUILD.bazel +++ b/sessionctx/BUILD.bazel @@ -26,7 +26,7 @@ go_library( go_test( name = "sessionctx_test", - timeout = "short", + timeout = "moderate", srcs = [ "context_test.go", "main_test.go",