metrics: add col/idx name(s) for BackfillProgressGauge and BackfillTotalCounter (#58380)

close #58114
CbcWestwolf authored Dec 23, 2024
1 parent ef7ade7 commit 042a332
Showing 10 changed files with 137 additions and 56 deletions.
59 changes: 45 additions & 14 deletions pkg/ddl/backfilling.go
@@ -20,6 +20,7 @@ import (
"encoding/hex"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -194,23 +195,49 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
id = int(backfillContextID.Add(1))
}

colOrIdxName := ""
switch rInfo.Job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
args, err := model.GetModifyIndexArgs(rInfo.Job)
if err != nil {
logutil.DDLLogger().Error("Fail to get ModifyIndexArgs", zap.String("label", label), zap.String("schemaName", schemaName), zap.String("tableName", tbl.Meta().Name.String()))
} else {
colOrIdxName = getIdxNamesFromArgs(args)
}
case model.ActionModifyColumn:
oldCol, _ := getOldAndNewColumnsForUpdateColumn(tbl, rInfo.currElement.ID)
if oldCol != nil {
colOrIdxName = oldCol.Name.String()
}
}

batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
return &backfillCtx{
id: id,
ddlCtx: rInfo.jobCtx.oldDDLCtx,
warnings: warnHandler,
exprCtx: exprCtx,
tblCtx: tblCtx,
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
batchCnt: batchCnt,
jobContext: jobCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())),
id: id,
ddlCtx: rInfo.jobCtx.oldDDLCtx,
warnings: warnHandler,
exprCtx: exprCtx,
tblCtx: tblCtx,
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
batchCnt: batchCnt,
jobContext: jobCtx,
metricCounter: metrics.GetBackfillTotalByLabel(label, schemaName, tbl.Meta().Name.String(), colOrIdxName),
}, nil
}

func getIdxNamesFromArgs(args *model.ModifyIndexArgs) string {
var sb strings.Builder
for i, idx := range args.IndexArgs {
if i > 0 {
sb.WriteString("+")
}
sb.WriteString(idx.IndexName.O)
}
return sb.String()
}

func updateTxnEntrySizeLimitIfNeeded(txn kv.Transaction) {
if entrySizeLimit := variable.TxnEntrySizeLimit.Load(); entrySizeLimit > 0 {
txn.SetOption(kv.SizeLimits, kv.TxnSizeLimits{
@@ -686,6 +713,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
idxCnt := len(reorgInfo.elements)
indexIDs := make([]int64, 0, idxCnt)
indexInfos := make([]*model.IndexInfo, 0, idxCnt)
var indexNames strings.Builder
uniques := make([]bool, 0, idxCnt)
hasUnique := false
for _, e := range reorgInfo.elements {
@@ -699,6 +727,10 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
return errors.Errorf("index info not found: %d", e.ID)
}
indexInfos = append(indexInfos, indexInfo)
if indexNames.Len() > 0 {
indexNames.WriteString("+")
}
indexNames.WriteString(indexInfo.Name.O)
uniques = append(uniques, indexInfo.Unique)
hasUnique = hasUnique || indexInfo.Unique
}
@@ -736,8 +768,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
rowCntListener := &localRowCntListener{
prevPhysicalRowCnt: reorgCtx.getRowCount(),
reorgCtx: reorgCtx,
counter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)),
counter: metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, job.SchemaName, job.TableName, indexNames.String()),
}

sctx, err := sessPool.Get()
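The new metrics.GetBackfillTotalByLabel helper lives in the metrics package, whose diff is not rendered on this page. Judging from the call sites it replaces, a minimal sketch of its likely shape follows; the four-argument signature and the widened GenerateReorgLabel are assumptions inferred from this commit, not copied from it:

// Sketch only: inferred from the call sites above, not from the actual
// pkg/metrics change (that file is not rendered on this page).
// Before this commit, callers wrote:
//   BackfillTotalCounter.WithLabelValues(GenerateReorgLabel(label, schemaName, tableName))
// The helper presumably folds the new column/index name into the same reorg label.
func GetBackfillTotalByLabel(label, schemaName, tableName, colOrIdxName string) prometheus.Counter {
	return BackfillTotalCounter.WithLabelValues(
		GenerateReorgLabel(label, schemaName, tableName, colOrIdxName))
}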
6 changes: 0 additions & 6 deletions pkg/ddl/backfilling_operators.go
@@ -745,10 +745,6 @@ func NewIndexIngestOperator(
writers = append(writers, writer)
}

indexIDs := make([]int64, len(indexes))
for i := 0; i < len(indexes); i++ {
indexIDs[i] = indexes[i].Meta().ID
}
return &indexIngestLocalWorker{
indexIngestBaseWorker: indexIngestBaseWorker{
ctx: ctx,
@@ -762,7 +758,6 @@
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
},
indexIDs: indexIDs,
backendCtx: backendCtx,
rowCntListener: rowCntListener,
cpMgr: cpMgr,
@@ -793,7 +788,6 @@ func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(In

type indexIngestLocalWorker struct {
indexIngestBaseWorker
indexIDs []int64
backendCtx ingest.BackendCtx
rowCntListener RowCountListener
cpMgr *ingest.CheckpointManager
22 changes: 17 additions & 5 deletions pkg/ddl/backfilling_read_index.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"strings"
"sync"
"sync/atomic"

@@ -224,9 +225,14 @@ func (r *readIndexExecutor) buildLocalStorePipeline(
d := r.d
indexIDs := make([]int64, 0, len(r.indexes))
uniques := make([]bool, 0, len(r.indexes))
var idxNames strings.Builder
for _, index := range r.indexes {
indexIDs = append(indexIDs, index.ID)
uniques = append(uniques, index.Unique)
if idxNames.Len() > 0 {
idxNames.WriteByte('+')
}
idxNames.WriteString(index.Name.O)
}
engines, err := r.bc.Register(indexIDs, uniques, r.ptbl)
if err != nil {
@@ -236,7 +242,7 @@
zap.Int64s("index IDs", indexIDs))
return nil, err
}
rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O)
rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String())
return NewAddIndexIngestPipeline(
opCtx,
d.store,
@@ -280,7 +286,14 @@ func (r *readIndexExecutor) buildExternalStorePipeline(
kvMeta.MergeSummary(summary)
s.mu.Unlock()
}
rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O)
var idxNames strings.Builder
for _, idx := range r.indexes {
if idxNames.Len() > 0 {
idxNames.WriteByte('+')
}
idxNames.WriteString(idx.Name.O)
}
rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String())
return NewWriteIndexToExternalStoragePipeline(
opCtx,
d.store,
Expand All @@ -307,9 +320,8 @@ type distTaskRowCntListener struct {
counter prometheus.Counter
}

func newDistTaskRowCntListener(totalRowCnt *atomic.Int64, dbName, tblName string) *distTaskRowCntListener {
counter := metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", dbName, tblName))
func newDistTaskRowCntListener(totalRowCnt *atomic.Int64, dbName, tblName, idxName string) *distTaskRowCntListener {
counter := metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, dbName, tblName, idxName)
return &distTaskRowCntListener{
totalRowCount: totalRowCnt,
counter: counter,
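The '+'-joining loop now appears three times in this commit: in getIdxNamesFromArgs (backfilling.go), in buildLocalStorePipeline, and in buildExternalStorePipeline. A small helper could consolidate at least the two pipeline builders; a hypothetical sketch, not part of the commit, assuming r.indexes holds *model.IndexInfo values as the field accesses suggest:

// joinIndexNames concatenates index names with '+', matching the format
// produced by getIdxNamesFromArgs. Hypothetical consolidation only.
func joinIndexNames(indexes []*model.IndexInfo) string {
	var sb strings.Builder
	for i, idx := range indexes {
		if i > 0 {
			sb.WriteByte('+')
		}
		sb.WriteString(idx.Name.O)
	}
	return sb.String()
}

Both builders could then pass joinIndexNames(r.indexes) to newDistTaskRowCntListener instead of building the string inline.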
5 changes: 3 additions & 2 deletions pkg/ddl/backfilling_scheduler.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/resourcegroup"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
@@ -271,7 +272,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false, false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblAddIdxRate, false, false)
if err != nil {
return err
}
@@ -284,7 +285,7 @@
runner = newBackfillWorker(b.ctx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false, false)
backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblMergeTmpIdxRate, false, false)
if err != nil {
return err
}
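These hunks replace the bare string literals previously passed to newBackfillCtx with named constants from the metrics package. Matching each constant against the literal it replaces elsewhere in this commit, the presumed definitions are as follows (the defining file is among those not rendered here):

// Presumed values, matched one-to-one against the replaced literals:
const (
	LblAddIdxRate         = "add_idx_rate"
	LblMergeTmpIdxRate    = "merge_tmp_idx_rate"
	LblUpdateColRate      = "update_col_rate"      // see column.go below
	LblCleanupIdxRate     = "cleanup_idx_rate"     // see index.go below
	LblReorgPartitionRate = "reorg_partition_rate" // see partition.go below
)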
26 changes: 17 additions & 9 deletions pkg/ddl/column.go
@@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
@@ -609,8 +610,22 @@ type updateColumnWorker struct {
checksumNeeded bool
}

func getOldAndNewColumnsForUpdateColumn(t table.Table, currElementID int64) (oldCol, newCol *model.ColumnInfo) {
for _, col := range t.WritableCols() {
if col.ID == currElementID {
changeColumnOrigName := table.FindCol(t.Cols(), getChangingColumnOriginName(col.ColumnInfo))
if changeColumnOrigName != nil {
newCol = col.ColumnInfo
oldCol = changeColumnOrigName.ColumnInfo
return
}
}
}
return
}

func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*updateColumnWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false, true)
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblUpdateColRate, false, true)
if err != nil {
return nil, err
}
@@ -620,14 +635,7 @@ func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64
zap.Stringer("reorgInfo", reorgInfo))
return nil, nil
}
var oldCol, newCol *model.ColumnInfo
for _, col := range t.WritableCols() {
if col.ID == reorgInfo.currElement.ID {
newCol = col.ColumnInfo
oldCol = table.FindCol(t.Cols(), getChangingColumnOriginName(newCol)).ColumnInfo
break
}
}
oldCol, newCol := getOldAndNewColumnsForUpdateColumn(t, reorgInfo.currElement.ID)
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
failpoint.Inject("forceRowLevelChecksumOnUpdateColumnBackfill", func() {
orig := variable.EnableRowLevelChecksum.Load()
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -2921,7 +2921,7 @@ type cleanUpIndexWorker struct {
}

func newCleanUpIndexWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*cleanUpIndexWorker, error) {
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false, false)
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblCleanupIdxRate, false, false)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ddl/modify_column.go
@@ -502,7 +502,7 @@ func (w *worker) doModifyColumnTypeWithData(
// Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will
// be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`.
job.SchemaState = model.StateDeleteOnly
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String()).Set(0)
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String(), args.OldColumnName.O).Set(0)
args.ChangingColumn = changingCol
args.ChangingIdxs = changingIdxs
failpoint.InjectCall("modifyColumnTypeWithData", job, args)
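Unlike GetBackfillTotalByLabel, metrics.GetBackfillProgressByLabel existed before this commit; the change widens it with a fourth name argument (passed as "" for partition reorganization, which has no single column or index). A plausible updated shape, again an assumption since the metrics diff is not shown:

// Sketch of the widened helper; before this commit it took only the first
// three arguments. Callers invoke Set(...) on the returned gauge cell.
func GetBackfillProgressByLabel(label, schemaName, tableName, colOrIdxName string) prometheus.Gauge {
	return BackfillProgressGauge.WithLabelValues(
		GenerateReorgLabel(label, schemaName, tableName, colOrIdxName))
}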
8 changes: 4 additions & 4 deletions pkg/ddl/partition.go
@@ -3334,7 +3334,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
}

// Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that.
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
@@ -3398,7 +3398,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
}
}
tblInfo.Partition.DDLState = model.StateWriteOnly
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.2 / float64(math.MaxUint64))
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.2 / float64(math.MaxUint64))
failpoint.Inject("reorgPartRollback2", func(val failpoint.Value) {
if val.(bool) {
err = errors.New("Injected error by reorgPartRollback2")
@@ -3419,7 +3419,7 @@
}
job.SchemaState = model.StateWriteReorganization
tblInfo.Partition.DDLState = job.SchemaState
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64))
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.3 / float64(math.MaxUint64))
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
@@ -3813,7 +3813,7 @@ type reorgPartitionWorker struct {
}

func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) {
bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false, false)
bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblReorgPartitionRate, false, false)
if err != nil {
return nil, err
}
20 changes: 17 additions & 3 deletions pkg/ddl/reorg.go
@@ -511,12 +511,26 @@ func updateBackfillProgress(w *worker, reorgInfo *reorgInfo, tblInfo *model.Tabl
} else {
label = metrics.LblAddIndex
}
metrics.GetBackfillProgressByLabel(label, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100)
idxNames := ""
args, err := model.GetModifyIndexArgs(reorgInfo.Job)
if err != nil {
logutil.DDLLogger().Error("Fail to get ModifyIndexArgs", zap.Error(err))
} else {
idxNames = getIdxNamesFromArgs(args)
}
metrics.GetBackfillProgressByLabel(label, reorgInfo.SchemaName, tblInfo.Name.String(), idxNames).Set(progress * 100)
case model.ActionModifyColumn:
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100)
colName := ""
args, err := model.GetModifyColumnArgs(reorgInfo.Job)
if err != nil {
logutil.DDLLogger().Error("Fail to get ModifyColumnArgs", zap.Error(err))
} else {
colName = args.OldColumnName.O
}
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, reorgInfo.SchemaName, tblInfo.Name.String(), colName).Set(progress * 100)
case model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100)
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, reorgInfo.SchemaName, tblInfo.Name.String(), "").Set(progress * 100)
}
}

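End to end, each backfill metric now carries the column or index name(s) as one extra label segment. For illustration, a job adding two indexes would report under idx_a+idx_b; a hypothetical walk-through, with the ModifyIndexArgs field layout inferred from how the diff reads args.IndexArgs and idx.IndexName.O rather than copied from the commit:

// Hypothetical usage of getIdxNamesFromArgs; struct shapes are assumptions.
args := &model.ModifyIndexArgs{IndexArgs: []*model.IndexArg{
	{IndexName: pmodel.NewCIStr("idx_a")},
	{IndexName: pmodel.NewCIStr("idx_b")},
}}
fmt.Println(getIdxNamesFromArgs(args)) // prints: idx_a+idx_b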
[1 remaining changed file not rendered on this page; presumably the pkg/metrics definitions referenced above.]
