Skip to content

Commit

Permalink
ddl notifier: use pagination for SELECT to reduce memory usage (#58376)
Browse files Browse the repository at this point in the history
close #58368
  • Loading branch information
lance6716 authored Dec 23, 2024
1 parent 14a469a commit 5fbe4fb
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 101 deletions.
7 changes: 6 additions & 1 deletion pkg/ddl/notifier/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
Expand All @@ -30,11 +31,12 @@ go_test(
timeout = "short",
srcs = [
"events_test.go",
"store_test.go",
"testkit_test.go",
],
embed = [":notifier"],
flaky = True,
shard_count = 7,
shard_count = 9,
deps = [
"//pkg/ddl",
"//pkg/ddl/session",
Expand All @@ -43,9 +45,12 @@ go_test(
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
],
)
30 changes: 15 additions & 15 deletions pkg/ddl/notifier/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@ import (

// PubSchemeChangeToStore publishes schema changes to the store to notify
// subscribers on the Store. It stages changes in given `se` so they will be
// visible when `se` further commits. When the schema change is not from
// multi-schema change DDL, `multiSchemaChangeSeq` is -1. Otherwise,
// `multiSchemaChangeSeq` is the sub-job index of the multi-schema change DDL.
// visible when `se` further commits. When the DDL contains only one schema
// change, `subJobID` is -1. Otherwise, `subJobID` is the sub-job index of the
// DDL, like multi-schema change or batched create table.
func PubSchemeChangeToStore(
ctx context.Context,
se *sess.Session,
ddlJobID int64,
multiSchemaChangeSeq int64,
subJobID int64,
event *SchemaChangeEvent,
store Store,
) error {
change := &schemaChange{
ddlJobID: ddlJobID,
multiSchemaChangeSeq: multiSchemaChangeSeq,
event: event,
change := &SchemaChange{
ddlJobID: ddlJobID,
subJobID: subJobID,
event: event,
}
return store.Insert(ctx, se, change)
}

// schemaChange is the Golang representation of the persistent data. (ddlJobID,
// multiSchemaChangeSeq) should be unique in the cluster.
type schemaChange struct {
ddlJobID int64
multiSchemaChangeSeq int64
event *SchemaChangeEvent
processedByFlag uint64
// SchemaChange is the Golang representation of the persistent data. (ddlJobID,
// subJobID) should be unique in the cluster.
type SchemaChange struct {
	// ddlJobID is the ID of the DDL job that produced this schema change.
	ddlJobID int64
	// subJobID is the sub-job index when the DDL contains multiple schema
	// changes (e.g. multi-schema change or batched create table); it is -1
	// when the DDL contains only one schema change.
	subJobID int64
	// event is the deserialized schema change payload.
	event *SchemaChangeEvent
	// processedByFlag is a bitmap recording which subscribers have already
	// processed this change.
	processedByFlag uint64
}
125 changes: 98 additions & 27 deletions pkg/ddl/notifier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (

"github.com/pingcap/errors"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/util/chunk"
)

// CloseFn is the function to release the resource.
type CloseFn func()

// Store is the (de)serialization and persistent layer.
type Store interface {
Insert(context.Context, *sess.Session, *schemaChange) error
Insert(context.Context, *sess.Session, *SchemaChange) error
UpdateProcessed(
ctx context.Context,
se *sess.Session,
Expand All @@ -34,15 +38,29 @@ type Store interface {
processedBy uint64,
) error
DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
List(ctx context.Context, se *sess.Session) ([]*schemaChange, error)
	// List starts a transaction on the given session and reads all schema
	// changes through a ListResult. The Store owns the session until CloseFn
	// is called.
List(ctx context.Context, se *sess.Session) (ListResult, CloseFn)
}

// ListResult is the result stream of a List operation.
type ListResult interface {
	// Read tries to decode at most len(changes) SchemaChange into the given
	// slice. It returns the number of SchemaChange decoded; 0 means there are
	// no more SchemaChange to read.
	//
	// Note that the SchemaChange elements already in the slice will be
	// overwritten when Read is called again, so callers must not retain them
	// across calls.
	Read(changes []*SchemaChange) (int, error)
}

type tableStore struct {
db string
table string
}

func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schemaChange) error {
// Insert implements Store interface.
func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *SchemaChange) error {
event, err := json.Marshal(change.event)
if err != nil {
return errors.Trace(err)
Expand All @@ -58,11 +76,12 @@ func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schema
)
_, err = s.Execute(
ctx, sql, "ddl_notifier",
change.ddlJobID, change.multiSchemaChangeSeq, event,
change.ddlJobID, change.subJobID, event,
)
return err
}

// UpdateProcessed implements Store interface.
func (t *tableStore) UpdateProcessed(
ctx context.Context,
se *sess.Session,
Expand All @@ -81,6 +100,7 @@ func (t *tableStore) UpdateProcessed(
return err
}

// DeleteAndCommit implements Store interface.
func (t *tableStore) DeleteAndCommit(
ctx context.Context,
se *sess.Session,
Expand All @@ -106,34 +126,85 @@ func (t *tableStore) DeleteAndCommit(
return errors.Trace(err)
}

func (t *tableStore) List(ctx context.Context, se *sess.Session) ([]*schemaChange, error) {
sql := fmt.Sprintf(`
SELECT
ddl_job_id,
sub_job_id,
schema_change,
processed_by_flag
FROM %s.%s ORDER BY ddl_job_id, sub_job_id`,
t.db, t.table)
rows, err := se.Execute(ctx, sql, "ddl_notifier")
// List implements Store interface.
//
// It returns a paginated ListResult: each Read fetches one page bounded by
// the largest (ddl_job_id, sub_job_id) seen so far, so the whole table is
// never loaded into memory at once. The returned CloseFn rolls back the
// transaction that Read lazily begins on the session.
func (t *tableStore) List(ctx context.Context, se *sess.Session) (ListResult, CloseFn) {
	return &listResult{
		ctx: ctx,
		se:  se,
		// %%? escapes to the placeholder %? consumed later by Execute with
		// (maxReturnedDDLJobID, maxReturnedSubJobID, page size).
		sqlTemplate: fmt.Sprintf(`
SELECT
	ddl_job_id,
	sub_job_id,
	schema_change,
	processed_by_flag
FROM %s.%s
WHERE (ddl_job_id, sub_job_id) > (%%?, %%?)
ORDER BY ddl_job_id, sub_job_id
LIMIT %%?`,
			t.db, t.table),
		// DDL job IDs are always positive, so we can use 0 as the initial value.
		maxReturnedDDLJobID: 0,
		maxReturnedSubJobID: 0,
	}, se.Rollback
}

// listResult implements ListResult by paginating over the notifier table:
// each Read issues one SELECT whose lower bound is the largest
// (ddl_job_id, sub_job_id) pair returned so far.
type listResult struct {
	// NOTE(review): ctx is stored in the struct; its lifetime appears bounded
	// by the List transaction — confirm against callers.
	ctx context.Context
	se  *sess.Session
	// sqlTemplate is the paginated SELECT built by tableStore.List.
	sqlTemplate string
	// maxReturnedDDLJobID/maxReturnedSubJobID record the last row read and
	// serve as the exclusive lower bound of the next page; both zero also
	// signals that the transaction has not been started yet.
	maxReturnedDDLJobID int64
	maxReturnedSubJobID int64
}

// Read implements ListResult interface.
func (r *listResult) Read(changes []*SchemaChange) (int, error) {
if r.maxReturnedDDLJobID == 0 && r.maxReturnedSubJobID == 0 {
err := r.se.Begin(r.ctx)
if err != nil {
return 0, errors.Trace(err)
}
}

rows, err := r.se.Execute(
r.ctx, r.sqlTemplate, "ddl_notifier",
r.maxReturnedDDLJobID, r.maxReturnedSubJobID, len(changes),
)
if err != nil {
return nil, err
return 0, errors.Trace(err)
}
ret := make([]*schemaChange, 0, len(rows))
for _, row := range rows {
event := SchemaChangeEvent{}
err = json.Unmarshal(row.GetBytes(2), &event)

if err = r.unmarshalSchemaChanges(rows, changes); err != nil {
return 0, errors.Trace(err)
}
return len(rows), nil
}

func (r *listResult) unmarshalSchemaChanges(rows []chunk.Row, changes []*SchemaChange) error {
for i, row := range rows {
if changes[i] == nil {
changes[i] = new(SchemaChange)
}
if changes[i].event == nil {
changes[i].event = new(SchemaChangeEvent)
}
if changes[i].event.inner == nil {
changes[i].event.inner = new(jsonSchemaChangeEvent)
}

err := json.Unmarshal(row.GetBytes(2), changes[i].event.inner)
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
changes[i].ddlJobID = row.GetInt64(0)
changes[i].subJobID = row.GetInt64(1)
changes[i].processedByFlag = row.GetUint64(3)

if i == len(rows)-1 {
r.maxReturnedDDLJobID = changes[i].ddlJobID
r.maxReturnedSubJobID = changes[i].subJobID
}
ret = append(ret, &schemaChange{
ddlJobID: row.GetInt64(0),
multiSchemaChangeSeq: row.GetInt64(1),
event: &event,
processedByFlag: row.GetUint64(3),
})
}
return ret, nil
return nil
}

// OpenTableStore opens a store on a created table `db`.`table`. The table should
Expand Down
80 changes: 80 additions & 0 deletions pkg/ddl/notifier/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifier

import (
"encoding/json"
"testing"

"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
)

// TestLeftoverWhenUnmarshal checks that when a []*SchemaChange slice is reused
// across listResult.Read calls, unmarshalSchemaChanges overwrites the fields it
// decodes and allocates entries/events that are nil, and that leftover data
// from a previous decode is harmless.
func TestLeftoverWhenUnmarshal(t *testing.T) {
	r := &listResult{}
	// changesReused simulates a slice passed to Read for a second time: two
	// entries still hold previously decoded events, the third is nil.
	changesReused := []*SchemaChange{
		{event: &SchemaChangeEvent{inner: &jsonSchemaChangeEvent{
			TableInfo: &model.TableInfo{
				Name:    pmodel.NewCIStr("old"),
				Columns: []*model.ColumnInfo{{Name: pmodel.NewCIStr("c1")}},
				Indices: []*model.IndexInfo{
					{Name: pmodel.NewCIStr("i1")},
					{Name: pmodel.NewCIStr("i2")},
					{Name: pmodel.NewCIStr("i3")},
				},
			},
		}}},
		{event: &SchemaChangeEvent{inner: &jsonSchemaChangeEvent{
			AddedPartInfo: &model.PartitionInfo{Expr: "test"},
		}}},
		nil,
	}

	// newTableInfo is the payload every decoded row should end up carrying.
	newTableInfo := &model.TableInfo{
		Name: pmodel.NewCIStr("new"),
		Columns: []*model.ColumnInfo{
			{Name: pmodel.NewCIStr("c2")},
			{Name: pmodel.NewCIStr("c3")},
		},
		Indices: []*model.IndexInfo{
			{Name: pmodel.NewCIStr("i4")},
		},
		Constraints: []*model.ConstraintInfo{
			{Name: pmodel.NewCIStr("c1")},
		},
	}

	newTableInfoJSON, err := json.Marshal(jsonSchemaChangeEvent{TableInfo: newTableInfo})
	require.NoError(t, err)
	// Build three identical rows matching the SELECT column order:
	// ddl_job_id, sub_job_id, schema_change, processed_by_flag.
	sameRow := chunk.MutRowFromDatums([]types.Datum{
		types.NewIntDatum(1), types.NewIntDatum(1),
		types.NewBytesDatum(newTableInfoJSON), types.NewUintDatum(0),
	}).ToRow()
	rows := []chunk.Row{sameRow, sameRow, sameRow}

	err = r.unmarshalSchemaChanges(rows, changesReused)
	require.NoError(t, err)

	require.Equal(t, newTableInfo, changesReused[0].event.inner.TableInfo)
	require.Equal(t, newTableInfo, changesReused[1].event.inner.TableInfo)
	// The leftover is not cleaned right after unmarshal; it will be collected
	// by GC later. Because the type field determines which field is read, the
	// leftover does not affect the result.
	require.NotNil(t, changesReused[1].event.inner.AddedPartInfo)
	require.Equal(t, newTableInfo, changesReused[2].event.inner.TableInfo)
}
Loading

0 comments on commit 5fbe4fb

Please sign in to comment.