Skip to content

Commit

Permalink
planner: refactor some code of binding cache (#58504)
Browse files Browse the repository at this point in the history
ref #51347
  • Loading branch information
qw4990 authored Dec 24, 2024
1 parent 8ecdb54 commit 457dcc6
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 176 deletions.
28 changes: 5 additions & 23 deletions pkg/bindinfo/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,6 @@ func (b *Binding) SinceUpdateTime() (time.Duration, error) {
return time.Since(updateTime), nil
}

// Bindings represents a sql bind record retrieved from the storage.
type Bindings []Binding

// Copy get the copy of bindings
func (br Bindings) Copy() Bindings {
nbr := append(make(Bindings, 0, len(br)), br...)
return nbr
}

// prepareHints builds ID and Hint for Bindings. If sctx is not nil, we check if
// the BindSQL is still valid.
func prepareHints(sctx sessionctx.Context, binding *Binding) (rerr error) {
Expand Down Expand Up @@ -165,19 +156,19 @@ func prepareHints(sctx sessionctx.Context, binding *Binding) (rerr error) {
}

// `merge` merges two Bindings. It will replace old bindings with new bindings if there are new updates.
func merge(lBindings, rBindings Bindings) Bindings {
func merge(lBindings, rBindings []*Binding) []*Binding {
if lBindings == nil {
return rBindings
}
if rBindings == nil {
return lBindings
}
result := lBindings.Copy()
result := lBindings
for i := range rBindings {
rbind := rBindings[i]
found := false
for j, lbind := range lBindings {
if lbind.isSame(&rbind) {
if lbind.isSame(rbind) {
found = true
if rbind.UpdateTime.Compare(lbind.UpdateTime) >= 0 {
result[j] = rbind
Expand All @@ -192,8 +183,8 @@ func merge(lBindings, rBindings Bindings) Bindings {
return result
}

func removeDeletedBindings(br Bindings) Bindings {
result := make(Bindings, 0, len(br))
func removeDeletedBindings(br []*Binding) []*Binding {
result := make([]*Binding, 0, len(br))
for _, binding := range br {
if binding.Status != deleted {
result = append(result, binding)
Expand All @@ -202,15 +193,6 @@ func removeDeletedBindings(br Bindings) Bindings {
return result
}

// size calculates the memory size of a Bindings.
func (br Bindings) size() float64 {
mem := float64(0)
for _, binding := range br {
mem += binding.size()
}
return mem
}

// size calculates the memory size of a bind info.
func (b *Binding) size() float64 {
res := len(b.OriginalSQL) + len(b.Db) + len(b.BindSQL) + len(b.Status) + 2*int(unsafe.Sizeof(b.CreateTime)) + len(b.Charset) + len(b.Collation) + len(b.ID)
Expand Down
32 changes: 18 additions & 14 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@ func (b *digestBiMapImpl) SQLDigest2NoDBDigest(sqlDigest string) string {
// BindingCache is the interface for the cache of the SQL plan bindings.
type BindingCache interface {
// MatchingBinding supports cross-db matching on bindings.
MatchingBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (bindings Binding, isMatched bool)
MatchingBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (binding *Binding, isMatched bool)
// GetBinding gets the binding for the specified sqlDigest.
GetBinding(sqlDigest string) Bindings
GetBinding(sqlDigest string) []*Binding
// GetAllBindings gets all the bindings in the cache.
GetAllBindings() Bindings
GetAllBindings() []*Binding
// SetBinding sets the binding for the specified sqlDigest.
SetBinding(sqlDigest string, meta Bindings) (err error)
SetBinding(sqlDigest string, bindings []*Binding) (err error)
// RemoveBinding removes the binding for the specified sqlDigest.
RemoveBinding(sqlDigest string)
// SetMemCapacity sets the memory capacity for the cache.
Expand All @@ -170,16 +170,20 @@ type bindingCache struct {
cache *ristretto.Cache // the underlying cache to store the bindings.

// loadBindingFromStorageFunc is used to load binding from storage if cache miss.
loadBindingFromStorageFunc func(sctx sessionctx.Context, sqlDigest string) (Bindings, error)
loadBindingFromStorageFunc func(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error)
}

func newBindCache(bindingLoad func(sctx sessionctx.Context, sqlDigest string) (Bindings, error)) BindingCache {
func newBindCache(bindingLoad func(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error)) BindingCache {
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e6,
MaxCost: variable.MemQuotaBindingCache.Load(),
BufferItems: 64,
Cost: func(value any) int64 {
return int64(value.(Bindings).size())
var cost int64
for _, binding := range value.([]*Binding) {
cost += int64(binding.size())
}
return cost
},
Metrics: true,
IgnoreInternalCost: true,
Expand All @@ -196,7 +200,7 @@ func (c *bindingCache) shouldMetric() bool {
return c.loadBindingFromStorageFunc != nil // only metric for GlobalBindingCache, whose loadBindingFromStorageFunc is not nil.
}

func (c *bindingCache) MatchingBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool) {
func (c *bindingCache) MatchingBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool) {
matchedBinding, isMatched, missingSQLDigest := c.getFromMemory(sctx, noDBDigest, tableNames)
if len(missingSQLDigest) == 0 {
if c.shouldMetric() && isMatched {
Expand All @@ -215,7 +219,7 @@ func (c *bindingCache) MatchingBinding(sctx sessionctx.Context, noDBDigest strin
return
}

func (c *bindingCache) getFromMemory(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool, missingSQLDigest []string) {
func (c *bindingCache) getFromMemory(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool, missingSQLDigest []string) {
if c.Size() == 0 {
return
}
Expand Down Expand Up @@ -290,20 +294,20 @@ func (c *bindingCache) loadFromStore(sctx sessionctx.Context, missingSQLDigest [
// GetBinding gets the Bindings from the cache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindingCache) GetBinding(sqlDigest string) Bindings {
func (c *bindingCache) GetBinding(sqlDigest string) []*Binding {
v, ok := c.cache.Get(sqlDigest)
if !ok {
return nil
}
return v.(Bindings)
return v.([]*Binding)
}

// GetAllBindings return all the bindings from the bindingCache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindingCache) GetAllBindings() Bindings {
func (c *bindingCache) GetAllBindings() []*Binding {
sqlDigests := c.digestBiMap.All()
bindings := make(Bindings, 0, len(sqlDigests))
bindings := make([]*Binding, 0, len(sqlDigests))
for _, sqlDigest := range sqlDigests {
bindings = append(bindings, c.GetBinding(sqlDigest)...)
}
Expand All @@ -312,7 +316,7 @@ func (c *bindingCache) GetAllBindings() Bindings {

// SetBinding sets the Bindings to the cache.
// The function is thread-safe.
func (c *bindingCache) SetBinding(sqlDigest string, bindings Bindings) (err error) {
func (c *bindingCache) SetBinding(sqlDigest string, bindings []*Binding) (err error) {
// prepare noDBDigests for all bindings
noDBDigests := make([]string, 0, len(bindings))
p := parser.New()
Expand Down
18 changes: 9 additions & 9 deletions pkg/bindinfo/binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/require"
)

func bindingNoDBDigest(t *testing.T, b Binding) string {
func bindingNoDBDigest(t *testing.T, b *Binding) string {
p := parser.New()
stmt, err := p.ParseOneStmt(b.BindSQL, b.Charset, b.Collation)
require.NoError(t, err)
Expand All @@ -34,16 +34,16 @@ func bindingNoDBDigest(t *testing.T, b Binding) string {

func TestCrossDBBindingCache(t *testing.T) {
fbc := newBindCache(nil).(*bindingCache)
b1 := Binding{BindSQL: "SELECT * FROM db1.t1", SQLDigest: "b1"}
b1 := &Binding{BindSQL: "SELECT * FROM db1.t1", SQLDigest: "b1"}
fDigest1 := bindingNoDBDigest(t, b1)
b2 := Binding{BindSQL: "SELECT * FROM db2.t1", SQLDigest: "b2"}
b3 := Binding{BindSQL: "SELECT * FROM db2.t3", SQLDigest: "b3"}
b2 := &Binding{BindSQL: "SELECT * FROM db2.t1", SQLDigest: "b2"}
b3 := &Binding{BindSQL: "SELECT * FROM db2.t3", SQLDigest: "b3"}
fDigest3 := bindingNoDBDigest(t, b3)

// add 3 bindings and b1 and b2 have the same noDBDigest
require.NoError(t, fbc.SetBinding(b1.SQLDigest, []Binding{b1}))
require.NoError(t, fbc.SetBinding(b2.SQLDigest, []Binding{b2}))
require.NoError(t, fbc.SetBinding(b3.SQLDigest, []Binding{b3}))
require.NoError(t, fbc.SetBinding(b1.SQLDigest, []*Binding{b1}))
require.NoError(t, fbc.SetBinding(b2.SQLDigest, []*Binding{b2}))
require.NoError(t, fbc.SetBinding(b3.SQLDigest, []*Binding{b3}))
require.Equal(t, len(fbc.digestBiMap.(*digestBiMapImpl).noDBDigest2SQLDigest), 2) // b1 and b2 have the same noDBDigest
require.Equal(t, len(fbc.digestBiMap.NoDBDigest2SQLDigest(fDigest1)), 2)
require.Equal(t, len(fbc.digestBiMap.NoDBDigest2SQLDigest(fDigest3)), 1)
Expand All @@ -70,8 +70,8 @@ func TestCrossDBBindingCache(t *testing.T) {
}

func TestBindCache(t *testing.T) {
bindings := Bindings{{BindSQL: "SELECT * FROM t1"}}
kvSize := int(bindings.size())
bindings := []*Binding{{BindSQL: "SELECT * FROM t1"}}
kvSize := int(bindings[0].size())
defer func(v int64) {
variable.MemQuotaBindingCache.Store(v)
}(variable.MemQuotaBindingCache.Load())
Expand Down
4 changes: 2 additions & 2 deletions pkg/bindinfo/binding_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func MatchSQLBindingForPlanCache(sctx sessionctx.Context, stmtNode ast.StmtNode,
}

// MatchSQLBinding returns the matched binding for this statement.
func MatchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode) (binding Binding, matched bool, scope string) {
func MatchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode) (binding *Binding, matched bool, scope string) {
return matchSQLBinding(sctx, stmtNode, nil)
}

func matchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode, info *BindingMatchInfo) (binding Binding, matched bool, scope string) {
func matchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode, info *BindingMatchInfo) (binding *Binding, matched bool, scope string) {
useBinding := sctx.GetSessionVars().UsePlanBaselines
if !useBinding || stmtNode == nil {
return
Expand Down
24 changes: 12 additions & 12 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type GlobalBindingHandle interface {
// Methods for create, get, drop global sql bindings.

// MatchGlobalBinding returns the matched binding for this statement.
MatchGlobalBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool)
MatchGlobalBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool)

// GetAllGlobalBindings returns all bind records in cache.
GetAllGlobalBindings() (bindings Bindings)
GetAllGlobalBindings() (bindings []*Binding)

// CreateGlobalBinding creates a Bindings to the storage and the cache.
// It replaces all the exists bindings for the same normalized SQL.
Expand Down Expand Up @@ -201,7 +201,7 @@ func (h *globalBindingHandle) LoadFromStorageToCache(fullLoad bool) (err error)
}

oldBinding := h.bindingCache.GetBinding(sqlDigest)
newBinding := removeDeletedBindings(merge(oldBinding, []Binding{binding}))
newBinding := removeDeletedBindings(merge(oldBinding, []*Binding{binding}))
if len(newBinding) > 0 {
err = h.bindingCache.SetBinding(sqlDigest, newBinding)
if err != nil {
Expand Down Expand Up @@ -401,12 +401,12 @@ func lockBindInfoTable(sctx sessionctx.Context) error {
}

// MatchGlobalBinding returns the matched binding for this statement.
func (h *globalBindingHandle) MatchGlobalBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool) {
func (h *globalBindingHandle) MatchGlobalBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool) {
return h.bindingCache.MatchingBinding(sctx, noDBDigest, tableNames)
}

// GetAllGlobalBindings returns all bind records in cache.
func (h *globalBindingHandle) GetAllGlobalBindings() (bindings Bindings) {
func (h *globalBindingHandle) GetAllGlobalBindings() (bindings []*Binding) {
return h.bindingCache.GetAllBindings()
}

Expand All @@ -427,13 +427,13 @@ func (h *globalBindingHandle) GetMemCapacity() (memCapacity int64) {
}

// newBinding builds Bindings from a tuple in storage.
func newBinding(sctx sessionctx.Context, row chunk.Row) (string, Binding, error) {
func newBinding(sctx sessionctx.Context, row chunk.Row) (string, *Binding, error) {
status := row.GetString(3)
// For compatibility, the 'Using' status binding will be converted to the 'Enabled' status binding.
if status == Using {
status = Enabled
}
binding := Binding{
binding := &Binding{
OriginalSQL: row.GetString(0),
Db: strings.ToLower(row.GetString(2)),
BindSQL: row.GetString(1),
Expand All @@ -447,7 +447,7 @@ func newBinding(sctx sessionctx.Context, row chunk.Row) (string, Binding, error)
PlanDigest: row.GetString(10),
}
sqlDigest := parser.DigestNormalized(binding.OriginalSQL)
err := prepareHints(sctx, &binding)
err := prepareHints(sctx, binding)
sctx.GetSessionVars().CurrentDB = binding.Db
return sqlDigest.String(), binding, err
}
Expand Down Expand Up @@ -615,7 +615,7 @@ func (h *globalBindingHandle) CloseCache() {
}

// LoadBindingsFromStorageToCache loads global bindings from storage to the memory cache.
func (h *globalBindingHandle) LoadBindingsFromStorage(sctx sessionctx.Context, sqlDigest string) (Bindings, error) {
func (h *globalBindingHandle) LoadBindingsFromStorage(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error) {
if sqlDigest == "" {
return nil, nil
}
Expand All @@ -632,7 +632,7 @@ func (h *globalBindingHandle) LoadBindingsFromStorage(sctx sessionctx.Context, s
if bindings == nil {
return nil, nil
}
return bindings.(Bindings), nil
return bindings.([]*Binding), nil
case <-time.After(timeout):
return nil, errors.New("load bindings from storage timeout")
}
Expand All @@ -642,14 +642,14 @@ func (h *globalBindingHandle) loadBindingsFromStorageInternal(sqlDigest string)
failpoint.Inject("load_bindings_from_storage_internal_timeout", func() {
time.Sleep(time.Second)
})
var bindings Bindings
var bindings []*Binding
selectStmt := fmt.Sprintf("SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source, sql_digest, plan_digest FROM mysql.bind_info where sql_digest = '%s'", sqlDigest)
err := h.callWithSCtx(false, func(sctx sessionctx.Context) error {
rows, _, err := execRows(sctx, selectStmt)
if err != nil {
return err
}
bindings = make([]Binding, 0, len(rows))
bindings = make([]*Binding, 0, len(rows))
for _, row := range rows {
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
Expand Down
Loading

0 comments on commit 457dcc6

Please sign in to comment.