Skip to content

Commit

Permalink
domain: Support plan replayer continus capture (#39926)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Dec 15, 2022
1 parent 3760815 commit 3aba336
Show file tree
Hide file tree
Showing 19 changed files with 305 additions and 117 deletions.
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_library(
"//util/memory",
"//util/memoryusagealarm",
"//util/printer",
"//util/replayer",
"//util/servermemorylimit",
"//util/sqlexec",
"@com_github_burntsushi_toml//:toml",
Expand Down Expand Up @@ -122,6 +123,7 @@ go_test(
"//testkit/testsetup",
"//util",
"//util/mock",
"//util/replayer",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
7 changes: 4 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/memoryusagealarm"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -890,7 +891,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
infoCache: infoschema.NewCache(16),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
indexUsageSyncLease: idxUsageSyncLease,
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
onClose: onClose,
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
mdlCheckTableInfo: &mdlCheckTableInfo{
Expand Down Expand Up @@ -1647,8 +1648,8 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, work
}
taskCH := make(chan *PlanReplayerDumpTask, 16)
taskStatus := &planReplayerDumpTaskStatus{}
taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{}
taskStatus.finishedTaskMu.finishedTask = map[replayer.PlanReplayerTaskKey]struct{}{}
taskStatus.runningTaskMu.runningTasks = map[replayer.PlanReplayerTaskKey]struct{}{}

do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{
taskCH: taskCH,
Expand Down
129 changes: 46 additions & 83 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ package domain

import (
"context"
"encoding/base64"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
Expand All @@ -29,7 +27,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
Expand All @@ -41,6 +38,7 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
Expand All @@ -55,13 +53,6 @@ type dumpFileGcChecker struct {
planReplayerTaskStatus *planReplayerDumpTaskStatus
}

// GetPlanReplayerDirName returns plan replayer directory path.
// The path is related to the process id.
func GetPlanReplayerDirName() string {
tidbLogDir := filepath.Dir(config.GetGlobalConfig().Log.File.Filename)
return filepath.Join(tidbLogDir, "replayer")
}

func parseType(s string) string {
return strings.Split(s, "_")[0]
}
Expand Down Expand Up @@ -188,7 +179,9 @@ func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// we directly remove the task key if we put task in channel successfully, if the task was failed to dump,
// the task handle will re-add the task in next loop
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
if !task.IsContinuesCapture {
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
}
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
Expand All @@ -200,7 +193,7 @@ func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
tasks map[replayer.PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
Expand All @@ -212,7 +205,7 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
if err != nil {
return err
}
tasks := make([]PlanReplayerTaskKey, 0)
tasks := make([]replayer.PlanReplayerTaskKey, 0)
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
Expand All @@ -227,8 +220,8 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
}

// GetTasks get all tasks
func (h *planReplayerTaskCollectorHandle) GetTasks() []PlanReplayerTaskKey {
tasks := make([]PlanReplayerTaskKey, 0)
func (h *planReplayerTaskCollectorHandle) GetTasks() []replayer.PlanReplayerTaskKey {
tasks := make([]replayer.PlanReplayerTaskKey, 0)
h.taskMu.RLock()
defer h.taskMu.RUnlock()
for taskKey := range h.taskMu.tasks {
Expand All @@ -237,8 +230,8 @@ func (h *planReplayerTaskCollectorHandle) GetTasks() []PlanReplayerTaskKey {
return tasks
}

func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey) {
r := make(map[PlanReplayerTaskKey]struct{})
func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []replayer.PlanReplayerTaskKey) {
r := make(map[replayer.PlanReplayerTaskKey]struct{})
for _, task := range tasks {
r[task] = struct{}{}
}
Expand All @@ -247,13 +240,13 @@ func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey
h.taskMu.tasks = r
}

func (h *planReplayerTaskCollectorHandle) removeTask(taskKey PlanReplayerTaskKey) {
func (h *planReplayerTaskCollectorHandle) removeTask(taskKey replayer.PlanReplayerTaskKey) {
h.taskMu.Lock()
defer h.taskMu.Unlock()
delete(h.taskMu.tasks, taskKey)
}

func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]replayer.PlanReplayerTaskKey, error) {
exec := h.sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
if err != nil {
Expand All @@ -267,10 +260,10 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context
if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
return nil, errors.Trace(err)
}
allKeys := make([]PlanReplayerTaskKey, 0, len(rows))
allKeys := make([]replayer.PlanReplayerTaskKey, 0, len(rows))
for _, row := range rows {
sqlDigest, planDigest := row.GetString(0), row.GetString(1)
allKeys = append(allKeys, PlanReplayerTaskKey{
allKeys = append(allKeys, replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest,
PlanDigest: planDigest,
})
Expand All @@ -282,13 +275,13 @@ type planReplayerDumpTaskStatus struct {
// running task records the task running by all workers in order to avoid multi workers running the same task key
runningTaskMu struct {
sync.RWMutex
runningTasks map[PlanReplayerTaskKey]struct{}
runningTasks map[replayer.PlanReplayerTaskKey]struct{}
}

// finished task records the finished task in order to avoid running finished task key
finishedTaskMu struct {
sync.RWMutex
finishedTask map[PlanReplayerTaskKey]struct{}
finishedTask map[replayer.PlanReplayerTaskKey]struct{}
}
}

Expand All @@ -303,7 +296,7 @@ func (r *planReplayerDumpTaskStatus) GetRunningTaskStatusLen() int {
func (r *planReplayerDumpTaskStatus) CleanFinishedTaskStatus() {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
r.finishedTaskMu.finishedTask = map[replayer.PlanReplayerTaskKey]struct{}{}
}

// GetFinishedTaskStatusLen used for unit test
Expand Down Expand Up @@ -346,7 +339,7 @@ func (r *planReplayerDumpTaskStatus) setTaskFinished(task *PlanReplayerDumpTask)
func (r *planReplayerDumpTaskStatus) clearFinishedTask() {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
r.finishedTaskMu.finishedTask = map[replayer.PlanReplayerTaskKey]struct{}{}
}

type planReplayerTaskDumpWorker struct {
Expand All @@ -373,7 +366,7 @@ func (w *planReplayerTaskDumpWorker) run() {
// HandleTask handled task
func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (success bool) {
defer func() {
if success {
if success && task.IsContinuesCapture {
w.status.setTaskFinished(task)
}
}()
Expand All @@ -391,7 +384,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
return true
}

file, fileName, err := GeneratePlanReplayerFile()
file, fileName, err := replayer.GeneratePlanReplayerFile()
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
Expand All @@ -404,26 +397,28 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(w.sctx).InfoSchema()
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
if task.IsCapture && !task.IsContinuesCapture {
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
jsStats[tblID] = r
}
jsStats[tblID] = r
task.JSONTblStats = jsStats
}
task.JSONTblStats = jsStats
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
Expand Down Expand Up @@ -461,7 +456,7 @@ func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
}

func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task replayer.PlanReplayerTaskKey) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.SQLDigest, task.PlanDigest))
if err != nil {
Expand Down Expand Up @@ -512,15 +507,9 @@ type PlanReplayerStatusRecord struct {
FailedReason string
}

// PlanReplayerTaskKey indicates key of a plan replayer task
type PlanReplayerTaskKey struct {
SQLDigest string
PlanDigest string
}

// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
PlanReplayerTaskKey
replayer.PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
Expand All @@ -537,35 +526,9 @@ type PlanReplayerDumpTask struct {

FileName string
Zf *os.File
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
path := GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
if err != nil {
return nil, "", errors.AddStack(err)
}
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return nil, "", errors.AddStack(err)
}
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err := rand.Read(b)
if err != nil {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
// IsCapture indicates whether the task is from capture
IsCapture bool
// IsContinuesCapture indicates whether the task is from continues capture
IsContinuesCapture bool
}
Loading

0 comments on commit 3aba336

Please sign in to comment.