diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go index 4df05a627f3e2..c0f3231223c74 100644 --- a/domain/plan_replayer.go +++ b/domain/plan_replayer.go @@ -384,7 +384,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc return true } - file, fileName, err := replayer.GeneratePlanReplayerFile() + file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture) if err != nil { logutil.BgLogger().Warn("generate plan replayer capture task file failed", zap.String("sqlDigest", taskKey.SQLDigest), diff --git a/executor/compiler.go b/executor/compiler.go index cfe71f2ccdf29..e2c2a29794d1d 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -158,14 +158,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL { - startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() - if err != nil { - return nil, err - } - if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture { - checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS) - } else { - checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS) + if _, ok := stmtNode.(*ast.SelectStmt); ok { + startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() + if err != nil { + return nil, err + } + if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture { + checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS) + } else { + checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS) + } } } diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go index 60a47991b48df..fae6273b3bd5e 100644 --- a/executor/plan_replayer.go +++ b/executor/plan_replayer.go @@ -125,7 +125,7 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error { func (e *PlanReplayerExec) createFile() error { var err error - e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile() + e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile(false) if err != nil { return err } diff --git a/server/BUILD.bazel b/server/BUILD.bazel index 191ae1c1c6f7d..afcb983fad76e 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -58,6 +58,7 @@ go_library( "//sessionctx/stmtctx", "//sessionctx/variable", "//sessiontxn", + "//statistics/handle", "//store", "//store/driver/error", "//store/gcworker", @@ -91,6 +92,7 @@ go_library( "//util/topsql/stmtstats", "//util/versioninfo", "@com_github_blacktear23_go_proxyprotocol//:go-proxyprotocol", + "@com_github_burntsushi_toml//:toml", "@com_github_gorilla_mux//:mux", "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", diff --git a/server/plan_replayer.go b/server/plan_replayer.go index 13160d3b6202f..beb202638d1fd 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -15,15 +15,26 @@ package server import ( + "archive/zip" + "bytes" + "encoding/json" "fmt" "io" "net/http" "os" "path/filepath" + "strconv" + "strings" + "github.com/BurntSushi/toml" "github.com/gorilla/mux" + "github.com/pingcap/errors" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/replayer" @@ -32,9 +43,11 @@ import ( // PlanReplayerHandler is the handler for dumping plan replayer file. type PlanReplayerHandler struct { - infoGetter *infosync.InfoSyncer - address string - statusPort uint + is infoschema.InfoSchema + statsHandle *handle.Handle + infoGetter *infosync.InfoSyncer + address string + statusPort uint } func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler { @@ -46,6 +59,12 @@ func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler { if s.dom != nil && s.dom.InfoSyncer() != nil { prh.infoGetter = s.dom.InfoSyncer() } + if s.dom != nil && s.dom.InfoSchema() != nil { + prh.is = s.dom.InfoSchema() + } + if s.dom != nil && s.dom.StatsHandle() != nil { + prh.statsHandle = s.dom.StatsHandle() + } return prh } @@ -61,6 +80,8 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques urlPath: fmt.Sprintf("plan_replayer/dump/%s", name), downloadedFilename: "plan_replayer", scheme: util.InternalHTTPSchema(), + statsHandle: prh.statsHandle, + is: prh.is, } handleDownloadFile(handler, w, req) } @@ -93,6 +114,13 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req writeError(w, err) return } + if handler.downloadedFilename == "plan_replayer" { + content, err = handlePlanReplayerContinuesCaptureFile(content, path, handler) + if err != nil { + writeError(w, err) + return + } + } _, err = w.Write(content) if err != nil { writeError(w, err) @@ -175,6 +203,9 @@ type downloadFileHandler struct { statusPort uint urlPath string downloadedFilename string + + statsHandle *handle.Handle + is infoschema.InfoSchema } func isExists(path string) (bool, error) { @@ -187,3 +218,162 @@ func isExists(path string) (bool, error) { } return true, nil } + +func handlePlanReplayerContinuesCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) { + if !strings.Contains(handler.filePath, "continues_replayer") { + return content, nil + } + b := bytes.NewReader(content) + zr, err := zip.NewReader(b, int64(len(content))) + if err != nil { + return nil, err + } + startTS, err := loadSQLMetaFile(zr) + if err != nil { + return nil, err + } + if startTS == 0 { + return content, nil + } + tbls, err := loadSchemaMeta(zr, handler.is) + if err != nil { + return nil, err + } + for _, tbl := range tbls { + jsonStats, err := handler.statsHandle.DumpHistoricalStatsBySnapshot(tbl.dbName, tbl.info, startTS) + if err != nil { + return nil, err + } + tbl.jsonStats = jsonStats + } + newPath, err := dumpJSONStatsIntoZip(tbls, content, path) + if err != nil { + return nil, err + } + //nolint: gosec + file, err := os.Open(newPath) + if err != nil { + return nil, err + } + content, err = io.ReadAll(file) + if err != nil { + return nil, err + } + err = file.Close() + if err != nil { + return nil, err + } + return content, nil +} + +func loadSQLMetaFile(z *zip.Reader) (uint64, error) { + for _, zipFile := range z.File { + if zipFile.Name == domain.PlanReplayerSQLMetaFile { + varMap := make(map[string]string) + v, err := zipFile.Open() + if err != nil { + return 0, errors.AddStack(err) + } + //nolint: errcheck,all_revive + defer v.Close() + _, err = toml.DecodeReader(v, &varMap) + if err != nil { + return 0, errors.AddStack(err) + } + startTS, err := strconv.ParseUint(varMap[domain.PlanReplayerSQLMetaStartTS], 10, 64) + if err != nil { + return 0, err + } + return startTS, nil + } + } + return 0, nil +} + +func loadSchemaMeta(z *zip.Reader, is infoschema.InfoSchema) (map[int64]*tblInfo, error) { + r := make(map[int64]*tblInfo, 0) + for _, zipFile := range z.File { + if zipFile.Name == fmt.Sprintf("schema/%v", domain.PlanReplayerSchemaMetaFile) { + v, err := zipFile.Open() + if err != nil { + return nil, errors.AddStack(err) + } + //nolint: errcheck,all_revive + defer v.Close() + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(v) + if err != nil { + return nil, errors.AddStack(err) + } + rows := strings.Split(buf.String(), "\n") + for _, row := range rows { + s := strings.Split(row, ";") + databaseName := s[0] + tableName := s[1] + t, err := is.TableByName(model.NewCIStr(databaseName), model.NewCIStr(tableName)) + if err != nil { + return nil, err + } + r[t.Meta().ID] = &tblInfo{ + info: t.Meta(), + dbName: databaseName, + tblName: tableName, + } + } + break + } + } + return r, nil +} + +func dumpJSONStatsIntoZip(tbls map[int64]*tblInfo, content []byte, path string) (string, error) { + zr, err := zip.NewReader(bytes.NewReader(content), int64(len(content))) + if err != nil { + return "", err + } + newPath := fmt.Sprintf("copy_%v.zip", path[0:len(path)-4]) + zf, err := os.Create(newPath) + if err != nil { + return "", err + } + zw := zip.NewWriter(zf) + for _, f := range zr.File { + err = zw.Copy(f) + if err != nil { + logutil.BgLogger().Error("copy plan replayer zip file failed", zap.Error(err)) + return "", err + } + } + for _, tbl := range tbls { + w, err := zw.Create(fmt.Sprintf("stats/%v.%v.json", tbl.dbName, tbl.tblName)) + if err != nil { + return "", err + } + data, err := json.Marshal(tbl.jsonStats) + if err != nil { + return "", err + } + _, err = w.Write(data) + if err != nil { + return "", err + } + } + err = zw.Close() + if err != nil { + logutil.BgLogger().Error("Closing file failed", zap.Error(err)) + return "", err + } + err = zf.Close() + if err != nil { + logutil.BgLogger().Error("Closing file failed", zap.Error(err)) + return "", err + } + return newPath, nil +} + +type tblInfo struct { + info *model.TableInfo + jsonStats *handle.JSONTable + dbName string + tblName string +} diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index ab2159bf22bf0..46a2a9fc691e3 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -160,6 +160,7 @@ func (h *Handle) ClearOutdatedHistoryStats() error { } sql = "delete from mysql.stats_history where NOW() - create_time >= %? " _, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + logutil.BgLogger().Info("clear outdated historical stats") return err } diff --git a/util/replayer/replayer.go b/util/replayer/replayer.go index 9fb0bc4f629fa..f89d26ec97717 100644 --- a/util/replayer/replayer.go +++ b/util/replayer/replayer.go @@ -33,13 +33,13 @@ type PlanReplayerTaskKey struct { } // GeneratePlanReplayerFile generates plan replayer file -func GeneratePlanReplayerFile() (*os.File, string, error) { +func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) { path := GetPlanReplayerDirName() err := os.MkdirAll(path, os.ModePerm) if err != nil { return nil, "", errors.AddStack(err) } - fileName, err := generatePlanReplayerFileName() + fileName, err := generatePlanReplayerFileName(isContinues) if err != nil { return nil, "", errors.AddStack(err) } @@ -50,7 +50,7 @@ func GeneratePlanReplayerFile() (*os.File, string, error) { return zf, fileName, err } -func generatePlanReplayerFileName() (string, error) { +func generatePlanReplayerFileName(isContinues bool) (string, error) { // Generate key and create zip file time := time.Now().UnixNano() b := make([]byte, 16) @@ -60,6 +60,9 @@ func generatePlanReplayerFileName() (string, error) { return "", err } key := base64.URLEncoding.EncodeToString(b) + if isContinues { + return fmt.Sprintf("continues_replayer_%v_%v.zip", key, time), nil + } return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil }