Skip to content

Commit

Permalink
flashback: make br backup full works during flashback cluster (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 authored Feb 8, 2023
1 parent 3f0964f commit 91f6752
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 21 deletions.
1 change: 0 additions & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ go_library(
"//distsql",
"//kv",
"//meta",
"//meta/autoid",
"//parser/model",
"//statistics/handle",
"//util",
Expand Down
16 changes: 6 additions & 10 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -561,24 +560,21 @@ func BuildBackupRangeAndSchema(
zap.String("table", tableInfo.Name.O),
)

tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)
autoIDAccess := m.GetAutoIDAccessors(dbInfo.ID, tableInfo.ID)

var globalAutoID int64
switch {
case tableInfo.IsSequence():
globalAutoID, err = seqAlloc.NextGlobalAutoID()
globalAutoID, err = autoIDAccess.SequenceCycle().Get()
case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
// no auto ID for views or table without either rowID nor auto_increment ID.
default:
globalAutoID, err = idAlloc.NextGlobalAutoID()
globalAutoID, err = autoIDAccess.RowID().Get()
}
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoIncID = globalAutoID
tableInfo.AutoIncID = globalAutoID + 1
if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
Expand All @@ -591,11 +587,11 @@ func BuildBackupRangeAndSchema(
if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = randAlloc.NextGlobalAutoID()
globalAutoRandID, err = autoIDAccess.RandomID().Get()
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoRandID = globalAutoRandID
tableInfo.AutoRandID = globalAutoRandID + 1
logger.Debug("change table AutoRandID",
zap.Int64("AutoRandID", globalAutoRandID))
}
Expand Down
12 changes: 6 additions & 6 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,8 @@ func splitRegionsByKeyRanges(d *ddlCtx, keyRanges []kv.KeyRange) {

// A Flashback has 4 different stages.
// 1. before lock flashbackClusterJobID, check clusterJobID and lock it.
// 2. before flashback start, check timestamp, disable GC and close PD schedule.
// 3. phase 1, get key ranges, lock all regions.
// 2. before flashback start, check timestamp, disable GC and close PD schedule, get flashback key ranges.
// 3. phase 1, lock flashback key ranges.
// 4. phase 2, send flashback RPC, do flashback jobs.
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
inFlashbackTest := false
Expand Down Expand Up @@ -692,7 +692,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
job.SchemaState = model.StateDeleteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
Expand All @@ -711,8 +711,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
job.Args[keyRangesOffset] = keyRanges
job.SchemaState = model.StateWriteOnly
return ver, nil
// Stage 3, get key ranges and get locks.
return updateSchemaVersion(d, t, job)
// Stage 3, lock related key ranges.
case model.StateWriteOnly:
// TODO: Support flashback in unistore.
if inFlashbackTest {
Expand Down Expand Up @@ -742,7 +742,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
job.Args[commitTSOffset] = commitTS
job.SchemaState = model.StateWriteReorganization
return updateSchemaVersion(d, t, job)
return ver, nil
// Stage 4, get key ranges and send flashback RPC.
case model.StateWriteReorganization:
// TODO: Support flashback in unistore.
Expand Down
3 changes: 2 additions & 1 deletion ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func TestAddDDLDuringFlashback(t *testing.T) {
hook.OnJobRunBeforeExported = func(job *model.Job) {
assert.Equal(t, model.ActionFlashbackCluster, job.Type)
if job.SchemaState == model.StateWriteOnly {
_, err := tk.Exec("alter table t add column b int")
tk1 := testkit.NewTestKit(t, store)
_, err := tk1.Exec("alter table test.t add column b int")
assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job")
}
}
Expand Down
24 changes: 22 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,18 @@ func (do *Domain) GetScope(status string) variable.ScopeFlag {
return variable.DefaultStatusVarScopeFlag
}

func getFlashbackStartTSFromErrorMsg(err error) uint64 {
slices := strings.Split(err.Error(), "is in flashback progress, FlashbackStartTS is ")
if len(slices) != 2 {
return 0
}
version, err := strconv.ParseUint(slices[1], 10, 0)
if err != nil {
return 0
}
return version
}

// Reload reloads InfoSchema.
// It's public in order to do the test.
func (do *Domain) Reload() error {
Expand All @@ -490,7 +502,15 @@ func (do *Domain) Reload() error {
return err
}

is, hitCache, oldSchemaVersion, changes, err := do.loadInfoSchema(ver.Ver)
version := ver.Ver
is, hitCache, oldSchemaVersion, changes, err := do.loadInfoSchema(version)
if err != nil {
if version = getFlashbackStartTSFromErrorMsg(err); version != 0 {
// use the lastest available version to create domain
version -= 1
is, hitCache, oldSchemaVersion, changes, err = do.loadInfoSchema(version)
}
}
metrics.LoadSchemaDuration.Observe(time.Since(startTime).Seconds())
if err != nil {
metrics.LoadSchemaCounter.WithLabelValues("failed").Inc()
Expand Down Expand Up @@ -519,7 +539,7 @@ func (do *Domain) Reload() error {
}

// lease renew, so it must be executed despite it is cache or not
do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes)
do.SchemaValidator.Update(version, oldSchemaVersion, is.SchemaMetaVersion(), changes)
lease := do.DDL().GetLease()
sub := time.Since(startTime)
// Reload interval is lease / 2, if load schema time elapses more than this interval,
Expand Down
4 changes: 3 additions & 1 deletion domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
s.do.Store().GetMemCache().Delete(tblIDs[idx])
}
if ac == uint64(model.ActionFlashbackCluster) {
s.do.InfoSyncer().GetSessionManager().KillNonFlashbackClusterConn()
if s.do != nil && s.do.InfoSyncer() != nil && s.do.InfoSyncer().GetSessionManager() != nil {
s.do.InfoSyncer().GetSessionManager().KillNonFlashbackClusterConn()
}
}
}
logutil.BgLogger().Debug("update schema validator", zap.Int64("oldVer", oldVer),
Expand Down
4 changes: 4 additions & 0 deletions tests/realtikvtest/flashbacktest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ go_test(
flaky = True,
race = "on",
deps = [
"//ddl",
"//ddl/util",
"//domain",
"//errno",
"//meta",
"//parser/model",
"//testkit",
"//testkit/testsetup",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
Expand Down
73 changes: 73 additions & 0 deletions tests/realtikvtest/flashbacktest/flashback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ package flashbacktest
import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
tikvutil "github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -524,3 +529,71 @@ func TestFlashbackTmpTable(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
}

func TestFlashbackInProcessErrorMsg(t *testing.T) {
if *realtikvtest.WithRealTiKV {
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)

originHook := dom.DDL().GetHook()

tk := testkit.NewTestKit(t, store)
timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")

time.Sleep(1 * time.Second)
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

// do some ddl and dml
tk.MustExec("alter table t add index k(a)")
tk.MustExec("insert into t values (1), (2), (3)")

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

hook := newTestCallBack(t, dom)
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionFlashbackCluster && job.SchemaState == model.StateWriteReorganization {
txn, err := store.Begin()
assert.NoError(t, err)
_, err = meta.NewMeta(txn).ListDatabases()
errorMsg := err.Error()
assert.Contains(t, errorMsg, "is in flashback progress, FlashbackStartTS is ")
slices := strings.Split(errorMsg, "is in flashback progress, FlashbackStartTS is ")
assert.Equal(t, len(slices), 2)
assert.NotEqual(t, slices[1], "0")
txn.Rollback()
}
}
dom.DDL().SetHook(hook)
tk.Exec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
}

type testCallback struct {
ddl.Callback
OnJobRunBeforeExported func(job *model.Job)
}

func newTestCallBack(t *testing.T, dom *domain.Domain) *testCallback {
defHookFactory, err := ddl.GetCustomizedHook("default_hook")
require.NoError(t, err)
return &testCallback{
Callback: defHookFactory(dom),
}
}

func (c *testCallback) OnJobRunBefore(job *model.Job) {
if c.OnJobRunBeforeExported != nil {
c.OnJobRunBeforeExported(job)
}
}

0 comments on commit 91f6752

Please sign in to comment.