Skip to content

Commit

Permalink
planner: refactor some code of binding cache (#58456)
Browse files Browse the repository at this point in the history
ref #51347
  • Loading branch information
qw4990 authored Dec 23, 2024
1 parent 52de47d commit 874e0a3
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 280 deletions.
4 changes: 1 addition & 3 deletions pkg/bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
"binding_match.go",
"global_handle.go",
"session_handle.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/pkg/bindinfo",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -62,7 +61,7 @@ go_test(
embed = [":bindinfo"],
flaky = True,
race = "on",
shard_count = 33,
shard_count = 34,
deps = [
"//pkg/bindinfo/internal",
"//pkg/bindinfo/norm",
Expand All @@ -78,7 +77,6 @@ go_test(
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util",
"//pkg/util/hack",
"//pkg/util/stmtsummary",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
15 changes: 0 additions & 15 deletions pkg/bindinfo/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,6 @@ func (br Bindings) Copy() Bindings {
return nbr
}

// HasAvailableBinding checks if there are any available bindings in bind record.
// The available means the binding can be used or can be converted into a usable status.
// It includes the 'Enabled', 'Using' and 'Disabled' status.
func HasAvailableBinding(br Bindings) bool {
if br == nil {
return false
}
for _, binding := range br {
if binding.IsBindingAvailable() {
return true
}
}
return false
}

// prepareHints builds ID and Hint for Bindings. If sctx is not nil, we check if
// the BindSQL is still valid.
func prepareHints(sctx sessionctx.Context, binding *Binding) (rerr error) {
Expand Down
24 changes: 0 additions & 24 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ type BindingCache interface {
GetMemUsage() int64
// GetMemCapacity gets the memory capacity of the cache.
GetMemCapacity() int64
// Copy copies the cache.
Copy() (newCache BindingCache, err error)
// Size returns the number of items in the cache.
Size() int
}
Expand Down Expand Up @@ -451,28 +449,6 @@ func (c *bindingCache) GetMemCapacity() int64 {
return c.memCapacity
}

// Copy copies a new bindingCache from the origin cache.
// The function is thread-safe.
func (c *bindingCache) Copy() (BindingCache, error) {
c.lock.Lock()
defer c.lock.Unlock()
var err error
newCache := newBindCache(c.loadBindingFromStorageFunc).(*bindingCache)
if c.memTracker.BytesConsumed() > newCache.GetMemCapacity() {
err = errors.New("The memory usage of all available bindings exceeds the cache's mem quota. As a result, all available bindings cannot be held on the cache. Please increase the value of the system variable 'tidb_mem_quota_binding_cache' and execute 'admin reload bindings' to ensure that all bindings exist in the cache and can be used normally")
}
keys := c.cache.Keys()
for _, key := range keys {
cacheKey := key.(bindingCacheKey)
v := c.get(cacheKey)
if _, err := newCache.set(cacheKey, v); err != nil {
return nil, err
}
}
newCache.digestBiMap = c.digestBiMap.Copy()
return newCache, err
}

func (c *bindingCache) Size() int {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down
67 changes: 12 additions & 55 deletions pkg/bindinfo/binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
package bindinfo

import (
"strconv"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/bindinfo/norm"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -69,66 +66,26 @@ func TestCrossDBBindingCache(t *testing.T) {
require.False(t, ok) // can't find b2 now
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b3.SQLDigest]
require.True(t, ok)

// test deep copy
newCache, err := fbc.Copy()
require.NoError(t, err)
newFBC := newCache.(*bindingCache)
newFBC.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest[fDigest1] = nil
delete(newFBC.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest, b1.SQLDigest)
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest[fDigest1]), 1) // no impact to the original cache
_, ok = fbc.digestBiMap.(*digestBiMapImpl).sqlDigest2noDBDigest[b1.SQLDigest]
require.True(t, ok)
}

func TestBindCache(t *testing.T) {
variable.MemQuotaBindingCache.Store(250)
bindCache := newBindCache(nil).(*bindingCache)

value := make([]Bindings, 3)
key := make([]bindingCacheKey, 3)
var bigKey string
for i := 0; i < 3; i++ {
cacheKey := strings.Repeat(strconv.Itoa(i), 50)
key[i] = bindingCacheKey(hack.Slice(cacheKey))
value[i] = []Binding{{OriginalSQL: cacheKey}}
bigKey += cacheKey

require.Equal(t, int64(116), calcBindCacheKVMem(key[i], value[i]))
}
bindings := Bindings{{BindSQL: "SELECT * FROM t1"}}
kvSize := len("digest1") + int(bindings.size())
variable.MemQuotaBindingCache.Store(int64(kvSize*3) - 1)
bindCache := newBindCache(nil)

ok, err := bindCache.set(key[0], value[0])
require.True(t, ok)
err := bindCache.SetBinding("digest1", bindings)
require.Nil(t, err)
result := bindCache.get(key[0])
require.NotNil(t, result)
require.NotNil(t, bindCache.GetBinding("digest1"))

ok, err = bindCache.set(key[1], value[1])
require.True(t, ok)
err = bindCache.SetBinding("digest2", bindings)
require.Nil(t, err)
result = bindCache.get(key[1])
require.NotNil(t, result)
require.NotNil(t, bindCache.GetBinding("digest2"))

ok, err = bindCache.set(key[2], value[2])
require.True(t, ok)
err = bindCache.SetBinding("digest3", bindings)
require.NotNil(t, err) // exceed the memory limit
result = bindCache.get(key[2])
require.NotNil(t, result)

// key[0] is not in the cache
result = bindCache.get(key[0])
require.Nil(t, result)

// key[1] is still in the cache
result = bindCache.get(key[1])
require.NotNil(t, result)
require.NotNil(t, bindCache.GetBinding("digest2"))

bigBindCacheKey := bindingCacheKey(hack.Slice(bigKey))
bigBindCacheValue := []Binding{{OriginalSQL: strings.Repeat("x", 100)}}
require.Equal(t, int64(266), calcBindCacheKVMem(bigBindCacheKey, bigBindCacheValue))
ok, err = bindCache.set(bigBindCacheKey, bigBindCacheValue)
require.False(t, ok) // the key-value pair is too big to be cached
require.NotNil(t, err)
result = bindCache.get(bigBindCacheKey)
require.Nil(t, result)
require.Nil(t, bindCache.GetBinding("digest1")) // digest1 is evicted
require.NotNil(t, bindCache.GetBinding("digest2")) // digest2 is still in the cache
}
54 changes: 27 additions & 27 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/bindinfo/internal/logutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/core/resolve"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -38,6 +40,7 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/hint"
utilparser "github.com/pingcap/tidb/pkg/util/parser"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)
Expand Down Expand Up @@ -91,7 +94,7 @@ type GlobalBindingHandle interface {
type globalBindingHandle struct {
sPool util.SessionPool

crossDBBindingCache atomic.Value
bindingCache BindingCache

// lastTaskTime records the last update time for the global sql bind cache.
// This value is used to avoid reload duplicated bindings from storage.
Expand Down Expand Up @@ -131,19 +134,10 @@ func NewGlobalBindingHandle(sPool util.SessionPool) GlobalBindingHandle {
return handle
}

func (h *globalBindingHandle) getCache() BindingCache {
return h.crossDBBindingCache.Load().(BindingCache)
}

func (h *globalBindingHandle) setCache(c BindingCache) {
// TODO: update the global cache in-place instead of replacing it and remove this function.
h.crossDBBindingCache.Store(c)
}

// Reset is to reset the BindHandle and clean old info.
func (h *globalBindingHandle) Reset() {
h.lastUpdateTime.Store(types.ZeroTimestamp)
h.setCache(newBindCache(h.LoadBindingsFromStorage))
h.bindingCache = newBindCache(h.LoadBindingsFromStorage)
variable.RegisterStatistics(h)
}

Expand All @@ -159,18 +153,12 @@ func (h *globalBindingHandle) setLastUpdateTime(t types.Time) {
func (h *globalBindingHandle) LoadFromStorageToCache(fullLoad bool) (err error) {
var lastUpdateTime types.Time
var timeCondition string
var newCache BindingCache
if fullLoad {
lastUpdateTime = types.ZeroTimestamp
timeCondition = ""
newCache = newBindCache(h.LoadBindingsFromStorage)
} else {
lastUpdateTime = h.getLastUpdateTime()
timeCondition = fmt.Sprintf("WHERE update_time>'%s'", lastUpdateTime.String())
newCache, err = h.getCache().Copy()
if err != nil {
return err
}
}

selectStmt := fmt.Sprintf(`SELECT original_sql, bind_sql, default_db, status, create_time,
Expand All @@ -185,11 +173,10 @@ func (h *globalBindingHandle) LoadFromStorageToCache(fullLoad bool) (err error)

defer func() {
h.setLastUpdateTime(lastUpdateTime)
h.setCache(newCache)

metrics.BindingCacheMemUsage.Set(float64(h.GetMemUsage()))
metrics.BindingCacheMemLimit.Set(float64(h.GetMemCapacity()))
metrics.BindingCacheNumBindings.Set(float64(len(h.getCache().GetAllBindings())))
metrics.BindingCacheNumBindings.Set(float64(len(h.bindingCache.GetAllBindings())))
}()

for _, row := range rows {
Expand All @@ -210,18 +197,18 @@ func (h *globalBindingHandle) LoadFromStorageToCache(fullLoad bool) (err error)
continue
}

oldBinding := newCache.GetBinding(sqlDigest)
oldBinding := h.bindingCache.GetBinding(sqlDigest)
newBinding := removeDeletedBindings(merge(oldBinding, []Binding{binding}))
if len(newBinding) > 0 {
err = newCache.SetBinding(sqlDigest, newBinding)
err = h.bindingCache.SetBinding(sqlDigest, newBinding)
if err != nil {
// When the memory capacity of bing_cache is not enough,
// there will be some memory-related errors in multiple places.
// Only needs to be handled once.
logutil.BindLogger().Warn("BindHandle.Update", zap.Error(err))
}
} else {
newCache.RemoveBinding(sqlDigest)
h.bindingCache.RemoveBinding(sqlDigest)
}
}
return nil
Expand Down Expand Up @@ -412,28 +399,28 @@ func lockBindInfoTable(sctx sessionctx.Context) error {

// MatchGlobalBinding returns the matched binding for this statement.
func (h *globalBindingHandle) MatchGlobalBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool) {
return h.getCache().MatchingBinding(sctx, noDBDigest, tableNames)
return h.bindingCache.MatchingBinding(sctx, noDBDigest, tableNames)
}

// GetAllGlobalBindings returns all bind records in cache.
func (h *globalBindingHandle) GetAllGlobalBindings() (bindings Bindings) {
return h.getCache().GetAllBindings()
return h.bindingCache.GetAllBindings()
}

// SetBindingCacheCapacity reset the capacity for the bindingCache.
// It will not affect already cached Bindings.
func (h *globalBindingHandle) SetBindingCacheCapacity(capacity int64) {
h.getCache().SetMemCapacity(capacity)
h.bindingCache.SetMemCapacity(capacity)
}

// GetMemUsage returns the memory usage for the bind cache.
func (h *globalBindingHandle) GetMemUsage() (memUsage int64) {
return h.getCache().GetMemUsage()
return h.bindingCache.GetMemUsage()
}

// GetMemCapacity returns the memory capacity for the bind cache.
func (h *globalBindingHandle) GetMemCapacity() (memCapacity int64) {
return h.getCache().GetMemCapacity()
return h.bindingCache.GetMemCapacity()
}

// newBinding builds Bindings from a tuple in storage.
Expand Down Expand Up @@ -671,3 +658,16 @@ func (h *globalBindingHandle) loadBindingsFromStorageInternal(sqlDigest string)
})
return bindings, err
}

// exec is a helper function to execute sql and return RecordSet.
func exec(sctx sessionctx.Context, sql string, args ...any) (sqlexec.RecordSet, error) {
sqlExec := sctx.GetSQLExecutor()
return sqlExec.ExecuteInternal(kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo), sql, args...)
}

// execRows is a helper function to execute sql and return rows and fields.
func execRows(sctx sessionctx.Context, sql string, args ...any) (rows []chunk.Row, fields []*resolve.ResultField, err error) {
sqlExec := sctx.GetRestrictedSQLExecutor()
return sqlExec.ExecRestrictedSQL(kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo),
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...)
}
36 changes: 29 additions & 7 deletions pkg/bindinfo/global_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,7 @@ func TestOutdatedInfoSchema(t *testing.T) {
}

func TestReloadBindings(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
Expand All @@ -533,11 +532,7 @@ func TestReloadBindings(t *testing.T) {
require.Equal(t, 1, len(rows))
rows = tk.MustQuery("select * from mysql.bind_info where source != 'builtin'").Rows()
require.Equal(t, 1, len(rows))
tk.MustExec("delete from mysql.bind_info where source != 'builtin'")
require.Nil(t, dom.BindHandle().LoadFromStorageToCache(false))
rows = tk.MustQuery("show global bindings").Rows()
require.Equal(t, 1, len(rows))
tk.MustExec("admin reload bindings")
tk.MustExec(`drop global binding for select * from t`)
rows = tk.MustQuery("show global bindings").Rows()
require.Equal(t, 0, len(rows))
}
Expand Down Expand Up @@ -612,3 +607,30 @@ func (p *mockSessionPool) Get() (pools.Resource, error) {
func (p *mockSessionPool) Put(pools.Resource) {}

func (p *mockSessionPool) Close() {}

func TestShowBindingDigestField(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(id int, key(id))")
tk.MustExec("create table t2(id int, key(id))")
tk.MustExec("create binding for select * from t1, t2 where t1.id = t2.id using select /*+ merge_join(t1, t2)*/ * from t1, t2 where t1.id = t2.id")
result := tk.MustQuery("show bindings;")
rows := result.Rows()[0]
require.Equal(t, len(rows), 11)
require.Equal(t, rows[9], "ac1ceb4eb5c01f7c03e29b7d0d6ab567e563f4c93164184cde218f20d07fd77c")
tk.MustExec("drop binding for select * from t1, t2 where t1.id = t2.id")
result = tk.MustQuery("show bindings;")
require.Equal(t, len(result.Rows()), 0)

tk.MustExec("create global binding for select * from t1, t2 where t1.id = t2.id using select /*+ merge_join(t1, t2)*/ * from t1, t2 where t1.id = t2.id")
result = tk.MustQuery("show global bindings;")
rows = result.Rows()[0]
require.Equal(t, len(rows), 11)
require.Equal(t, rows[9], "ac1ceb4eb5c01f7c03e29b7d0d6ab567e563f4c93164184cde218f20d07fd77c")
tk.MustExec("drop global binding for select * from t1, t2 where t1.id = t2.id")
result = tk.MustQuery("show global bindings;")
require.Equal(t, len(result.Rows()), 0)
}
Loading

0 comments on commit 874e0a3

Please sign in to comment.