Skip to content

Commit

Permalink
*: GC old infoschema version in the btree nodes (#58390)
Browse files Browse the repository at this point in the history
ref #58321
  • Loading branch information
tiancaiamao authored Dec 24, 2024
1 parent e403d8f commit 6806af4
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/infoschema/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/session/txninfo",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/store/helper",
"//pkg/table",
"//pkg/table/tables",
"//pkg/types",
Expand Down
38 changes: 38 additions & 0 deletions pkg/infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package infoschema
import (
"sort"
"sync"
"time"

infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -41,6 +44,9 @@ type InfoCache struct {
// first known schema version records the first known schema version, all schemas between [firstKnownSchemaVersion, latest)
// are known as long as we keep the DDL history correctly.
firstKnownSchemaVersion int64

lastCheckVersion int64
lastCheckTime time.Time
}

type schemaAndTimestamp struct {
Expand Down Expand Up @@ -290,6 +296,8 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema {
return nil
}

const gcCheckInterval = 128

// Insert will **TRY** to insert the infoschema into the cache.
// It only promised to cache the newest infoschema.
// It returns 'true' if it is cached, 'false' otherwise.
Expand All @@ -300,6 +308,14 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
defer h.mu.Unlock()

version := is.SchemaMetaVersion()
if h.lastCheckVersion == 0 {
h.lastCheckVersion = version
h.lastCheckTime = time.Now()
} else if version > h.lastCheckVersion+gcCheckInterval && time.Since(h.lastCheckTime) > time.Minute {
h.lastCheckVersion = version
h.lastCheckTime = time.Now()
go h.gcOldVersion()
}

// assume this is the timestamp order as well
i := sort.Search(len(h.cache), func(i int) bool {
Expand Down Expand Up @@ -372,3 +388,25 @@ func (h *InfoCache) InsertEmptySchemaVersion(version int64) {
}
}
}

func (h *InfoCache) gcOldVersion() {
tikvStore, ok := h.r.Store().(helper.Storage)
if !ok {
return
}

newHelper := helper.NewHelper(tikvStore)
version, err := meta.GetOldestSchemaVersion(newHelper)
if err != nil {
logutil.BgLogger().Warn("failed to GC old schema version", zap.Error(err))
return
}
start := time.Now()
deleted, total := h.Data.GCOldVersion(version)
logutil.BgLogger().Info("GC compact old schema version",
zap.Int64("current version", h.lastCheckVersion),
zap.Int64("oldest version", version),
zap.Int("deleted", deleted),
zap.Int64("total", total),
zap.Duration("takes", time.Since(start)))
}
50 changes: 50 additions & 0 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,56 @@ func (isd *Data) deleteDB(dbInfo *model.DBInfo, schemaVersion int64) {
btreeSet(&isd.schemaID2Name, schemaIDName{schemaVersion: schemaVersion, id: dbInfo.ID, name: dbInfo.Name, tomb: true})
}

// GCOldVersion compacts btree nodes by removing items older than schema version.
// exported for testing
func (isd *Data) GCOldVersion(schemaVersion int64) (int, int64) {
maxv, ok := isd.byName.Load().Max()
if !ok {
return 0, 0
}

var total int64
var deletes []*tableItem
var prev *tableItem
// Example:
// gcOldVersion to v4
// db3 tbl1 v5
// db3 tbl1 v4
// db3 tbl1 v3 <- delete, because v3 < v4
// db2 tbl2 v1 <- keep, need to keep the latest version if all versions are less than v4
// db2 tbl2 v0 <- delete, because v0 < v4
// db1 tbl3 v4
// ...
// So the rule can be simplify to "remove all items whose (version < schemaVersion && previous item is same table)"
isd.byName.Load().DescendLessOrEqual(maxv, func(item *tableItem) bool {
total++
if item.schemaVersion < schemaVersion {
if prev != nil && prev.dbName == item.dbName && prev.tableName == item.tableName {
// find one!
deletes = append(deletes, item)
// Don't do too much work in one batch!
if len(deletes) > 1024 {
return false
}
}
}
prev = item
return true
})

byNameOld := isd.byName.Load()
byNameNew := byNameOld.Clone()
byIDOld := isd.byID.Load()
byIDNew := byIDOld.Clone()
for _, item := range deletes {
byNameNew.Delete(item)
byIDNew.Delete(item)
}
isd.byName.CompareAndSwap(byNameOld, byNameNew)
isd.byID.CompareAndSwap(byIDOld, byIDNew)
return len(deletes), total
}

// resetBeforeFullLoad is called before a full recreate operation within builder.InitWithDBInfos().
// TODO: write a generics version to avoid repeated code.
func (isd *Data) resetBeforeFullLoad(schemaVersion int64) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/meta/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/resourcegroup",
"//pkg/store/helper",
"//pkg/structure",
"//pkg/util/codec",
"//pkg/util/dbterror",
"//pkg/util/hack",
"//pkg/util/partialjson",
Expand Down
24 changes: 24 additions & 0 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/resourcegroup"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/structure"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/partialjson"
Expand Down Expand Up @@ -1789,3 +1791,25 @@ func (m *Mutator) SetRUStats(stats *RUStats) error {
err = m.txn.Set(mRequestUnitStats, data)
return errors.Trace(err)
}

// GetOldestSchemaVersion gets the oldest schema version at the GC safe point.
// It works by checking the MVCC information (internal txn API) of the schema version meta key.
// This function is only used by infoschema v2 currently.
func GetOldestSchemaVersion(h *helper.Helper) (int64, error) {
ek := make([]byte, 0, len(mMetaPrefix)+len(mSchemaVersionKey)+24)
ek = append(ek, mMetaPrefix...)
ek = codec.EncodeBytes(ek, mSchemaVersionKey)
key := codec.EncodeUint(ek, uint64(structure.StringData))
mvccResp, err := h.GetMvccByEncodedKeyWithTS(key, math.MaxUint64)
if err != nil {
return 0, err
}
if mvccResp == nil || mvccResp.Info == nil || len(mvccResp.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version key")
}

v := mvccResp.Info.Writes[len(mvccResp.Info.Writes)-1]
var n int64
n, err = strconv.ParseInt(string(v.ShortValue), 10, 64)
return n, errors.Trace(err)
}
5 changes: 5 additions & 0 deletions tests/realtikvtest/sessiontest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_test(
name = "sessiontest_test",
timeout = "moderate",
srcs = [
"infoschema_v2_test.go",
"main_test.go",
"paging_test.go",
"session_fail_test.go",
Expand All @@ -12,8 +13,12 @@ go_test(
race = "on",
deps = [
"//pkg/config",
"//pkg/infoschema",
"//pkg/meta",
"//pkg/parser/model",
"//pkg/session",
"//pkg/sessionctx/variable",
"//pkg/store/helper",
"//pkg/testkit",
"//pkg/util/sqlkiller",
"//tests/realtikvtest",
Expand Down
110 changes: 110 additions & 0 deletions tests/realtikvtest/sessiontest/infoschema_v2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sessiontest

import (
"context"
"testing"

"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
)

func TestGCOldVersion(t *testing.T) {
defer config.RestoreFunc()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.CoprCache.CapacityMB = 0
})
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_schema_cache_size = 512 * 1024 * 1024")
tk.MustExec("use test;")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t1 (id int key, b int)")
tk.MustExec("create table t2 (id int key, b int)")
tk.MustExec("create table t3 (id int key, b int)")

dom, err := session.GetDomain(store)
require.NoError(t, err)
oldIS := dom.InfoSchema()

t1, err := oldIS.TableInfoByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
t2, err := oldIS.TableInfoByName(model.NewCIStr("test"), model.NewCIStr("t2"))
require.NoError(t, err)
t3, err := oldIS.TableInfoByName(model.NewCIStr("test"), model.NewCIStr("t3"))
require.NoError(t, err)

s := store.(helper.Storage)
h := helper.NewHelper(s)
old, err := meta.GetOldestSchemaVersion(h)
require.NoError(t, err)

for i := 0; i < 10; i++ {
tk.MustExec("alter table t1 add index i_b(b)")
tk.MustExec("alter table t1 drop index i_b")
}

for i := 0; i < 10; i++ {
tk.MustExec("alter table t2 add column (c int)")
tk.MustExec("alter table t2 drop column c")
}

for i := 0; i < 10; i++ {
tk.MustExec("truncate table t3")
}

nowIS := dom.InfoSchema()
curr := nowIS.SchemaMetaVersion()
require.True(t, curr > old)

ok, v2 := infoschema.IsV2(oldIS)
require.True(t, ok)

// After GC, the related table item are deleted.
deleted, _ := v2.GCOldVersion(curr - 5)
require.True(t, deleted > 0)

// So TableByID using old ID with the old schema version would fail.
_, ok = oldIS.TableByID(context.Background(), t1.ID)
require.False(t, ok)
_, ok = oldIS.TableByID(context.Background(), t2.ID)
require.False(t, ok)
_, ok = oldIS.TableByID(context.Background(), t3.ID)
require.False(t, ok)
_, err = oldIS.TableInfoByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.Error(t, err)
_, err = oldIS.TableInfoByName(model.NewCIStr("test"), model.NewCIStr("t2"))
require.Error(t, err)
_, err = oldIS.TableInfoByName(model.NewCIStr("test"), model.NewCIStr("t3"))
require.Error(t, err)

// GC will not delete the current schema version.
// add index and add column does not change table id
_, ok = nowIS.TableByID(context.Background(), t1.ID)
require.True(t, ok)
_, ok = nowIS.TableByID(context.Background(), t2.ID)
require.True(t, ok)
_, ok = nowIS.TableByID(context.Background(), t3.ID)
// truncate table changes table id
require.False(t, ok)
}

0 comments on commit 6806af4

Please sign in to comment.