Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sessionctx: add validation for tidb_enable_plan_replayer_continues_capture #40787

Merged
merged 15 commits into from
Jan 29, 2023
Merged
1 change: 0 additions & 1 deletion domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ go_library(
"//privilege/privileges",
"//sessionctx",
"//sessionctx/sessionstates",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics/handle",
"//telemetry",
Expand Down
16 changes: 13 additions & 3 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
Expand Down Expand Up @@ -164,7 +164,18 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sql", record.OriginSQL),
zap.Error(err))
// try insert record without original sql
_, err = exec.ExecuteInternal(ctx, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values ('%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.Token, instance))
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.Error(err))
}
}
}

Expand Down Expand Up @@ -379,6 +390,7 @@ func (w *planReplayerTaskDumpWorker) handleTask(task *PlanReplayerDumpTask) {
occupy := true
handleTask := true
defer func() {
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpWorker", nil, false)
logutil.BgLogger().Debug("[plan-replayer-capture] handle task",
zap.String("sql-digest", sqlDigest),
zap.String("plan-digest", planDigest),
Expand Down Expand Up @@ -431,7 +443,6 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
}
task.Zf = file
task.FileName = fileName
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
if task.InExecute && len(task.NormalizedSQL) > 0 {
p := parser.New()
stmts, _, err := p.ParseSQL(task.NormalizedSQL)
Expand Down Expand Up @@ -538,7 +549,6 @@ type PlanReplayerDumpTask struct {
replayer.PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
TblStats map[int64]interface{}
InExecute bool
NormalizedSQL string
Expand Down
9 changes: 8 additions & 1 deletion domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,14 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,

// For capture task, we dump stats in storage only if EnableHistoricalStatsForCapture is disabled.
// For manual plan replayer dump command, we directly dump stats in storage
if !variable.EnableHistoricalStatsForCapture.Load() || !task.IsCapture {
if task.IsCapture {
if !task.IsContinuesCapture && variable.EnableHistoricalStatsForCapture.Load() {
// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return err
}
}
} else {
// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return err
Expand Down
31 changes: 19 additions & 12 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,17 +1412,7 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}
a.checkPlanReplayerCapture(txnTS)

sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
Expand Down Expand Up @@ -1485,6 +1475,23 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}
}

func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) {
if kv.GetInternalSourceType(a.GoCtx) == kv.InternalTxnStats {
return
}
se := a.Ctx
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := a.GetStmtNode()
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
if checkPlanReplayerContinuesCaptureValidStmt(stmtNode) {
checkPlanReplayerContinuesCapture(se, stmtNode, txnTS)
}
} else {
checkPlanReplayerCaptureTask(se, stmtNode, txnTS)
}
}
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr, false)
Expand Down Expand Up @@ -2112,7 +2119,6 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
Expand All @@ -2121,6 +2127,7 @@ func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
dumpTask.EncodedPlan, _ = GetEncodedPlan(stmtCtx, false)
if _, ok := stmtNode.(*ast.ExecuteStmt); ok {
nsql, _ := sctx.GetSessionVars().StmtCtx.SQLDigest()
dumpTask.InExecute = true
Expand Down
9 changes: 9 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ func TestPlanReplayerCapture(t *testing.T) {
func TestPlanReplayerContinuesCapture(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@global.tidb_enable_historical_stats='OFF'")
_, err := tk.Exec("set @@global.tidb_enable_plan_replayer_continues_capture='ON'")
require.Error(t, err)
require.Equal(t, err.Error(), "tidb_enable_historical_stats should be enabled before enabling tidb_enable_plan_replayer_continues_capture")

tk.MustExec("set @@global.tidb_enable_historical_stats='ON'")
tk.MustExec("set @@global.tidb_enable_plan_replayer_continues_capture='ON'")

prHandle := dom.GetPlanReplayerHandle()
tk.MustExec("delete from mysql.plan_replayer_status;")
tk.MustExec("use test")
Expand Down
11 changes: 11 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package kv

import (
"context"

"github.com/tikv/client-go/v2/util"
)

Expand Down Expand Up @@ -136,6 +138,15 @@ type RequestSource = util.RequestSource
// WithInternalSourceType create context with internal source.
var WithInternalSourceType = util.WithInternalSourceType

// GetInternalSourceType get internal source
func GetInternalSourceType(ctx context.Context) string {
v := ctx.Value(util.RequestSourceKey)
if v == nil {
return ""
}
return v.(util.RequestSource).RequestSourceType
}

const (
// InternalTxnOthers is the type of requests that consume low resources.
// This reduces the size of metrics.
Expand Down
17 changes: 17 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,12 +1188,29 @@ var defaultSysVars = []*SysVar{
/* The system variables below have GLOBAL and SESSION scope */
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePlanReplayerContinuesCapture, Value: BoolToOnOff(false), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
historicalStatsEnabled, err := s.GlobalVarsAccessor.GetGlobalSysVar(TiDBEnableHistoricalStats)
if err != nil {
return err
}
if !TiDBOptOn(historicalStatsEnabled) && TiDBOptOn(val) {
return errors.Errorf("%v should be enabled before enabling %v", TiDBEnableHistoricalStats, TiDBEnablePlanReplayerContinuesCapture)
}
s.EnablePlanReplayedContinuesCapture = TiDBOptOn(val)
return nil
},
GetSession: func(vars *SessionVars) (string, error) {
return BoolToOnOff(vars.EnablePlanReplayedContinuesCapture), nil
},
Validation: func(vars *SessionVars, s string, s2 string, flag ScopeFlag) (string, error) {
historicalStatsEnabled, err := vars.GlobalVarsAccessor.GetGlobalSysVar(TiDBEnableHistoricalStats)
if err != nil {
return "", err
}
if !TiDBOptOn(historicalStatsEnabled) && TiDBOptOn(s) {
return "", errors.Errorf("%v should be enabled before enabling %v", TiDBEnableHistoricalStats, TiDBEnablePlanReplayerContinuesCapture)
}
return s, nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePlanReplayerCapture, Value: BoolToOnOff(true), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,7 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
zap.String("source", source),
zap.Error(err1))
}
}
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/historical_stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64, source
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", version),
zap.String("source", source),
zap.Error(err))
}
}