Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: support download history stats from stats_history #39701

Merged
merged 13 commits into from
Dec 9, 2022
27 changes: 11 additions & 16 deletions server/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"fmt"
"net/http"
"strconv"
"time"
Expand All @@ -24,8 +25,8 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/gcutil"
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -105,14 +106,14 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
return
}
defer se.Close()

dumpPartitionStats := true
if len(params[pDumpPartitionStats]) > 0 {
dumpPartitionStats, err = strconv.ParseBool(params[pDumpPartitionStats])
if err != nil {
writeError(w, err)
return
}
enabeld, err := sh.do.StatsHandle().CheckHistoricalStatsEnable()
if err != nil {
writeError(w, err)
return
}
if !enabeld {
writeError(w, fmt.Errorf("%v should be enabled", variable.TiDBEnableHistoricalStats))
return
}

se.GetSessionVars().StmtCtx.TimeZone = time.Local
Expand All @@ -127,12 +128,6 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
return
}
snapshot := oracle.GoTimeToTS(t1)
err = gcutil.ValidateSnapshot(se, snapshot)
if err != nil {
writeError(w, err)
return
}

is, err := sh.do.GetSnapshotInfoSchema(snapshot)
if err != nil {
writeError(w, err)
Expand All @@ -144,7 +139,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
writeError(w, err)
return
}
js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot, dumpPartitionStats)
js, err := h.DumpHistoricalStatsBySnapshot(params[pDBName], tbl.Meta(), snapshot)
if err != nil {
writeError(w, err)
} else {
Expand Down
6 changes: 6 additions & 0 deletions server/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -59,6 +60,10 @@ func TestDumpStatsAPI(t *testing.T) {
statsHandler := &StatsHandler{dom}

prepareData(t, client, statsHandler)
tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test"))
require.NoError(t, err)
err = dom.GetHistoricalStatsWorker().DumpHistoricalStats(tableInfo.Meta().ID, dom.StatsHandle())
require.NoError(t, err)

router := mux.NewRouter()
router.Handle("/stats/dump/{db}/{table}", statsHandler)
Expand Down Expand Up @@ -168,6 +173,7 @@ func prepareData(t *testing.T, client *testServerClient, statHandle *StatsHandle
tk.MustExec("insert test values (1, 's')")
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table test")
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("insert into test(a,b) values (1, 'v'),(3, 'vvv'),(5, 'vv')")
is := statHandle.do.InfoSchema()
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
h.recordHistoricalStatsMeta(physicalID, statsVer)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -263,7 +263,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
h.recordHistoricalStatsMeta(physicalID, statsVer)
}
}()
h.mu.Lock()
Expand Down
93 changes: 93 additions & 0 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"time"

Expand Down Expand Up @@ -130,6 +131,42 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo,
return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot, dumpPartitionStats)
}

// DumpHistoricalStatsBySnapshot dumped json tables from mysql.stats_meta_history and mysql.stats_history
func (h *Handle) DumpHistoricalStatsBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) {
pi := tableInfo.GetPartitionInfo()
if pi == nil {
return h.tableHistoricalStatsToJSON(tableInfo.ID, snapshot)
}
jsonTbl := &JSONTable{
DatabaseName: dbName,
TableName: tableInfo.Name.L,
Partitions: make(map[string]*JSONTable, len(pi.Definitions)),
}
for _, def := range pi.Definitions {
tbl, err := h.tableHistoricalStatsToJSON(def.ID, snapshot)
if err != nil {
return nil, errors.Trace(err)
}
if tbl == nil {
continue
}
jsonTbl.Partitions[def.Name.L] = tbl
}
h.mu.Lock()
isDynamicMode := variable.PartitionPruneMode(h.mu.ctx.GetSessionVars().PartitionPruneMode.Load()) == variable.Dynamic
h.mu.Unlock()
if isDynamicMode {
tbl, err := h.tableHistoricalStatsToJSON(tableInfo.ID, snapshot)
if err != nil {
return nil, errors.Trace(err)
}
if tbl != nil {
jsonTbl.Partitions["global"] = tbl
}
}
return jsonTbl, nil
}

// DumpStatsToJSONBySnapshot dumps statistic to json.
func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*JSONTable, error) {
h.mu.Lock()
Expand Down Expand Up @@ -194,6 +231,62 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati
return jsonTbl, nil
}

func (h *Handle) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (*JSONTable, error) {
reader, err := h.getGlobalStatsReader(0)
if err != nil {
return nil, err
}
defer func() {
err1 := h.releaseGlobalStatsReader(reader)
if err == nil && err1 != nil {
err = err1
}
}()

// get meta version
rows, _, err := reader.read("select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
if err != nil {
return nil, errors.AddStack(err)
}
if len(rows) < 1 {
return nil, fmt.Errorf("failed to get records of stats_meta_history for table_id = %v, snapshot = %v", physicalID, snapshot)
}
statsMetaVersion := rows[0].GetInt64(0)
// get stats meta
rows, _, err = reader.read("select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion)
if err != nil {
return nil, errors.AddStack(err)
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)

// get stats version
rows, _, err = reader.read("select distinct version from mysql.stats_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
if err != nil {
return nil, errors.AddStack(err)
}
if len(rows) < 1 {
return nil, fmt.Errorf("failed to get record of stats_history for table_id = %v, snapshot = %v", physicalID, snapshot)
}
statsVersion := rows[0].GetInt64(0)

// get stats
rows, _, err = reader.read("select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion)
if err != nil {
return nil, errors.AddStack(err)
}
blocks := make([][]byte, 0)
for _, row := range rows {
blocks = append(blocks, row.GetBytes(0))
}
jsonTbl, err := BlocksToJSONTable(blocks)
if err != nil {
return nil, errors.AddStack(err)
}
jsonTbl.Count = count
jsonTbl.ModifyCount = modifyCount
return jsonTbl, nil
}

func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) {
tbl, err := h.TableStatsFromStorage(tableInfo, physicalID, true, snapshot)
if err != nil || tbl == nil {
Expand Down
29 changes: 20 additions & 9 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, ana
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
Expand All @@ -1634,7 +1634,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = recordHistoricalStatsMeta(sctx, tableID, statsVer)
if err1 := recordHistoricalStatsMeta(sctx, tableID, statsVer); err1 != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
zap.Error(err1))
}
}
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
Expand Down Expand Up @@ -1808,7 +1813,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isI
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -1887,7 +1892,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -2093,7 +2098,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
slices.Sort(colIDs)
Expand Down Expand Up @@ -2164,7 +2169,7 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
Expand Down Expand Up @@ -2377,7 +2382,7 @@ func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
if extStats == nil || len(extStats.Stats) == 0 {
Expand Down Expand Up @@ -2676,10 +2681,16 @@ func recordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u
return nil
}

func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error {
func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) {
h.mu.Lock()
defer h.mu.Unlock()
return recordHistoricalStatsMeta(h.mu.ctx, tableID, version)
err := recordHistoricalStatsMeta(h.mu.ctx, tableID, version)
if err != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", version),
zap.Error(err))
}
}

// InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job.
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(id, statsVer)
h.recordHistoricalStatsMeta(id, statsVer)
}
}()
if delta.Count == 0 {
Expand Down