Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: support download plan replayer continues capture file #40085

Merged
merged 17 commits into from
Dec 22, 2022
Merged
2 changes: 1 addition & 1 deletion domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
return true
}

file, fileName, err := replayer.GeneratePlanReplayerFile()
file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
Expand Down
18 changes: 10 additions & 8 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
if _, ok := stmtNode.(*ast.SelectStmt); ok {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {

func (e *PlanReplayerExec) createFile() error {
var err error
e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile()
e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile(false)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_library(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//sessiontxn",
"//statistics/handle",
"//store",
"//store/driver/error",
"//store/gcworker",
Expand Down Expand Up @@ -91,6 +92,7 @@ go_library(
"//util/topsql/stmtstats",
"//util/versioninfo",
"@com_github_blacktear23_go_proxyprotocol//:go-proxyprotocol",
"@com_github_burntsushi_toml//:toml",
"@com_github_gorilla_mux//:mux",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
Expand Down
196 changes: 193 additions & 3 deletions server/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,26 @@
package server

import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/BurntSushi/toml"
"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
Expand All @@ -32,9 +43,11 @@ import (

// PlanReplayerHandler is the handler for dumping plan replayer file.
type PlanReplayerHandler struct {
infoGetter *infosync.InfoSyncer
address string
statusPort uint
is infoschema.InfoSchema
statsHandle *handle.Handle
infoGetter *infosync.InfoSyncer
address string
statusPort uint
}

func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
Expand All @@ -46,6 +59,12 @@ func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
if s.dom != nil && s.dom.InfoSyncer() != nil {
prh.infoGetter = s.dom.InfoSyncer()
}
if s.dom != nil && s.dom.InfoSchema() != nil {
prh.is = s.dom.InfoSchema()
}
if s.dom != nil && s.dom.StatsHandle() != nil {
prh.statsHandle = s.dom.StatsHandle()
}
return prh
}

Expand All @@ -61,6 +80,8 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
urlPath: fmt.Sprintf("plan_replayer/dump/%s", name),
downloadedFilename: "plan_replayer",
scheme: util.InternalHTTPSchema(),
statsHandle: prh.statsHandle,
is: prh.is,
}
handleDownloadFile(handler, w, req)
}
Expand Down Expand Up @@ -93,6 +114,13 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req
writeError(w, err)
return
}
if handler.downloadedFilename == "plan_replayer" {
content, err = handlePlanReplayerContinuesCaptureFile(content, path, handler)
if err != nil {
writeError(w, err)
return
}
}
_, err = w.Write(content)
if err != nil {
writeError(w, err)
Expand Down Expand Up @@ -175,6 +203,9 @@ type downloadFileHandler struct {
statusPort uint
urlPath string
downloadedFilename string

statsHandle *handle.Handle
is infoschema.InfoSchema
}

func isExists(path string) (bool, error) {
Expand All @@ -187,3 +218,162 @@ func isExists(path string) (bool, error) {
}
return true, nil
}

func handlePlanReplayerContinuesCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) {
if !strings.Contains(handler.filePath, "continues_replayer") {
return content, nil
}
b := bytes.NewReader(content)
zr, err := zip.NewReader(b, int64(len(content)))
if err != nil {
return nil, err
}
startTS, err := loadSQLMetaFile(zr)
if err != nil {
return nil, err
}
if startTS == 0 {
return content, nil
}
tbls, err := loadSchemaMeta(zr, handler.is)
if err != nil {
return nil, err
}
for _, tbl := range tbls {
jsonStats, err := handler.statsHandle.DumpHistoricalStatsBySnapshot(tbl.dbName, tbl.info, startTS)
if err != nil {
return nil, err
}
tbl.jsonStats = jsonStats
}
newPath, err := dumpJSONStatsIntoZip(tbls, content, path)
if err != nil {
return nil, err
}
//nolint: gosec
file, err := os.Open(newPath)
if err != nil {
return nil, err
}
content, err = io.ReadAll(file)
if err != nil {
return nil, err
}
err = file.Close()
if err != nil {
return nil, err
}
return content, nil
}

func loadSQLMetaFile(z *zip.Reader) (uint64, error) {
for _, zipFile := range z.File {
if zipFile.Name == domain.PlanReplayerSQLMetaFile {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
return 0, errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
_, err = toml.DecodeReader(v, &varMap)
if err != nil {
return 0, errors.AddStack(err)
}
startTS, err := strconv.ParseUint(varMap[domain.PlanReplayerSQLMetaStartTS], 10, 64)
if err != nil {
return 0, err
}
return startTS, nil
}
}
return 0, nil
}

func loadSchemaMeta(z *zip.Reader, is infoschema.InfoSchema) (map[int64]*tblInfo, error) {
r := make(map[int64]*tblInfo, 0)
for _, zipFile := range z.File {
if zipFile.Name == fmt.Sprintf("schema/%v", domain.PlanReplayerSchemaMetaFile) {
v, err := zipFile.Open()
if err != nil {
return nil, errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(v)
if err != nil {
return nil, errors.AddStack(err)
}
rows := strings.Split(buf.String(), "\n")
for _, row := range rows {
s := strings.Split(row, ";")
databaseName := s[0]
tableName := s[1]
t, err := is.TableByName(model.NewCIStr(databaseName), model.NewCIStr(tableName))
if err != nil {
return nil, err
}
r[t.Meta().ID] = &tblInfo{
info: t.Meta(),
dbName: databaseName,
tblName: tableName,
}
}
break
}
}
return r, nil
}

func dumpJSONStatsIntoZip(tbls map[int64]*tblInfo, content []byte, path string) (string, error) {
zr, err := zip.NewReader(bytes.NewReader(content), int64(len(content)))
if err != nil {
return "", err
}
newPath := fmt.Sprintf("copy_%v.zip", path[0:len(path)-4])
zf, err := os.Create(newPath)
if err != nil {
return "", err
}
zw := zip.NewWriter(zf)
for _, f := range zr.File {
err = zw.Copy(f)
if err != nil {
logutil.BgLogger().Error("copy plan replayer zip file failed", zap.Error(err))
return "", err
}
}
for _, tbl := range tbls {
w, err := zw.Create(fmt.Sprintf("stats/%v.%v.json", tbl.dbName, tbl.tblName))
if err != nil {
return "", err
}
data, err := json.Marshal(tbl.jsonStats)
if err != nil {
return "", err
}
_, err = w.Write(data)
if err != nil {
return "", err
}
}
err = zw.Close()
if err != nil {
logutil.BgLogger().Error("Closing file failed", zap.Error(err))
return "", err
}
err = zf.Close()
if err != nil {
logutil.BgLogger().Error("Closing file failed", zap.Error(err))
return "", err
}
return newPath, nil
}

type tblInfo struct {
info *model.TableInfo
jsonStats *handle.JSONTable
dbName string
tblName string
}
1 change: 1 addition & 0 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (h *Handle) ClearOutdatedHistoryStats() error {
}
sql = "delete from mysql.stats_history where NOW() - create_time >= %? "
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
}

Expand Down
9 changes: 6 additions & 3 deletions util/replayer/replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type PlanReplayerTaskKey struct {
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) {
path := GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
fileName, err := generatePlanReplayerFileName(isContinues)
if err != nil {
return nil, "", errors.AddStack(err)
}
Expand All @@ -50,7 +50,7 @@ func GeneratePlanReplayerFile() (*os.File, string, error) {
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
func generatePlanReplayerFileName(isContinues bool) (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
Expand All @@ -60,6 +60,9 @@ func generatePlanReplayerFileName() (string, error) {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
if isContinues {
return fmt.Sprintf("continues_replayer_%v_%v.zip", key, time), nil
}
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
}

Expand Down