Skip to content

Commit

Permalink
ddl: add more metrics for different operations of job execution, refi…
Browse files Browse the repository at this point in the history
…ne panels (#58360)

ref #54436
  • Loading branch information
D3Hunter authored Dec 20, 2024
1 parent 0f653f3 commit bf7016d
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 368 deletions.
18 changes: 14 additions & 4 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,40 +392,50 @@ func newSchemaVersionManager(store kv.Storage) *schemaVersionManager {
}
}

func (sv *schemaVersionManager) setSchemaVersion(job *model.Job) (schemaVersion int64, err error) {
err = sv.lockSchemaVersion(job.ID)
func (sv *schemaVersionManager) setSchemaVersion(jobCtx *jobContext, job *model.Job) (schemaVersion int64, err error) {
err = sv.lockSchemaVersion(jobCtx, job.ID)
if err != nil {
return schemaVersion, errors.Trace(err)
}
// TODO we can merge this txn into job transaction to avoid schema version
// without differ.
start := time.Now()
err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), sv.store, true, func(_ context.Context, txn kv.Transaction) error {
var err error
m := meta.NewMutator(txn)
schemaVersion, err = m.GenSchemaVersion()
return err
})
defer func() {
metrics.DDLIncrSchemaVerOpHist.Observe(time.Since(start).Seconds())
}()
return schemaVersion, err
}

// lockSchemaVersion gets the lock to prevent the schema version from being updated.
func (sv *schemaVersionManager) lockSchemaVersion(jobID int64) error {
func (sv *schemaVersionManager) lockSchemaVersion(jobCtx *jobContext, jobID int64) error {
ownerID := sv.lockOwner.Load()
// There may exist one job update schema version many times in multiple-schema-change, so we do not lock here again
// if they are the same job.
if ownerID != jobID {
start := time.Now()
sv.schemaVersionMu.Lock()
defer func() {
metrics.DDLLockSchemaVerOpHist.Observe(time.Since(start).Seconds())
}()
jobCtx.lockStartTime = time.Now()
sv.lockOwner.Store(jobID)
}
return nil
}

// unlockSchemaVersion releases the lock.
func (sv *schemaVersionManager) unlockSchemaVersion(jobID int64) {
func (sv *schemaVersionManager) unlockSchemaVersion(jobCtx *jobContext, jobID int64) {
ownerID := sv.lockOwner.Load()
if ownerID == jobID {
sv.lockOwner.Store(0)
sv.schemaVersionMu.Unlock()
metrics.DDLLockVerDurationHist.Observe(time.Since(jobCtx.lockStartTime).Seconds())
}
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ func (s *jobScheduler) loadAndDeliverJobs(se *sess.Session) error {
}

s.deliveryJob(wk, targetPool, &job)

if s.generalDDLWorkerPool.available() == 0 && s.reorgWorkerPool.available() == 0 {
break
}
Expand Down Expand Up @@ -476,6 +475,10 @@ func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, job *model.Job)
jobCtx := s.getJobRunCtx(job.ID, job.TraceInfo)

s.wg.Run(func() {
start := time.Now()
defer func() {
metrics.DDLRunJobOpHist.Observe(time.Since(start).Seconds())
}()
defer func() {
r := recover()
if r != nil {
Expand Down Expand Up @@ -612,6 +615,10 @@ func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobConte

// cleanMDLInfo cleans metadata lock info.
func (s *jobScheduler) cleanMDLInfo(job *model.Job, ownerID string) {
start := time.Now()
defer func() {
metrics.DDLCleanMDLInfoHist.Observe(time.Since(start).Seconds())
}()
if !variable.EnableMDL.Load() {
return
}
Expand Down
28 changes: 16 additions & 12 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ type jobContext struct {
jobArgs model.JobArgs

// TODO reorg part of code couple this struct so much, remove it later.
oldDDLCtx *ddlCtx
oldDDLCtx *ddlCtx
lockStartTime time.Time
}

func (c *jobContext) getAutoIDRequirement() autoid.Requirement {
Expand Down Expand Up @@ -351,11 +352,6 @@ func JobNeedGC(job *model.Job) bool {
// finishDDLJob deletes the finished DDL job in the ddl queue and puts it to history queue.
// If the DDL job need to handle in background, it will prepare a background job.
func (w *worker) finishDDLJob(jobCtx *jobContext, job *model.Job) (err error) {
startTime := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

if JobNeedGC(job) {
err = w.delRangeManager.addDelRangeJob(w.workCtx, job)
if err != nil {
Expand Down Expand Up @@ -473,6 +469,10 @@ func (w *ReorgContext) setDDLLabelForDiagnosis(jobType model.ActionType) {
}

func (w *worker) handleJobDone(jobCtx *jobContext, job *model.Job) error {
start := time.Now()
defer func() {
metrics.DDLHandleJobDoneOpHist.Observe(time.Since(start).Seconds())
}()
if err := w.checkBeforeCommit(); err != nil {
return err
}
Expand Down Expand Up @@ -565,20 +565,24 @@ func (w *worker) transitOneJobStep(
}
failpoint.InjectCall("beforeRunOneJobStep", job)

start := time.Now()
defer func() {
metrics.DDLTransitOneStepOpHist.Observe(time.Since(start).Seconds())
}()
// If running job meets error, we will save this error in job Error and retry
// later if the job is not cancelled.
schemaVer, updateRawArgs, runJobErr := w.runOneJobStep(jobCtx, job, sysTblMgr)

failpoint.InjectCall("afterRunOneJobStep", job)

if job.IsCancelled() {
defer jobCtx.unlockSchemaVersion(job.ID)
defer jobCtx.unlockSchemaVersion(jobCtx, job.ID)
w.sess.Reset()
return 0, w.handleJobDone(jobCtx, job)
}

if err = w.checkBeforeCommit(); err != nil {
jobCtx.unlockSchemaVersion(job.ID)
jobCtx.unlockSchemaVersion(jobCtx, job.ID)
return 0, err
}

Expand All @@ -598,19 +602,19 @@ func (w *worker) transitOneJobStep(
err = w.registerMDLInfo(job, schemaVer)
if err != nil {
w.sess.Rollback()
jobCtx.unlockSchemaVersion(job.ID)
jobCtx.unlockSchemaVersion(jobCtx, job.ID)
return 0, err
}
err = w.updateDDLJob(jobCtx, job, updateRawArgs)
if err = w.handleUpdateJobError(jobCtx, job, err); err != nil {
w.sess.Rollback()
jobCtx.unlockSchemaVersion(job.ID)
jobCtx.unlockSchemaVersion(jobCtx, job.ID)
return 0, err
}
// reset the SQL digest to make topsql work right.
w.sess.GetSessionVars().StmtCtx.ResetSQLDigest(job.Query)
err = w.sess.Commit(w.workCtx)
jobCtx.unlockSchemaVersion(job.ID)
jobCtx.unlockSchemaVersion(jobCtx, job.ID)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -788,7 +792,7 @@ func (w *worker) runOneJobStep(
job.RealStartTS = jobCtx.metaMut.StartTS
}
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerRunDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
metrics.DDLWorkerHistogram.WithLabelValues(metrics.DDLRunOneStep, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
}()

if job.IsCancelling() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func SetSchemaDiffForMultiInfos(diff *model.SchemaDiff, multiInfos ...schemaIDAn

// updateSchemaVersion increments the schema version by 1 and sets SchemaDiff.
func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schemaIDAndTableInfo) (int64, error) {
schemaVersion, err := jobCtx.setSchemaVersion(job)
schemaVersion, err := jobCtx.setSchemaVersion(jobCtx, job)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down Expand Up @@ -378,7 +378,7 @@ func waitVersionSynced(
})
timeStart := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerWaitSchemaChanged, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
metrics.DDLWorkerHistogram.WithLabelValues(metrics.DDLWaitSchemaSynced, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
}()
// WaitVersionSynced returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err = jobCtx.schemaVerSyncer.WaitVersionSynced(ctx, job.ID, latestSchemaVersion)
Expand Down
44 changes: 38 additions & 6 deletions pkg/metrics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,34 @@ var (
OwnerHandleSyncerHistogram *prometheus.HistogramVec

// Metrics for job_worker.go.
WorkerNotifyDDLJob = "notify_job"
WorkerAddDDLJob = "add_job"
WorkerRunDDLJob = "run_job"
WorkerFinishDDLJob = "finish_job"
WorkerWaitSchemaChanged = "wait_schema_changed"
DDLWorkerHistogram *prometheus.HistogramVec
WorkerAddDDLJob = "add_job"
DDLWorkerHistogram *prometheus.HistogramVec

// DDLRunOneStep is the label for the DDL worker operation run_one_step.
//
// if a DDL job runs successfully, the cost time is mostly in below structure:
//
// run_job
// ├─ step-1
// │ ├─ transit_one_step
// │ │ ├─ run_one_step
// │ │ │ ├─ lock_schema_ver
// │ │ │ ├─ incr_schema_ver
// │ │ │ ├─ async_notify
// │ │ ├─ other common works such as register MDL, commit, etc.
// │ ├─ wait_schema_synced
// │ ├─ clean_mdl_info
// ├─ step-2/3/4 ... similar as above -> done state
// ├─ handle_job_done
DDLRunOneStep = "run_one_step"
DDLWaitSchemaSynced = "wait_schema_synced"
DDLIncrSchemaVerOpHist prometheus.Observer
DDLLockSchemaVerOpHist prometheus.Observer
DDLRunJobOpHist prometheus.Observer
DDLHandleJobDoneOpHist prometheus.Observer
DDLTransitOneStepOpHist prometheus.Observer
DDLLockVerDurationHist prometheus.Observer
DDLCleanMDLInfoHist prometheus.Observer

CreateDDLInstance = "create_ddl_instance"
CreateDDL = "create_ddl"
Expand Down Expand Up @@ -174,6 +196,16 @@ func InitDDLMetrics() {
Help: "scan rate",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
}, []string{LblType})

// those metrics are for diagnose performance issues of running multiple DDLs
// is a short time window, so we don't need to add label for DDL type.
DDLIncrSchemaVerOpHist = DDLWorkerHistogram.WithLabelValues("incr_schema_ver", "*", "*")
DDLLockSchemaVerOpHist = DDLWorkerHistogram.WithLabelValues("lock_schema_ver", "*", "*")
DDLRunJobOpHist = DDLWorkerHistogram.WithLabelValues("run_job", "*", "*")
DDLHandleJobDoneOpHist = DDLWorkerHistogram.WithLabelValues("handle_job_done", "*", "*")
DDLTransitOneStepOpHist = DDLWorkerHistogram.WithLabelValues("transit_one_step", "*", "*")
DDLLockVerDurationHist = DDLWorkerHistogram.WithLabelValues("lock_ver_duration", "*", "*")
DDLCleanMDLInfoHist = DDLWorkerHistogram.WithLabelValues("clean_mdl_info", "*", "*")
}

// Label constants.
Expand Down
Loading

0 comments on commit bf7016d

Please sign in to comment.