From 005c8d8ef8e9cffece93c96a6ff73381df76b52b Mon Sep 17 00:00:00 2001 From: wjHuang Date: Fri, 6 Jan 2023 18:26:21 +0800 Subject: [PATCH 1/6] *: remove the support of the `amending transaction` (#39714) close pingcap/tidb#40381 --- ddl/column_modify_test.go | 75 -- ddl/db_test.go | 247 ------ ddl/index_merge_tmp_test.go | 10 - ddl/table_modify_test.go | 13 - domain/schema_validator.go | 24 +- domain/schema_validator_test.go | 12 +- expression/integration_test.go | 19 - session/BUILD.bazel | 12 - session/schema_amender.go | 713 ------------------ session/schema_amender_test.go | 477 ------------ session/schema_test.go | 11 - session/session.go | 7 - sessionctx/variable/session.go | 4 - sessionctx/variable/sysvar.go | 8 - sessionctx/variable/tidb_vars.go | 4 - sessionctx/variable/varsutil_test.go | 3 - store/driver/txn/txn_driver.go | 2 - .../pessimistictest/pessimistic_test.go | 635 ---------------- 18 files changed, 11 insertions(+), 2265 deletions(-) delete mode 100644 session/schema_amender.go delete mode 100644 session/schema_amender_test.go diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index ac9b9cf0ecc3d..658039c1092a4 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -1029,78 +1029,3 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) { tk.MustExec("drop table if exists t") } - -func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0") - tk.MustExec("set global tidb_enable_amend_pessimistic_txn = ON") - defer tk.MustExec("set global tidb_enable_amend_pessimistic_txn = OFF") - - d := dom.DDL() - testInsertOnModifyColumn := func(sql string, startColState, commitColState model.SchemaState, retStrs []string, retErr error) { - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1 (c1 int, c2 int, c3 int, unique key(c1))") - tk.MustExec("insert into t1 values (20, 20, 20);") - - var checkErr error - tk1 := testkit.NewTestKit(t, store) - defer func() { - if tk1.Session() != nil { - tk1.Session().Close() - } - }() - hook := &ddl.TestDDLCallback{Do: dom} - times := 0 - hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != startColState { - return - } - - tk1.MustExec("use test") - tk1.MustExec("begin pessimistic;") - tk1.MustExec("insert into t1 values(101, 102, 103)") - } - onJobUpdatedExportedFunc := func(job *model.Job) { - if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != commitColState { - return - } - if times == 0 { - _, checkErr = tk1.Exec("commit;") - } - times++ - } - hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) - d.SetHook(hook) - - tk.MustExec(sql) - if retErr == nil { - require.NoError(t, checkErr) - } else { - require.Error(t, checkErr) - require.Contains(t, checkErr.Error(), retErr.Error()) - } - tk.MustQuery("select * from t1").Check(testkit.Rows(retStrs...)) - tk.MustExec("admin check table t1") - } - - // Testing it needs reorg data. - ddlStatement := "alter table t1 change column c2 cc smallint;" - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - - // Testing it needs not reorg data. This case only have two states: none, public. - ddlStatement = "alter table t1 change column c2 cc bigint;" - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, nil) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, nil) - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20", "101 102 103"}, nil) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, nil) -} diff --git a/ddl/db_test.go b/ddl/db_test.go index 7d1e829b3db61..3380af7e0a2c5 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -41,7 +41,6 @@ import ( "github.com/pingcap/tidb/parser/terror" parsertypes "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/store/mockstore" @@ -55,7 +54,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" - "golang.org/x/exp/slices" ) const ( @@ -981,202 +979,6 @@ func TestDDLJobErrorCount(t *testing.T) { require.True(t, kv.ErrEntryTooLarge.Equal(historyJob.Error)) } -func TestCommitTxnWithIndexChange(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) - // Prepare work. - tk := testkit.NewTestKit(t, store) - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk.MustExec("use test") - tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))") - tk.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)") - tk.MustExec("alter table t1 add index k2(c2)") - tk.MustExec("alter table t1 drop index k2") - tk.MustExec("alter table t1 add index k2(c2)") - tk.MustExec("alter table t1 drop index k2") - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - - // tkSQLs are the sql statements for the pessimistic transaction. - // tk2DDL are the ddl statements executed before the pessimistic transaction. - // idxDDL is the DDL statement executed between pessimistic transaction begin and commit. - // failCommit means the pessimistic transaction commit should fail not. - type caseUnit struct { - tkSQLs []string - tk2DDL []string - idxDDL string - checkSQLs []string - rowsExps [][]string - failCommit bool - stateEnd model.SchemaState - } - - cases := []caseUnit{ - // Test secondary index - {[]string{"insert into t1 values(3, 30, 300)", - "insert into t2 values(11, 11, 11)"}, - []string{"alter table t1 add index k2(c2)", - "alter table t1 drop index k2", - "alter table t1 add index kk2(c2, c1)", - "alter table t1 add index k2(c2)", - "alter table t1 drop index k2"}, - "alter table t1 add index k2(c2)", - []string{"select c3, c2 from t1 use index(k2) where c2 = 20", - "select c3, c2 from t1 use index(k2) where c2 = 10", - "select * from t1", - "select * from t2 where c1 = 11"}, - [][]string{{"200 20"}, - {"100 10"}, - {"1 10 100", "2 20 200", "3 30 300"}, - {"11 11 11"}}, - false, - model.StateNone}, - // Test secondary index - {[]string{"insert into t2 values(5, 50, 500)", - "insert into t2 values(11, 11, 11)", - "delete from t2 where c2 = 11", - "update t2 set c2 = 110 where c1 = 11"}, - // "update t2 set c1 = 10 where c3 = 100"}, - []string{"alter table t1 add index k2(c2)", - "alter table t1 drop index k2", - "alter table t1 add index kk2(c2, c1)", - "alter table t1 add index k2(c2)", - "alter table t1 drop index k2"}, - "alter table t1 add index k2(c2)", - []string{"select c3, c2 from t1 use index(k2) where c2 = 20", - "select c3, c2 from t1 use index(k2) where c2 = 10", - "select * from t1", - "select * from t2 where c1 = 11", - "select * from t2 where c3 = 100"}, - [][]string{{"200 20"}, - {"100 10"}, - {"1 10 100", "2 20 200"}, - {}, - {"1 10 100"}}, - false, - model.StateNone}, - // Test unique index - {[]string{"insert into t1 values(3, 30, 300)", - "insert into t1 values(4, 40, 400)", - "insert into t2 values(11, 11, 11)", - "insert into t2 values(12, 12, 11)"}, - []string{"alter table t1 add unique index uk3(c3)", - "alter table t1 drop index uk3", - "alter table t2 add unique index ukc1c3(c1, c3)", - "alter table t2 add unique index ukc3(c3)", - "alter table t2 drop index ukc1c3", - "alter table t2 drop index ukc3", - "alter table t2 add index kc3(c3)"}, - "alter table t1 add unique index uk3(c3)", - []string{"select c3, c2 from t1 use index(uk3) where c3 = 200", - "select c3, c2 from t1 use index(uk3) where c3 = 300", - "select c3, c2 from t1 use index(uk3) where c3 = 400", - "select * from t1", - "select * from t2"}, - [][]string{{"200 20"}, - {"300 30"}, - {"400 40"}, - {"1 10 100", "2 20 200", "3 30 300", "4 40 400"}, - {"1 10 100", "2 20 200", "11 11 11", "12 12 11"}}, - false, model.StateNone}, - // Test unique index fail to commit, this case needs the new index could be inserted - {[]string{"insert into t1 values(3, 30, 300)", - "insert into t1 values(4, 40, 300)", - "insert into t2 values(11, 11, 11)", - "insert into t2 values(12, 11, 12)"}, - //[]string{"alter table t1 add unique index uk3(c3)", "alter table t1 drop index uk3"}, - []string{}, - "alter table t1 add unique index uk3(c3)", - []string{"select c3, c2 from t1 use index(uk3) where c3 = 200", - "select c3, c2 from t1 use index(uk3) where c3 = 300", - "select c3, c2 from t1 where c1 = 4", - "select * from t1", - "select * from t2"}, - [][]string{{"200 20"}, - {}, - {}, - {"1 10 100", "2 20 200"}, - {"1 10 100", "2 20 200"}}, - true, - model.StateWriteOnly}, - } - tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200")) - - // Test add index state change - do := dom.DDL() - startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly} - for _, startState := range startStates { - endStatMap := session.ConstOpAddIndex[startState] - var endStates []model.SchemaState - for st := range endStatMap { - endStates = append(endStates, st) - } - slices.Sort(endStates) - for _, endState := range endStates { - for _, curCase := range cases { - if endState < curCase.stateEnd { - break - } - tk2.MustExec("drop table if exists t1") - tk2.MustExec("drop table if exists t2") - tk2.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))") - tk2.MustExec("create table t2 (c1 int primary key, c2 int, c3 int, index ok2(c2))") - tk2.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)") - tk2.MustExec("insert t2 values (1, 10, 100), (2, 20, 200)") - tk2.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200")) - tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200")) - tk.MustQuery("select * from t2;").Check(testkit.Rows("1 10 100", "2 20 200")) - - for _, DDLSQL := range curCase.tk2DDL { - tk2.MustExec(DDLSQL) - } - hook := &ddl.TestDDLCallback{Do: dom} - prepared := false - committed := false - hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.SchemaState == startState { - if !prepared { - tk.MustExec("begin pessimistic") - for _, tkSQL := range curCase.tkSQLs { - tk.MustExec(tkSQL) - } - prepared = true - } - } - } - onJobUpdatedExportedFunc := func(job *model.Job) { - if job.SchemaState == endState { - if !committed { - if curCase.failCommit { - err := tk.ExecToErr("commit") - require.Error(t, err) - } else { - tk.MustExec("commit") - } - } - committed = true - } - } - hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) - originalCallback := do.GetHook() - do.SetHook(hook) - tk2.MustExec(curCase.idxDDL) - do.SetHook(originalCallback) - tk2.MustExec("admin check table t1") - for i, checkSQL := range curCase.checkSQLs { - if len(curCase.rowsExps[i]) > 0 { - tk2.MustQuery(checkSQL).Check(testkit.Rows(curCase.rowsExps[i]...)) - } else { - tk2.MustQuery(checkSQL).Check(nil) - } - } - } - } - } - tk.MustExec("admin check table t1") -} - // TestAddIndexFailOnCaseWhenCanExit is used to close #19325. func TestAddIndexFailOnCaseWhenCanExit(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCaseWhenParseFailure", `return(true)`)) @@ -1380,55 +1182,6 @@ func TestTxnSavepointWithDDL(t *testing.T) { tk.MustExec("admin check table t1, t2") } -func TestAmendTxnSavepointWithDDL(t *testing.T) { - store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test;") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - - prepareFn := func() { - tk.MustExec("drop table if exists t1, t2") - tk.MustExec("create table t1 (c1 int primary key, c2 int)") - tk.MustExec("create table t2 (c1 int primary key, c2 int)") - } - - prepareFn() - tk.MustExec("truncate table t1") - tk.MustExec("begin pessimistic") - tk.MustExec("savepoint s1") - tk.MustExec("insert t1 values (1, 11)") - tk.MustExec("savepoint s2") - tk.MustExec("insert t2 values (1, 11)") - tk.MustExec("rollback to s2") - tk2.MustExec("alter table t1 add index idx2(c2)") - tk2.MustExec("alter table t2 add index idx2(c2)") - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("1 11")) - tk.MustQuery("select * from t2").Check(testkit.Rows()) - tk.MustExec("admin check table t1, t2") - - prepareFn() - tk.MustExec("truncate table t1") - tk.MustExec("begin pessimistic") - tk.MustExec("savepoint s1") - tk.MustExec("insert t1 values (1, 11)") - tk.MustExec("savepoint s2") - tk.MustExec("insert t2 values (1, 11)") - tk.MustExec("savepoint s3") - tk.MustExec("insert t2 values (2, 22)") - tk.MustExec("rollback to s3") - tk2.MustExec("alter table t1 add index idx2(c2)") - tk2.MustExec("alter table t2 add index idx2(c2)") - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("1 11")) - tk.MustQuery("select * from t2").Check(testkit.Rows("1 11")) - tk.MustExec("admin check table t1, t2") -} - func TestSnapshotVersion(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 6027c5cee215f..3f12358c26658 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -251,16 +251,6 @@ func findIdxInfo(dom *domain.Domain, dbName, tbName, idxName string) *model.Inde return tbl.Meta().FindIndexByName(idxName) } -func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 1;") - - tk.MustGetErrMsg("set @@tidb_enable_amend_pessimistic_txn = 1;", - "amend pessimistic transactions is not compatible with tidb_ddl_enable_fast_reorg") -} - // TestCreateUniqueIndexKeyExist this case will test below things: // Create one unique index idx((a*b+1)); // insert (0, 6) and delete it; diff --git a/ddl/table_modify_test.go b/ddl/table_modify_test.go index c042d266ac9e2..590fea8ad973d 100644 --- a/ddl/table_modify_test.go +++ b/ddl/table_modify_test.go @@ -117,7 +117,6 @@ func TestLockTableReadOnly(t *testing.T) { tk1 := testkit.NewTestKit(t, store) tk2 := testkit.NewTestKit(t, store) tk1.MustExec("use test") - tk1.MustExec("set global tidb_enable_metadata_lock=0") tk2.MustExec("use test") tk1.MustExec("drop table if exists t1,t2") defer func() { @@ -162,18 +161,6 @@ func TestLockTableReadOnly(t *testing.T) { require.True(t, terror.ErrorEqual(tk2.ExecToErr("lock tables t1 write local"), infoschema.ErrTableLocked)) tk1.MustExec("admin cleanup table lock t1") tk2.MustExec("insert into t1 set a=1, b=2") - - tk1.MustExec("set global tidb_ddl_enable_fast_reorg = 0") - tk1.MustExec("set tidb_enable_amend_pessimistic_txn = 1") - tk1.MustExec("begin pessimistic") - tk1.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 2")) - tk2.MustExec("update t1 set b = 3") - tk2.MustExec("alter table t1 read only") - tk2.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 3")) - tk1.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 2")) - tk1.MustExec("update t1 set b = 4") - require.True(t, terror.ErrorEqual(tk1.ExecToErr("commit"), domain.ErrInfoSchemaChanged)) - tk2.MustExec("alter table t1 read write") } // TestConcurrentLockTables test concurrent lock/unlock tables. diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 4d7cf2e9b814a..592f558f0b27c 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -164,19 +164,18 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha // isRelatedTablesChanged returns the result whether relatedTableIDs is changed // from usedVer to the latest schema version. // NOTE, this function should be called under lock! -func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) (transaction.RelatedSchemaChange, bool) { - res := transaction.RelatedSchemaChange{} +func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) bool { if len(s.deltaSchemaInfos) == 0 { metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheEmpty).Inc() logutil.BgLogger().Info("schema change history is empty", zap.Int64("currVer", currVer)) - return res, true + return true } newerDeltas := s.findNewerDeltas(currVer) if len(newerDeltas) == len(s.deltaSchemaInfos) { metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheMiss).Inc() logutil.BgLogger().Info("the schema version is much older than the latest version", zap.Int64("currVer", currVer), zap.Int64("latestSchemaVer", s.latestSchemaVer), zap.Reflect("deltas", newerDeltas)) - return res, true + return true } changedTblMap := make(map[int64]uint64) @@ -198,22 +197,15 @@ func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64 } if len(changedTblMap) > 0 { tblIds := make([]int64, 0, len(changedTblMap)) - actionTypes := make([]uint64, 0, len(changedTblMap)) for id := range changedTblMap { tblIds = append(tblIds, id) } slices.Sort(tblIds) - for _, tblID := range tblIds { - actionTypes = append(actionTypes, changedTblMap[tblID]) - } - res.PhyTblIDS = tblIds - res.ActionTypes = actionTypes - res.Amendable = true logutil.BgLogger().Info("schema of tables in the transaction are changed", zap.Int64s("conflicted table IDs", tblIds), zap.Int64("transaction schema", currVer), zap.Int64s("schema versions that changed the tables", changedSchemaVers)) - return res, true + return true } - return res, false + return false } func (s *schemaValidator) findNewerDeltas(currVer int64) []deltaSchemaInfo { @@ -251,12 +243,8 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedPhysicalTa // When disabling MDL -> enabling MDL, the old transaction's needCheckSchema is true, we need to check it. // When enabling MDL -> disabling MDL, the old transaction's needCheckSchema is false, so still need to check it, and variable EnableMDL is false now. if needCheckSchema || !variable.EnableMDL.Load() { - relatedChanges, changed := s.isRelatedTablesChanged(schemaVer, relatedPhysicalTableIDs) + changed := s.isRelatedTablesChanged(schemaVer, relatedPhysicalTableIDs) if changed { - if relatedChanges.Amendable { - relatedChanges.LatestInfoSchema = s.latestInfoSchema - return &relatedChanges, ResultFail - } return nil, ResultFail } } diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index 8762e46bd718e..ddcc57634ab60 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -61,7 +61,7 @@ func subTestSchemaValidatorGeneral(t *testing.T) { // Stop the validator, validator's items value is nil. validator.Stop() require.False(t, validator.IsStarted()) - _, isTablesChanged := validator.isRelatedTablesChanged(item.schemaVer, []int64{10}) + isTablesChanged := validator.isRelatedTablesChanged(item.schemaVer, []int64{10}) require.True(t, isTablesChanged) _, valid = validator.Check(item.leaseGrantTS, item.schemaVer, []int64{10}, true) require.Equal(t, ResultUnknown, valid) @@ -91,12 +91,12 @@ func subTestSchemaValidatorGeneral(t *testing.T) { validator.Update(ts, currVer, newItem.schemaVer, &transaction.RelatedSchemaChange{PhyTblIDS: []int64{1, 2, 3}, ActionTypes: []uint64{1, 2, 3}}) // Make sure the updated table IDs don't be covered with the same schema version. validator.Update(ts, newItem.schemaVer, newItem.schemaVer, nil) - _, isTablesChanged = validator.isRelatedTablesChanged(currVer, nil) + isTablesChanged = validator.isRelatedTablesChanged(currVer, nil) require.False(t, isTablesChanged) - _, isTablesChanged = validator.isRelatedTablesChanged(currVer, []int64{2}) + isTablesChanged = validator.isRelatedTablesChanged(currVer, []int64{2}) require.Truef(t, isTablesChanged, "currVer %d, newItem %v", currVer, newItem) // The current schema version is older than the oldest schema version. - _, isTablesChanged = validator.isRelatedTablesChanged(-1, nil) + isTablesChanged = validator.isRelatedTablesChanged(-1, nil) require.Truef(t, isTablesChanged, "currVer %d, newItem %v", currVer, newItem) // All schema versions is expired. @@ -214,10 +214,8 @@ func subTestEnqueueActionType(t *testing.T) { // Check the flag set by schema diff, note tableID = 3 has been set flag 0x3 in schema version 9, and flag 0x4 // in schema version 10, so the resActions for tableID = 3 should be 0x3 & 0x4 = 0x7. - relatedChanges, isTablesChanged := validator.isRelatedTablesChanged(5, []int64{1, 2, 3, 4}) + isTablesChanged := validator.isRelatedTablesChanged(5, []int64{1, 2, 3, 4}) require.True(t, isTablesChanged) - require.Equal(t, []int64{1, 2, 3, 4}, relatedChanges.PhyTblIDS) - require.Equal(t, []uint64{(1 << 1) | (1 << 15), 1 << 2, (1 << 3) | (1 << 4), 1 << 4}, relatedChanges.ActionTypes) } type leaseGrantItem struct { diff --git a/expression/integration_test.go b/expression/integration_test.go index 6525120d59eb0..a49df593d201c 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -4989,25 +4989,6 @@ func TestIssue18525(t *testing.T) { tk.MustQuery("select INTERVAL( ( CONVERT( -11752 USING utf8 ) ), 6558853612195285496, `col1`) from t1").Check(testkit.Rows("0", "0", "0")) } -func TestSchemaDMLNotChange(t *testing.T) { - store := testkit.CreateMockStore(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk2.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (id int primary key, c_json json);") - tk.MustExec("insert into t values (1, '{\"k\": 1}');") - tk.MustExec("begin") - tk.MustExec("update t set c_json = '{\"k\": 2}' where id = 1;") - tk2.MustExec("alter table t rename column c_json to cc_json;") - tk.MustExec("commit") -} - func TestIssue18850(t *testing.T) { store := testkit.CreateMockStore(t) diff --git a/session/BUILD.bazel b/session/BUILD.bazel index 5939684d51549..8e567503a6377 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -6,7 +6,6 @@ go_library( "advisory_locks.go", "bootstrap.go", "nontransactional.go", - "schema_amender.go", "session.go", "tidb.go", "txn.go", @@ -79,7 +78,6 @@ go_library( "//util/mathutil", "//util/memory", "//util/parser", - "//util/rowcodec", "//util/sem", "//util/sli", "//util/sqlexec", @@ -100,7 +98,6 @@ go_library( "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/transaction", "@com_github_tikv_client_go_v2//util", "@io_etcd_go_etcd_client_v3//concurrency", "@org_uber_go_zap//:zap", @@ -118,7 +115,6 @@ go_test( "index_usage_sync_lease_test.go", "main_test.go", "nontransactional_test.go", - "schema_amender_test.go", "schema_test.go", "session_test.go", "tidb_test.go", @@ -139,20 +135,16 @@ go_test( "//expression", "//kv", "//meta", - "//meta/autoid", "//parser/ast", "//parser/auth", "//parser/model", - "//parser/mysql", "//parser/terror", "//planner/core", "//server", "//sessionctx", "//sessionctx/variable", - "//sessiontxn", "//statistics", "//store/mockstore", - "//table", "//tablecodec", "//telemetry", "//testkit", @@ -166,17 +158,13 @@ go_test( "//util/chunk", "//util/collate", "//util/logutil", - "//util/rowcodec", "//util/sqlexec", "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/transaction", "@com_github_tikv_client_go_v2//util", - "@org_golang_x_exp//slices", "@org_uber_go_atomic//:atomic", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/session/schema_amender.go b/session/schema_amender.go deleted file mode 100644 index caaec7994ae3a..0000000000000 --- a/session/schema_amender.go +++ /dev/null @@ -1,713 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package session - -import ( - "bytes" - "context" - "encoding/hex" - "fmt" - "reflect" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/executor" - "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/table/tables" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/txnkv/transaction" - "go.uber.org/zap" -) - -const amendableType = nonMemAmendType | memBufAmendType -const nonMemAmendType = (1 << model.ActionAddColumn) | (1 << model.ActionDropColumn) | (1 << model.ActionDropIndex) -const memBufAmendType = uint64(1< 0 -} - -func needCollectIndexOps(actionType uint64) bool { - return actionType&(1< idxInfoAtStart.Meta().State { - amendOpType = ConstOpAddIndex[idxInfoAtStart.Meta().State][idxInfoAtCommit.Meta().State] - } - if amendOpType != AmendNone { - opInfo := &amendOperationAddIndexInfo{} - opInfo.AmendOpType = amendOpType - opInfo.tblInfoAtStart = tblAtStart - opInfo.tblInfoAtCommit = tblAtCommit - opInfo.indexInfoAtStart = idxInfoAtStart - opInfo.indexInfoAtCommit = idxInfoAtCommit - for _, idxCol := range idxInfoAtCommit.Meta().Columns { - colID := tblAtCommit.Meta().Columns[idxCol.Offset].ID - oldColInfo := findColByID(tblAtStart, colID) - // TODO: now index column MUST be found in old table columns, generated column is not supported. - if oldColInfo == nil || oldColInfo.IsGenerated() || oldColInfo.Hidden { - return nil, errors.Trace(errors.Errorf("amend index column=%v id=%v is not found or generated in table=%v", - idxCol.Name, colID, tblAtCommit.Meta().Name.String())) - } - opInfo.relatedOldIdxCols = append(opInfo.relatedOldIdxCols, oldColInfo) - } - opInfo.schemaAndDecoder = newSchemaAndDecoder(sctx, tblAtStart.Meta()) - fieldTypes := make([]*types.FieldType, 0, len(tblAtStart.Meta().Columns)) - for _, col := range tblAtStart.Meta().Columns { - fieldTypes = append(fieldTypes, &(col.FieldType)) - } - opInfo.chk = chunk.NewChunkWithCapacity(fieldTypes, 4) - addNewIndexOp := &amendOperationAddIndex{ - info: opInfo, - insertedNewIndexKeys: make(map[string]struct{}), - deletedOldIndexKeys: make(map[string]struct{}), - } - res = append(res, addNewIndexOp) - } - } - return res, nil -} - -// collectTblAmendOps collects amend operations for each table using the schema diff between startTS and commitTS. -func (a *amendCollector) collectTblAmendOps(sctx sessionctx.Context, phyTblID int64, - tblInfoAtStart, tblInfoAtCommit table.Table, actionType uint64) error { - if _, ok := a.tblAmendOpMap[phyTblID]; !ok { - a.tblAmendOpMap[phyTblID] = make([]amendOp, 0, 4) - } - if needCollectModifyColOps(actionType) { - _, err := a.collectModifyColAmendOps(tblInfoAtStart, tblInfoAtCommit) - if err != nil { - return err - } - } - if needCollectIndexOps(actionType) { - // TODO: currently only "add index" is considered. - ops, err := a.collectIndexAmendOps(sctx, tblInfoAtStart, tblInfoAtCommit) - if err != nil { - return err - } - a.tblAmendOpMap[phyTblID] = append(a.tblAmendOpMap[phyTblID], ops...) - } - return nil -} - -// mayGenDelIndexRowKeyOp returns if the row key op could generate Op_Del index key mutations. -func mayGenDelIndexRowKeyOp(keyOp kvrpcpb.Op) bool { - return keyOp == kvrpcpb.Op_Del || keyOp == kvrpcpb.Op_Put -} - -// mayGenPutIndexRowKeyOp returns if the row key op could generate Op_Put/Op_Insert index key mutations. -func mayGenPutIndexRowKeyOp(keyOp kvrpcpb.Op) bool { - return keyOp == kvrpcpb.Op_Put || keyOp == kvrpcpb.Op_Insert -} - -// amendOp is an amend operation for a specific schema change, new mutations will be generated using input ones. -type amendOp interface { - genMutations(ctx context.Context, sctx sessionctx.Context, commitMutations transaction.CommitterMutations, kvMap *rowKvMap, - resultMutations *transaction.PlainMutations) error -} - -// amendOperationAddIndex represents one amend operation related to a specific add index change. -type amendOperationAddIndexInfo struct { - AmendOpType int - tblInfoAtStart table.Table - tblInfoAtCommit table.Table - indexInfoAtStart table.Index - indexInfoAtCommit table.Index - relatedOldIdxCols []*table.Column - - schemaAndDecoder *schemaAndDecoder - chk *chunk.Chunk -} - -// amendOperationAddIndex represents the add operation will be performed on new key values for add index amend. -type amendOperationAddIndex struct { - info *amendOperationAddIndexInfo - - // insertedNewIndexKeys is used to check duplicates for new index generated by unique key. - insertedNewIndexKeys map[string]struct{} - // deletedOldIndexKeys is used to check duplicates for deleted old index keys. - deletedOldIndexKeys map[string]struct{} -} - -func (a *amendOperationAddIndexInfo) String() string { - var colStr string - colStr += "[" - for _, colInfo := range a.relatedOldIdxCols { - colStr += fmt.Sprintf(" %s ", colInfo.Name) - } - colStr += "]" - res := fmt.Sprintf("AmenedOpType=%d phyTblID=%d idxID=%d columns=%v", a.AmendOpType, a.indexInfoAtCommit.Meta().ID, - a.indexInfoAtCommit.Meta().ID, colStr) - return res -} - -func (a *amendOperationAddIndex) genMutations(ctx context.Context, sctx sessionctx.Context, commitMutations transaction.CommitterMutations, - kvMap *rowKvMap, resAddMutations *transaction.PlainMutations) error { - // There should be no duplicate keys in deletedOldIndexKeys and insertedNewIndexKeys. - deletedMutations := transaction.NewPlainMutations(32) - insertedMutations := transaction.NewPlainMutations(32) - for i, key := range commitMutations.GetKeys() { - if tablecodec.IsIndexKey(key) || tablecodec.DecodeTableID(key) != a.info.tblInfoAtCommit.Meta().ID { - continue - } - var newIdxMutation *transaction.PlainMutation - var oldIdxMutation *transaction.PlainMutation - var err error - keyOp := commitMutations.GetOp(i) - if addIndexNeedRemoveOp(a.info.AmendOpType) { - if mayGenDelIndexRowKeyOp(keyOp) { - oldIdxMutation, err = a.genOldIdxKey(ctx, sctx, key, kvMap.oldRowKvMap) - if err != nil { - return err - } - } - } - if addIndexNeedAddOp(a.info.AmendOpType) { - if mayGenPutIndexRowKeyOp(keyOp) { - newIdxMutation, err = a.genNewIdxKey(ctx, sctx, key, kvMap.newRowKvMap) - if err != nil { - return err - } - } - } - skipMerge := false - if a.info.AmendOpType == AmendNeedAddDeleteAndInsert { - // If the old index key is the same with new index key, then the index related row value - // is not changed in this row, we don't need to add or remove index keys for this row. - if oldIdxMutation != nil && newIdxMutation != nil { - if bytes.Equal(oldIdxMutation.Key, newIdxMutation.Key) { - skipMerge = true - } - } - } - if !skipMerge { - if oldIdxMutation != nil { - deletedMutations.AppendMutation(*oldIdxMutation) - } - if newIdxMutation != nil { - insertedMutations.AppendMutation(*newIdxMutation) - } - } - } - // For unique index, there may be conflicts on the same unique index key from different rows.Consider a update statement, - // "Op_Del" on row_key = 3, row_val = 4, the "Op_Del" unique_key_4 -> nil will be generated. - // "Op_Put" on row_key = 0, row_val = 4, the "Op_Insert" unique_key_4 -> 0 will be generated. - // The "Op_Insert" should cover the "Op_Del" otherwise the new put row value will not have a correspond index value. - if a.info.indexInfoAtCommit.Meta().Unique { - for i := 0; i < len(deletedMutations.GetKeys()); i++ { - key := deletedMutations.GetKeys()[i] - if _, ok := a.insertedNewIndexKeys[string(key)]; !ok { - resAddMutations.Push(deletedMutations.GetOps()[i], key, deletedMutations.GetValues()[i], deletedMutations.IsPessimisticLock(i), - deletedMutations.IsAssertExists(i), deletedMutations.IsAssertNotExist(i), deletedMutations.NeedConstraintCheckInPrewrite(i)) - } - } - for i := 0; i < len(insertedMutations.GetKeys()); i++ { - key := insertedMutations.GetKeys()[i] - destKeyOp := kvrpcpb.Op_Insert - if _, ok := a.deletedOldIndexKeys[string(key)]; ok { - destKeyOp = kvrpcpb.Op_Put - } - resAddMutations.Push(destKeyOp, key, insertedMutations.GetValues()[i], insertedMutations.IsPessimisticLock(i), - insertedMutations.IsAssertExists(i), insertedMutations.IsAssertNotExist(i), insertedMutations.NeedConstraintCheckInPrewrite(i)) - } - } else { - resAddMutations.MergeMutations(deletedMutations) - resAddMutations.MergeMutations(insertedMutations) - } - return nil -} - -func getCommonHandleDatum(tbl table.Table, row chunk.Row) []types.Datum { - if !tbl.Meta().IsCommonHandle { - return nil - } - datumBuf := make([]types.Datum, 0, 4) - for _, col := range tbl.Cols() { - if mysql.HasPriKeyFlag(col.GetFlag()) { - datumBuf = append(datumBuf, row.GetDatum(col.Offset, &col.FieldType)) - } - } - return datumBuf -} - -func (a *amendOperationAddIndexInfo) genIndexKeyValue(ctx context.Context, sctx sessionctx.Context, kvMap map[string][]byte, - key []byte, kvHandle kv.Handle, keyOnly bool) ([]byte, []byte, error) { - chk := a.chk - chk.Reset() - val, ok := kvMap[string(key)] - if !ok { - // The Op_Put may not exist in old value kv map. - if keyOnly { - return nil, nil, nil - } - return nil, nil, errors.Errorf("key=%v is not found in new row kv map", kv.Key(key).String()) - } - err := executor.DecodeRowValToChunk(sctx, a.schemaAndDecoder.schema, a.tblInfoAtStart.Meta(), kvHandle, val, chk, a.schemaAndDecoder.decoder) - if err != nil { - logutil.Logger(ctx).Warn("amend decode value to chunk failed", zap.Error(err)) - return nil, nil, errors.Trace(err) - } - idxVals := make([]types.Datum, 0, len(a.indexInfoAtCommit.Meta().Columns)) - for _, oldCol := range a.relatedOldIdxCols { - idxVals = append(idxVals, chk.GetRow(0).GetDatum(oldCol.Offset, &oldCol.FieldType)) - } - - rsData := tables.TryGetHandleRestoredDataWrapper(a.tblInfoAtCommit.Meta(), getCommonHandleDatum(a.tblInfoAtCommit, chk.GetRow(0)), nil, a.indexInfoAtCommit.Meta()) - - // Generate index key buf. - newIdxKey, distinct, err := tablecodec.GenIndexKey(sctx.GetSessionVars().StmtCtx, - a.tblInfoAtCommit.Meta(), a.indexInfoAtCommit.Meta(), a.tblInfoAtCommit.Meta().ID, idxVals, kvHandle, nil) - if err != nil { - logutil.Logger(ctx).Warn("amend generate index key failed", zap.Error(err)) - return nil, nil, errors.Trace(err) - } - if keyOnly { - return newIdxKey, []byte{}, nil - } - - // Generate index value buf. - needRsData := tables.NeedRestoredData(a.indexInfoAtCommit.Meta().Columns, a.tblInfoAtCommit.Meta().Columns) - newIdxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, a.tblInfoAtCommit.Meta(), a.indexInfoAtCommit.Meta(), needRsData, distinct, false, idxVals, kvHandle, 0, rsData) - if err != nil { - logutil.Logger(ctx).Warn("amend generate index values failed", zap.Error(err)) - return nil, nil, errors.Trace(err) - } - return newIdxKey, newIdxVal, nil -} - -func (a *amendOperationAddIndex) genNewIdxKey(ctx context.Context, sctx sessionctx.Context, key []byte, - kvMap map[string][]byte) (*transaction.PlainMutation, error) { - kvHandle, err := tablecodec.DecodeRowKey(key) - if err != nil { - logutil.Logger(ctx).Error("decode key error", zap.String("key", hex.EncodeToString(key)), zap.Error(err)) - return nil, errors.Trace(err) - } - - newIdxKey, newIdxValue, err := a.info.genIndexKeyValue(ctx, sctx, kvMap, key, kvHandle, false) - if err != nil { - return nil, errors.Trace(err) - } - newIndexOp := kvrpcpb.Op_Put - isPessimisticLock := false - if _, ok := a.insertedNewIndexKeys[string(newIdxKey)]; ok { - return nil, errors.Trace(errors.Errorf("amend process key same key=%v found for index=%v in table=%v", - newIdxKey, a.info.indexInfoAtCommit.Meta().Name, a.info.tblInfoAtCommit.Meta().Name)) - } - if a.info.indexInfoAtCommit.Meta().Unique { - newIndexOp = kvrpcpb.Op_Insert - isPessimisticLock = true - } - a.insertedNewIndexKeys[string(newIdxKey)] = struct{}{} - var flags transaction.CommitterMutationFlags - if isPessimisticLock { - flags |= transaction.MutationFlagIsPessimisticLock - } - newMutation := &transaction.PlainMutation{KeyOp: newIndexOp, Key: newIdxKey, Value: newIdxValue, Flags: flags} - return newMutation, nil -} - -func (a *amendOperationAddIndex) genOldIdxKey(ctx context.Context, sctx sessionctx.Context, key []byte, - oldValKvMap map[string][]byte) (*transaction.PlainMutation, error) { - kvHandle, err := tablecodec.DecodeRowKey(key) - if err != nil { - logutil.Logger(ctx).Error("decode key error", zap.String("key", hex.EncodeToString(key)), zap.Error(err)) - return nil, errors.Trace(err) - } - // Generated delete index key value. - newIdxKey, emptyVal, err := a.info.genIndexKeyValue(ctx, sctx, oldValKvMap, key, kvHandle, true) - if err != nil { - return nil, errors.Trace(err) - } - // For Op_Put the key may not exist in old key value map. - if len(newIdxKey) > 0 { - isPessimisticLock := false - if _, ok := a.deletedOldIndexKeys[string(newIdxKey)]; ok { - return nil, errors.Trace(errors.Errorf("amend process key same key=%v found for index=%v in table=%v", - newIdxKey, a.info.indexInfoAtCommit.Meta().Name, a.info.tblInfoAtCommit.Meta().Name)) - } - if a.info.indexInfoAtCommit.Meta().Unique { - isPessimisticLock = true - } - a.deletedOldIndexKeys[string(newIdxKey)] = struct{}{} - var flags transaction.CommitterMutationFlags - if isPessimisticLock { - flags |= transaction.MutationFlagIsPessimisticLock - } - return &transaction.PlainMutation{KeyOp: kvrpcpb.Op_Del, Key: newIdxKey, Value: emptyVal, Flags: flags}, nil - } - return nil, nil -} - -// SchemaAmender is used to amend pessimistic transactions for schema change. -type SchemaAmender struct { - sess *session -} - -// NewSchemaAmenderForTikvTxn creates a schema amender for tikvTxn type. -func NewSchemaAmenderForTikvTxn(sess *session) *SchemaAmender { - amender := &SchemaAmender{sess: sess} - return amender -} - -func (s *SchemaAmender) getAmendableKeys(commitMutations transaction.CommitterMutations, info *amendCollector) ([]kv.Key, []kv.Key) { - addKeys := make([]kv.Key, 0, len(commitMutations.GetKeys())) - removeKeys := make([]kv.Key, 0, len(commitMutations.GetKeys())) - for i, byteKey := range commitMutations.GetKeys() { - if tablecodec.IsIndexKey(byteKey) || !info.keyHasAmendOp(byteKey) { - continue - } - keyOp := commitMutations.GetOp(i) - switch keyOp { - case kvrpcpb.Op_Put: - addKeys = append(addKeys, byteKey) - removeKeys = append(removeKeys, byteKey) - case kvrpcpb.Op_Insert: - addKeys = append(addKeys, byteKey) - case kvrpcpb.Op_Del: - removeKeys = append(removeKeys, byteKey) - } - } - return addKeys, removeKeys -} - -type rowKvMap struct { - oldRowKvMap map[string][]byte - newRowKvMap map[string][]byte -} - -func (s *SchemaAmender) prepareKvMap(ctx context.Context, commitMutations transaction.CommitterMutations, info *amendCollector) (*rowKvMap, error) { - // Get keys need to be considered for the amend operation, currently only row keys. - addKeys, removeKeys := s.getAmendableKeys(commitMutations, info) - - // BatchGet the new key values, the Op_Put and Op_Insert type keys in memory buffer. - txn, err := s.sess.Txn(true) - if err != nil { - return nil, errors.Trace(err) - } - newValKvMap, err := txn.BatchGet(ctx, addKeys) - if err != nil { - logutil.Logger(ctx).Warn("amend failed to batch get kv new keys", zap.Error(err)) - return nil, errors.Trace(err) - } - if len(newValKvMap) != len(addKeys) { - logutil.Logger(ctx).Error("amend failed to batch get results invalid", - zap.Int("addKeys len", len(addKeys)), zap.Int("newValKvMap", len(newValKvMap))) - return nil, errors.Errorf("add keys has %v values but result kvMap has %v", len(addKeys), len(newValKvMap)) - } - // BatchGet the old key values, the Op_Del and Op_Put types keys in storage using forUpdateTS, the Op_put type is for - // row update using the same row key, it may not exist. - snapshot := s.sess.GetStore().GetSnapshot(kv.Version{Ver: s.sess.sessionVars.TxnCtx.GetForUpdateTS()}) - oldValKvMap, err := snapshot.BatchGet(ctx, removeKeys) - if err != nil { - logutil.Logger(ctx).Warn("amend failed to batch get kv old keys", zap.Error(err)) - return nil, errors.Trace(err) - } - - res := &rowKvMap{ - oldRowKvMap: oldValKvMap, - newRowKvMap: newValKvMap, - } - return res, nil -} - -func (s *SchemaAmender) checkDupKeys(ctx context.Context, mutations transaction.CommitterMutations) error { - // Check if there are duplicate key entries. - checkMap := make(map[string]kvrpcpb.Op) - for i := 0; i < mutations.Len(); i++ { - key := mutations.GetKey(i) - keyOp := mutations.GetOp(i) - keyVal := mutations.GetValue(i) - if foundOp, ok := checkMap[string(key)]; ok { - logutil.Logger(ctx).Error("duplicate key found in amend result mutations", - zap.Stringer("key", kv.Key(key)), - zap.Stringer("foundKeyOp", foundOp), - zap.Stringer("thisKeyOp", keyOp), - zap.Stringer("thisKeyValue", kv.Key(keyVal))) - return errors.Trace(errors.Errorf("duplicate key=%s is found in mutations", kv.Key(key).String())) - } - checkMap[string(key)] = keyOp - } - return nil -} - -// genAllAmendMutations generates CommitterMutations for all tables and related amend operations. -func (s *SchemaAmender) genAllAmendMutations(ctx context.Context, commitMutations transaction.CommitterMutations, - info *amendCollector) (*transaction.PlainMutations, error) { - rowKvMap, err := s.prepareKvMap(ctx, commitMutations, info) - if err != nil { - return nil, err - } - // Do generate add/remove mutations processing each key. - resultNewMutations := transaction.NewPlainMutations(32) - for _, amendOps := range info.tblAmendOpMap { - for _, curOp := range amendOps { - err := curOp.genMutations(ctx, s.sess, commitMutations, rowKvMap, &resultNewMutations) - if err != nil { - return nil, err - } - } - } - err = s.checkDupKeys(ctx, &resultNewMutations) - if err != nil { - return nil, err - } - return &resultNewMutations, nil -} - -// AmendTxn does check and generate amend mutations based on input infoSchema and mutations, mutations need to prewrite -// are returned, the input commitMutations will not be changed. -func (s *SchemaAmender) AmendTxn(ctx context.Context, startInfoSchema tikv.SchemaVer, change *transaction.RelatedSchemaChange, - commitMutations transaction.CommitterMutations) (transaction.CommitterMutations, error) { - // Get info schema meta - infoSchemaAtStart := startInfoSchema.(infoschema.InfoSchema) - infoSchemaAtCheck := change.LatestInfoSchema.(infoschema.InfoSchema) - - // Collect amend operations for each table by physical table ID. - var needAmendMem bool - amendCollector := newAmendCollector() - for i, tblID := range change.PhyTblIDS { - actionType := change.ActionTypes[i] - // Check amendable flags, return if not supported flags exist. - if actionType&(^amendableType) != 0 { - logutil.Logger(ctx).Info("amend action type not supported for txn", zap.Int64("tblID", tblID), zap.Uint64("actionType", actionType)) - return nil, errors.Trace(table.ErrUnsupportedOp) - } - // Partition table is not supported now. - tblInfoAtStart, ok := infoSchemaAtStart.TableByID(tblID) - if !ok { - return nil, errors.Trace(errors.Errorf("tableID=%d is not found in infoSchema", tblID)) - } - if tblInfoAtStart.Meta().Partition != nil { - logutil.Logger(ctx).Info("Amend for partition table is not supported", - zap.String("tableName", tblInfoAtStart.Meta().Name.String()), zap.Int64("tableID", tblID)) - return nil, errors.Trace(table.ErrUnsupportedOp) - } - tblInfoAtCommit, ok := infoSchemaAtCheck.TableByID(tblID) - if !ok { - return nil, errors.Trace(errors.Errorf("tableID=%d is not found in infoSchema", tblID)) - } - if actionType&(memBufAmendType) != 0 { - needAmendMem = true - err := amendCollector.collectTblAmendOps(s.sess, tblID, tblInfoAtStart, tblInfoAtCommit, actionType) - if err != nil { - return nil, err - } - } - } - // After amend operations collect, generate related new mutations based on input commitMutations - if needAmendMem { - return s.genAllAmendMutations(ctx, commitMutations, amendCollector) - } - return nil, nil -} - -func newSchemaAndDecoder(ctx sessionctx.Context, tbl *model.TableInfo) *schemaAndDecoder { - schema := expression.NewSchema(make([]*expression.Column, 0, len(tbl.Columns))...) - for _, col := range tbl.Columns { - colExpr := &expression.Column{ - RetType: &col.FieldType, - ID: col.ID, - } - if col.IsGenerated() && !col.GeneratedStored { - // This will not be used since generated column is rejected in collectIndexAmendOps. - colExpr.VirtualExpr = &expression.Constant{} - } - schema.Append(colExpr) - } - return &schemaAndDecoder{schema, executor.NewRowDecoder(ctx, schema, tbl)} -} diff --git a/session/schema_amender_test.go b/session/schema_amender_test.go deleted file mode 100644 index 565b4861a9400..0000000000000 --- a/session/schema_amender_test.go +++ /dev/null @@ -1,477 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package session - -import ( - "bytes" - "context" - "fmt" - "strconv" - "testing" - - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/txnkv/transaction" - "go.uber.org/zap" - "golang.org/x/exp/slices" -) - -func initTblColIdxID(metaInfo *model.TableInfo) { - for i, col := range metaInfo.Columns { - col.ID = int64(i + 1) - } - for i, idx := range metaInfo.Indices { - idx.ID = int64(i + 1) - if idx.Name.L == "f_g" { - idx.Unique = true - } else { - idx.Unique = false - } - } - metaInfo.ID = 1 - metaInfo.State = model.StatePublic -} - -func mutationsEqual(res *transaction.PlainMutations, expected *transaction.PlainMutations, t *testing.T) { - require.Len(t, res.GetKeys(), len(expected.GetKeys())) - for i := 0; i < len(res.GetKeys()); i++ { - foundIdx := -1 - for j := 0; j < len(expected.GetKeys()); j++ { - if bytes.Equal(res.GetKeys()[i], expected.GetKeys()[j]) { - foundIdx = j - break - } - } - require.GreaterOrEqual(t, foundIdx, 0) - require.Equal(t, expected.GetOps()[foundIdx], res.GetOps()[i]) - require.Equal(t, expected.IsPessimisticLock(foundIdx), res.IsPessimisticLock(i)) - require.Equal(t, expected.GetKeys()[foundIdx], res.GetKeys()[i]) - require.Equal(t, expected.GetValues()[foundIdx], res.GetValues()[i]) - } -} - -type data struct { - ops []kvrpcpb.Op - keys [][]byte - values [][]byte - rowValue [][]types.Datum -} - -// Generate exist old data and new data in transaction to be amended. Also generate the expected amend mutations -// according to the old and new data and the full generated expected mutations. -func prepareTestData( - se *session, - mutations *transaction.PlainMutations, - oldTblInfo table.Table, - newTblInfo table.Table, - expectedAmendOps []amendOp, - t *testing.T, -) (*data, transaction.PlainMutations) { - var err error - // Generated test data. - colIds := make([]int64, len(oldTblInfo.Meta().Columns)) - basicRowValue := make([]types.Datum, len(oldTblInfo.Meta().Columns)) - for i, col := range oldTblInfo.Meta().Columns { - colIds[i] = oldTblInfo.Meta().Columns[col.Offset].ID - if col.FieldType.GetType() == mysql.TypeLong { - basicRowValue[i] = types.NewIntDatum(int64(col.Offset)) - } else { - basicRowValue[i] = types.NewStringDatum(strconv.Itoa(col.Offset)) - } - } - KeyOps := []kvrpcpb.Op{kvrpcpb.Op_Put, kvrpcpb.Op_Del, kvrpcpb.Op_Lock, kvrpcpb.Op_Insert, kvrpcpb.Op_Put, - kvrpcpb.Op_Del, kvrpcpb.Op_Insert, kvrpcpb.Op_Lock} - numberOfRows := len(KeyOps) - oldRowValues := make([][]types.Datum, numberOfRows) - newRowValues := make([][]types.Datum, numberOfRows) - rd := rowcodec.Encoder{Enable: true} - oldData := &data{} - expectedMutations := transaction.NewPlainMutations(8) - oldRowKvMap := make(map[string][]types.Datum) - newRowKvMap := make(map[string][]types.Datum) - - // colIdx: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9. - // column: a, b, c, d, e, c_str, d_str, e_str, f, g. - // Generate old data. - for i := 0; i < numberOfRows; i++ { - keyOp := KeyOps[i] - thisRowValue := make([]types.Datum, len(basicRowValue)) - copy(thisRowValue, basicRowValue) - thisRowValue[0] = types.NewIntDatum(int64(i + 1)) - thisRowValue[4] = types.NewIntDatum(int64(i + 1 + 4)) - // f_g has a unique index. - thisRowValue[8] = types.NewIntDatum(int64(i + 1 + 8)) - - // Save old data, they will be put into db first. - rowKey := tablecodec.EncodeRowKeyWithHandle(oldTblInfo.Meta().ID, kv.IntHandle(int64(i+1))) - var rowValue []byte - rowValue, err = rd.Encode(se.sessionVars.StmtCtx, colIds, thisRowValue, nil) - require.NoError(t, err) - if keyOp == kvrpcpb.Op_Del || keyOp == kvrpcpb.Op_Put || keyOp == kvrpcpb.Op_Lock { - // Skip the last Op_put, it has no old row value. - if i == 4 { - oldRowValues[i] = nil - continue - } - oldData.keys = append(oldData.keys, rowKey) - oldData.values = append(oldData.values, rowValue) - oldData.ops = append(oldData.ops, keyOp) - oldData.rowValue = append(oldData.rowValue, thisRowValue) - if keyOp == kvrpcpb.Op_Del { - mutations.Push(keyOp, rowKey, []byte{}, true, false, false, false) - } - } - oldRowValues[i] = thisRowValue - oldRowKvMap[string(rowKey)] = thisRowValue - } - - // Generate new data. - for i := 0; i < numberOfRows; i++ { - keyOp := KeyOps[i] - thisRowValue := make([]types.Datum, len(basicRowValue)) - copy(thisRowValue, basicRowValue) - thisRowValue[0] = types.NewIntDatum(int64(i + 1)) - // New column e value should be different from old row values. - thisRowValue[4] = types.NewIntDatum(int64(i+1+4) * 20) - // New column f value should be different since it has a related unique index. - thisRowValue[8] = types.NewIntDatum(int64(i+1+4) * 20) - - var rowValue []byte - // Save new data. - rowKey := tablecodec.EncodeRowKeyWithHandle(oldTblInfo.Meta().ID, kv.IntHandle(int64(i+1))) - if keyOp == kvrpcpb.Op_Insert { - rowValue, err = tablecodec.EncodeOldRow(se.sessionVars.StmtCtx, thisRowValue, colIds, nil, nil) - } else { - rowValue, err = rd.Encode(se.sessionVars.StmtCtx, colIds, thisRowValue, nil) - } - require.NoError(t, err) - if keyOp == kvrpcpb.Op_Put || keyOp == kvrpcpb.Op_Insert { - mutations.Push(keyOp, rowKey, rowValue, true, false, false, false) - } else if keyOp == kvrpcpb.Op_Lock { - mutations.Push(keyOp, rowKey, []byte{}, true, false, false, false) - } - newRowValues[i] = thisRowValue - newRowKvMap[string(rowKey)] = thisRowValue - } - - // Prepare expected result mutations. - for _, op := range expectedAmendOps { - var info *amendOperationAddIndexInfo - expectedOp, ok := op.(*amendOperationAddIndex) - require.True(t, ok) - info = expectedOp.info - var idxVal []byte - genIndexKV := func(inputRow []types.Datum) ([]byte, []byte) { - indexDatums := make([]types.Datum, len(info.relatedOldIdxCols)) - for colIdx, col := range info.relatedOldIdxCols { - indexDatums[colIdx] = inputRow[col.Offset] - } - kvHandle := kv.IntHandle(inputRow[0].GetInt64()) - idxKey, _, err := tablecodec.GenIndexKey(se.sessionVars.StmtCtx, newTblInfo.Meta(), - info.indexInfoAtCommit.Meta(), newTblInfo.Meta().ID, indexDatums, kvHandle, nil) - require.NoError(t, err) - idxVal, err = tablecodec.GenIndexValuePortal(se.sessionVars.StmtCtx, newTblInfo.Meta(), info.indexInfoAtCommit.Meta(), false, info.indexInfoAtCommit.Meta().Unique, false, indexDatums, kvHandle, 0, nil) - require.NoError(t, err) - return idxKey, idxVal - } - for i := 0; i < len(mutations.GetKeys()); i++ { - oldIdxKeyMutation := transaction.PlainMutations{} - newIdxKeyMutation := transaction.PlainMutations{} - key := mutations.GetKeys()[i] - keyOp := mutations.GetOps()[i] - if addIndexNeedRemoveOp(info.AmendOpType) && mayGenDelIndexRowKeyOp(keyOp) { - thisRowValue := oldRowKvMap[string(key)] - if len(thisRowValue) > 0 { - idxKey, _ := genIndexKV(thisRowValue) - isPessimisticLock := false - if info.indexInfoAtCommit.Meta().Unique { - isPessimisticLock = true - } - oldIdxKeyMutation.Push(kvrpcpb.Op_Del, idxKey, []byte{}, isPessimisticLock, false, false, false) - } - } - if addIndexNeedAddOp(info.AmendOpType) && mayGenPutIndexRowKeyOp(keyOp) { - thisRowValue := newRowKvMap[string(key)] - idxKey, idxVal := genIndexKV(thisRowValue) - mutOp := kvrpcpb.Op_Put - isPessimisticLock := false - if info.indexInfoAtCommit.Meta().Unique { - mutOp = kvrpcpb.Op_Insert - isPessimisticLock = true - } - newIdxKeyMutation.Push(mutOp, idxKey, idxVal, isPessimisticLock, false, false, false) - } - skipMerge := false - if info.AmendOpType == AmendNeedAddDeleteAndInsert { - if len(oldIdxKeyMutation.GetKeys()) > 0 && len(newIdxKeyMutation.GetKeys()) > 0 { - if bytes.Equal(oldIdxKeyMutation.GetKeys()[0], newIdxKeyMutation.GetKeys()[0]) { - skipMerge = true - } - } - } - if !skipMerge { - if len(oldIdxKeyMutation.GetKeys()) > 0 { - expectedMutations.MergeMutations(oldIdxKeyMutation) - } - if len(newIdxKeyMutation.GetKeys()) > 0 { - expectedMutations.MergeMutations(newIdxKeyMutation) - } - } - } - } - - return oldData, expectedMutations -} - -func TestAmendCollectAndGenMutations(t *testing.T) { - ctx := context.Background() - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { require.NoError(t, store.Close()) }() - se := &session{ - store: store, - sessionVars: variable.NewSessionVars(nil), - } - se.mu.values = make(map[fmt.Stringer]interface{}) - domain.BindDomain(se, domain.NewMockDomain()) - startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization} - for _, startState := range startStates { - endStatMap := ConstOpAddIndex[startState] - var endStates []model.SchemaState - for st := range endStatMap { - endStates = append(endStates, st) - } - slices.Sort(endStates) - for _, endState := range endStates { - logutil.BgLogger().Info("[TEST]>>>>>>new round test", zap.Stringer("start", startState), zap.Stringer("end", endState)) - // column: a, b, c, d, e, c_str, d_str, e_str, f, g. - // PK: a. - // indices: c_d_e, e, f, g, f_g, c_d_e_str, c_d_e_str_prefix. - oldTblMeta := core.MockSignedTable() - initTblColIdxID(oldTblMeta) - // Indices[0] does not exist at the start. - oldTblMeta.Indices = oldTblMeta.Indices[1:] - oldTbInfo, err := table.TableFromMeta(autoid.NewAllocators(false), oldTblMeta) - require.NoError(t, err) - oldTblMeta.Indices[0].State = startState - oldTblMeta.Indices[2].State = endState - oldTblMeta.Indices[3].State = startState - - newTblMeta := core.MockSignedTable() - initTblColIdxID(newTblMeta) - // colh is newly added. - colh := &model.ColumnInfo{ - State: model.StatePublic, - Offset: 12, - Name: model.NewCIStr("b"), - FieldType: *(types.NewFieldType(mysql.TypeLong)), - ID: 13, - } - newTblMeta.Columns = append(newTblMeta.Columns, colh) - // The last index "c_d_e_str_prefix is dropped. - newTblMeta.Indices = newTblMeta.Indices[:len(newTblMeta.Indices)-1] - newTblMeta.Indices[0].Unique = false - newTblInfo, err := table.TableFromMeta(autoid.NewAllocators(false), newTblMeta) - require.NoError(t, err) - newTblMeta.Indices[0].State = endState - // Indices[1] is newly created. - newTblMeta.Indices[1].State = endState - // Indices[3] is dropped - newTblMeta.Indices[3].State = startState - // Indices[4] is newly created unique index. - newTblMeta.Indices[4].State = endState - - // Only the add index amend operations is collected in the results. - collector := newAmendCollector() - tblID := int64(1) - err = collector.collectTblAmendOps(se, tblID, oldTbInfo, newTblInfo, 1< 10", "2 20")) - - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(5, 5, 5)") - tk2.MustExec("alter table t1 drop column c3") - tk2.MustExec("alter table t1 drop column c2") - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("1", "2", "5")) -} - -func TestPessimisticTxnWithDDLChangeColumn(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 varchar(10))") - tk.MustExec("insert t1 values (1, 77, 'a'), (2, 88, 'b')") - - // Extend column field length is acceptable. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk.MustExec("begin pessimistic") - tk.MustExec("update t1 set c2 = c1 * 10") - tk2.MustExec("alter table t1 modify column c2 bigint") - tk.MustExec("commit") - tk.MustExec("begin pessimistic") - tk.MustExec("update t1 set c3 = 'aba'") - tk2.MustExec("alter table t1 modify column c3 varchar(30)") - tk.MustExec("commit") - tk2.MustExec("admin check table t1") - tk.MustQuery("select * from t1").Check(testkit.Rows("1 10 aba", "2 20 aba")) - - // Change column from nullable to not null is not allowed by now. - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1(c1) values(100)") - tk2.MustExec("alter table t1 change column c2 cc2 bigint not null") - require.Error(t, tk.ExecToErr("commit")) - - // Change default value is rejected. - tk2.MustExec("create table ta(a bigint primary key auto_random(3), b varchar(255) default 'old');") - tk2.MustExec("insert into ta(b) values('a')") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into ta values()") - tk2.MustExec("alter table ta modify column b varchar(300) default 'new';") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustQuery("select b from ta").Check(testkit.Rows("a")) - - // Change default value with add index. There is a new MultipleKeyFlag flag on the index key, and the column is changed, - // the flag check will fail. - tk2.MustExec("insert into ta values()") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into ta(b) values('inserted_value')") - tk.MustExec("insert into ta values()") - tk.MustExec("insert into ta values()") - tk2.MustExec("alter table ta add index i1(b)") - tk2.MustExec("alter table ta change column b b varchar(301) default 'newest'") - tk2.MustExec("alter table ta modify column b varchar(301) default 'new'") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table ta") - tk2.MustQuery("select count(b) from ta use index(i1) where b = 'new'").Check(testkit.Rows("1")) - - // Change default value to now(). - tk2.MustExec("create table tbl_time(c1 int, c_time timestamp)") - tk2.MustExec("insert into tbl_time(c1) values(1)") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into tbl_time(c1) values(2)") - tk2.MustExec("alter table tbl_time modify column c_time timestamp default now()") - tk2.MustExec("insert into tbl_time(c1) values(3)") - tk2.MustExec("insert into tbl_time(c1) values(4)") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustQuery("select count(1) from tbl_time where c_time is not null").Check(testkit.Rows("2")) -} - func TestPessimisticUnionForUpdate(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) @@ -2136,56 +2035,6 @@ func TestInsertDupKeyAfterLockBatchPointGet(t *testing.T) { require.True(t, terror.ErrorEqual(err, kv.ErrKeyExists)) } -func TestAmendTxnVariable(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - tk3 := testkit.NewTestKit(t, store) - tk3.MustExec("use test") - - tk2.MustExec("drop table if exists t1") - tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));") - tk2.MustExec("insert into t1 values(1, 1, 1);") - tk2.MustExec("insert into t1 values(2, 2, 2);") - - // Set off the session variable. - tk3.MustExec("set tidb_enable_amend_pessimistic_txn = 0;") - tk3.MustExec("begin pessimistic") - tk3.MustExec("insert into t1 values(3, 3, 3)") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(4, 4, 4)") - tk2.MustExec("alter table t1 add column new_col int") - require.Error(t, tk3.ExecToErr("commit")) - tk.MustExec("commit") - tk2.MustQuery("select * from t1").Check(testkit.Rows("1 1 1 ", "2 2 2 ", "4 4 4 ")) - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 0;") - - // Set off the global variable. - tk2.MustExec("set global tidb_enable_amend_pessimistic_txn = 0;") - - tk4 := testkit.NewTestKit(t, store) - tk4.MustExec("use test") - - tk4.MustQuery(`show variables like "tidb_enable_amend_pessimistic_txn"`).Check(testkit.Rows("tidb_enable_amend_pessimistic_txn OFF")) - tk4.MustExec("begin pessimistic") - tk4.MustExec("insert into t1 values(5, 5, 5, 5)") - tk2.MustExec("alter table t1 drop column new_col") - require.Error(t, tk4.ExecToErr("commit")) - tk4.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk4.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk4.MustExec("begin pessimistic") - tk4.MustExec("insert into t1 values(5, 5, 5)") - tk2.MustExec("alter table t1 add column new_col2 int") - tk4.MustExec("commit") - tk2.MustQuery("select * from t1").Check(testkit.Rows("1 1 1 ", "2 2 2 ", "4 4 4 ", "5 5 5 ")) -} - func TestSelectForUpdateWaitSeconds(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) @@ -2309,9 +2158,6 @@ func TestAsyncCommitWithSchemaChange(t *testing.T) { tk2 := createAsyncCommitTestKit(t, store) tk3 := createAsyncCommitTestKit(t, store) tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk3.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") // The txn tk writes something but with failpoint the primary key is not committed. tk.MustExec("begin pessimistic") @@ -2368,12 +2214,6 @@ func Test1PCWithSchemaChange(t *testing.T) { t.Skip("This test is unstable as depending on time.Sleep") } - defer config.RestoreFunc()() - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.AsyncCommit.SafeWindow = time.Second - conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 - }) - store := realtikvtest.CreateMockStoreAndSetup(t) tk := create1PCTestKit(t, store) @@ -2384,9 +2224,6 @@ func Test1PCWithSchemaChange(t *testing.T) { tk.MustExec("create table tk (c1 int primary key, c2 int)") tk.MustExec("insert into tk values (1, 1)") tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk3.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") tk.MustExec("begin pessimistic") tk.MustExec("insert into tk values(2, 2)") @@ -2424,305 +2261,6 @@ func Test1PCWithSchemaChange(t *testing.T) { tk3.MustExec("admin check table tk") } -func TestAmendForUniqueIndex(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk2.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - - tk2.MustExec("drop table if exists t1") - tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));") - tk2.MustExec("insert into t1 values(1, 1, 1);") - tk2.MustExec("insert into t1 values(2, 2, 2);") - - // New value has duplicates. - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(3, 3, 3)") - tk.MustExec("insert into t1 values(4, 4, 3)") - tk2.MustExec("alter table t1 add unique index uk1(c3)") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("alter table t1 drop index uk1") - tk2.MustExec("admin check table t1") - - // New values has duplicates with old values. - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(3, 3, 3)") - tk.MustExec("insert into t1 values(4, 4, 1)") - tk2.MustExec("alter table t1 add unique index uk1(c3)") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t1") - - // Put new values. - tk2.MustQuery("select * from t1 for update").Check(testkit.Rows("1 1 1", "2 2 2")) - tk2.MustExec("alter table t1 drop index uk1") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t1 add unique index uk1(c3)") - tk.MustExec("insert into t1 values(5, 5, 5)") - tk.MustExec("commit") - tk2.MustExec("admin check table t1") - - // Update the old value with same unique key value, should abort. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int auto_increment primary key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 2), (3, 4);") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t add unique index uk(c);") - tk.MustExec("update t set c = 2 where id = 3;") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t") - - // Update the old value with same unique key, but the row key has changed. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int auto_increment primary key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 2), (3, 4);") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t values (3, 2) on duplicate key update id = values(id) and c = values(c)") - finishCh := make(chan error) - go func() { - err := tk2.ExecToErr("alter table t add unique index uk(c);") - finishCh <- err - }() - time.Sleep(300 * time.Millisecond) - tk.MustExec("commit") - err := <-finishCh - require.NoError(t, err) - tk2.MustExec("admin check table t") - - // Update the old value with same unique key, but the row key has changed. - /* TODO this case could not pass using unistore because of https://github.com/ngaut/unistore/issues/428. - // Reopen it after fix the unistore issue. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int auto_increment primary key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 2), (3, 4);") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t add unique index uk(c);") - tk.MustExec("insert into t values (3, 2) on duplicate key update id = values(id) and c = values(c)") - tk.MustExec("commit") - tk2.MustExec("admin check table t") - */ - - // Test pessimistic retry for unique index amend. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 1), (2, 2);") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t add unique index uk(c)") - tk.MustExec("insert into t values(3, 5)") - tk.MustExec("update t set c = 4 where c = 2") - errCh := make(chan error, 1) - go func() { - var err error - err = tk2.ExecToErr("begin pessimistic") - if err != nil { - errCh <- err - return - } - err = tk2.ExecToErr("insert into t values(5, 5)") - if err != nil { - errCh <- err - return - } - err = tk2.ExecToErr("delete from t where id = 5") - if err != nil { - errCh <- err - return - } - // let commit in tk start. - errCh <- err - time.Sleep(time.Millisecond * 100) - err = tk2.ExecToErr("commit") - errCh <- err - }() - err = <-errCh - require.Equal(t, nil, err) - tk.MustExec("commit") - tk.MustExec("admin check table t") - err = <-errCh - require.Equal(t, nil, err) -} - -func TestAmendWithColumnTypeChange(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test") - - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - - tk2.MustExec("drop table if exists t") - tk2.MustExec("create table t (id int primary key, v varchar(10));") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t values (1, \"123456789\")") - tk2.MustExec("alter table t modify column v varchar(5);") - require.Error(t, tk.ExecToErr("commit")) -} - -func TestIssue21498(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk2.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1") - - for _, partition := range []bool{false, true} { - // RC test - tk.MustExec("drop table if exists t, t1") - createTable := "create table t (id int primary key, v int, index iv (v))" - if partition { - createTable += " partition by range (id) (partition p0 values less than (0),partition p1 values less than (1),partition p2 values less than (2),partition p3 values less than (3),partition pn values less than MAXVALUE)" - } - tk.MustExec(createTable) - tk.MustExec("insert into t values (1, 10), (2, 20), (3, 30), (4, 40)") - tk.MustExec("create table t1(id int)") - tk.MustExec("insert into t1 values(1)") - - tk.MustExec("set tx_isolation = 'READ-COMMITTED'") - tk.MustExec("begin pessimistic") - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows("1 10")) - - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 11 where id = 1") - - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) - tk.MustQuery("select * from t where v = 11").Check(testkit.Rows("1 11")) - tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 11")) - tk.MustExec("admin check table t") - tk.MustExec("commit") - - tk.MustExec("drop table if exists t") - createTable = "create table t (id int primary key, v int, index iv (v), v2 int)" - if partition { - createTable += " partition by range (id) (partition p0 values less than (0),partition p1 values less than (1),partition p2 values less than (2),partition p3 values less than (3),partition pn values less than MAXVALUE)" - } - tk.MustExec(createTable) - tk.MustExec("insert into t values (1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400)") - - tk.MustExec("begin pessimistic") - tk.MustQuery("select * from t use index (iv) where v = 10").Check(testkit.Rows("1 10 100")) - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 11 where id = 1") - err := tk.ExecToErr("select * from t use index (iv) where v = 10") - require.Equal(t, "[planner:1176]Key 'iv' doesn't exist in table 't'", err.Error()) - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) - tk2.MustExec("update t set id = 5 where id = 1") - err = tk.ExecToErr("select * from t use index (iv) where v = 10") // select with - require.Equal(t, "[planner:1176]Key 'iv' doesn't exist in table 't'", err.Error()) - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) - if !partition { - // amend transaction does not support partition table - tk.MustExec("insert into t(id, v, v2) select 6, v + 20, v2 + 200 from t where id = 4") // insert ... select with index unchanged - } - err = tk.ExecToErr("insert into t(id, v, v2) select 7, v + 30, v2 + 300 from t use index (iv) where id = 4") // insert ... select with index changed - require.Equal(t, "[planner:1176]Key 'iv' doesn't exist in table 't'", err.Error()) - tk.MustExec("admin check table t") // check consistency inside txn - tk.MustExec("commit") - if !partition { - tk.MustQuery("select * from t").Check(testkit.Rows("2 20 200", "3 30 300", "4 40 400", "5 11 100", "6 60 600")) - } - tk.MustExec("admin check table t") // check consistency out of txn - - // RR test for non partition - if partition { - continue - } - - tk.MustExec("set tx_isolation = 'REPEATABLE-READ'") - tk2.MustExec("alter table t add unique index iv(v)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 21 where v = 20") - tk2.MustExec("update t set v = 31 where v = 30") - tk.MustExec("update t set v = 22 where v = 21") // fast path - tk.CheckExecResult(1, 0) - tk.MustExec("update t set v = 23 where v = 22") - tk.CheckExecResult(1, 0) - tk.MustExec("update t set v = 32 where v >= 31 and v < 40") // common path - tk.CheckExecResult(1, 0) - tk.MustExec("commit") - tk.MustQuery("select * from t").Check(testkit.Rows("2 23 200", "3 32 300", "4 40 400", "5 11 100", "6 60 600")) - - tk2.MustExec("alter table t add unique index iv(v)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 24 where v = 23") - tk2.MustExec("update t set v = 41 where v = 40") - // fast path - tk.MustQuery("select * from t where v = 23").Check(testkit.Rows("2 23 200")) - tk.MustQuery("select * from t where v = 24").Check(testkit.Rows()) - tk.MustQuery("select * from t where v = 23 for update").Check(testkit.Rows()) - tk.MustQuery("select * from t where v = 24 for update").Check(testkit.Rows("2 24 200")) - tk.MustQuery("select (select id from t where v = 23), id from t1 for update").Check(testkit.Rows("2 1")) - tk.MustQuery("select (select id from t where v = 24), id from t1 for update").Check(testkit.Rows(" 1")) - tk.MustQuery("select (select id from t where v = 23 for update), id from t1").Check(testkit.Rows(" 1")) - tk.MustQuery("select (select id from t where v = 24 for update), id from t1").Check(testkit.Rows("2 1")) - tk.MustQuery("select (select id + 1 from t where v = 24 for update), id from t1").Check(testkit.Rows("3 1")) - // sub queries - tk.MustQuery("select (select id from (select id from t where v = 24 for update) tmp for update), (select id from t where v = 23), id from t where v = 23").Check(testkit.Rows("2 2 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp), id from t where v = 23").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp for update), id from t where v = 23").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 23 for update) from (select id from t where v = 24 for update) tmp), id from t where v = 23").Check(testkit.Rows(" 2")) - tk.MustQuery("select (select id + (select id from t where v = 23 for update) from (select id from t where v = 24 for update) tmp for update), id from t where v = 23").Check(testkit.Rows(" 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 23) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 24 for update) from (select id from t where v = 23) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) - - // test index look up - tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id").Check(testkit.Rows("2 23 200 2 23 200")) - tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id").Check(testkit.Rows()) - tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id for update").Check(testkit.Rows()) - // TODO: Do the same with Partitioned Table!!! Since this query leads to two columns in SelectLocExec.tblID2Handle!!! - tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id for update").Check(testkit.Rows("2 24 200 2 24 200")) - tk.MustExec("delete from t where v = 24") - tk.CheckExecResult(1, 0) - // common path - tk.MustQuery("select * from t where v >= 41 and v < 50").Check(testkit.Rows()) - tk.MustQuery("select * from t where v >= 41 and v < 50 for update").Check(testkit.Rows("4 41 400")) - tk.MustExec("delete from t where v >= 41 and v < 50") - tk.CheckExecResult(1, 0) - tk.MustExec("commit") - tk.MustQuery("select * from t").Check(testkit.Rows("3 32 300", "5 11 100", "6 60 600")) - - tk2.MustExec("alter table t add unique index iv(v)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 33 where v = 32") - tk.MustExec("insert into t(id, v, v2) select 3 * id, 3 * v, 3 * v2 from t where v = 33") - tk.CheckExecResult(1, 0) - tk.MustExec("insert into t(id, v, v2) select (select 4 * id from t where v = 32) id, 4 * v, 4 * v2 from t where v = 33") - tk.CheckExecResult(1, 0) - err = tk.ExecToErr("insert into t(id, v, v2) select (select 4 * id from t where v = 33) id, 4 * v, 4 * v2 from t where v = 33") - require.Error(t, err) - require.Equal(t, "[table:1048]Column 'id' cannot be null", err.Error()) - tk.MustExec("commit") - tk.MustQuery("select * from t").Check(testkit.Rows("3 33 300", "5 11 100", "6 60 600", "9 99 900", "12 132 1200")) - - tk2.MustExec("alter table t add unique index iv(v)") - tk2.MustExec("drop table if exists t1") - tk2.MustExec("create table t1(id int primary key, v int, index iv (v), v2 int)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 34 where v = 33") - tk2.MustExec("update t set v = 12 where v = 11") - tk.MustExec("insert into t1(id, v, v2) select * from t where v = 33") - tk.CheckExecResult(0, 0) - tk.MustExec("insert into t1(id, v, v2) select * from t where v = 12") - tk.CheckExecResult(1, 0) - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("5 12 100")) - } -} - func TestPlanCacheSchemaChange(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tmp := testkit.NewTestKit(t, store) @@ -2743,8 +2281,6 @@ func TestPlanCacheSchemaChange(t *testing.T) { tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (4, 4, 4)") tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1") - tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1") // generate plan cache tk.MustExec("prepare update_stmt from 'update t set vv = vv + 1 where v = ?'") @@ -2956,177 +2492,6 @@ func createTable(part bool, columnNames []string, columnTypes []string) string { return str } -func TestAmendForIndexChange(t *testing.T) { - defer config.RestoreFunc()() - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.AsyncCommit.SafeWindow = 0 - conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 - }) - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test") - - tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = ON;") - tk.Session().GetSessionVars().EnableAsyncCommit = false - tk.Session().GetSessionVars().Enable1PC = false - - tk2.MustExec("drop table if exists t1") - - // Add some different column types. - columnNames := []string{"c_int", "c_str", "c_datetime", "c_timestamp", "c_double", "c_decimal", "c_float"} - columnTypes := []string{"int", "varchar(40)", "datetime", "timestamp", "double", "decimal(12, 6)", "float"} - - addIndexFunc := func(idxName string, part bool, a, b int) string { - var str string - str = "alter table t" - if part { - str = "alter table t_part" - } - str += " add index " + idxName + " (" - str += strings.Join(columnNames[a:b], ",") - str += ")" - return str - } - - for i := 0; i < len(columnTypes); i++ { - for j := i + 1; j <= len(columnTypes); j++ { - // Create table and prepare some data. - tk2.MustExec("drop table if exists t") - tk2.MustExec("drop table if exists t_part") - tk2.MustExec(createTable(false, columnNames, columnTypes)) - tk2.MustExec(createTable(true, columnNames, columnTypes)) - tk2.MustExec(`insert into t values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - tk2.MustExec(`insert into t_part values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t_part values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - - // Start a pessimistic transaction, the amend should succeed for common table. - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - idxName := fmt.Sprintf("index%d%d", i, j) - tk2.MustExec(addIndexFunc(idxName, false, i, j)) - tk.MustExec("commit") - tk2.MustExec("admin check table t") - - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t values(6, "666", "2000-01-06", "2020-01-06", "6.6", "666.666", 6.6)`) - tk2.MustExec(fmt.Sprintf(`alter table t drop index %s`, idxName)) - tk.MustExec("commit") - tk2.MustExec("admin check table t") - tk2.MustQuery("select count(*) from t").Check(testkit.Rows("4")) - - // Start a pessimistic transaction for partition table, the amend should fail. - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t_part values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - tk2.MustExec(addIndexFunc(idxName, true, i, j)) - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t_part") - - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t_part values(6, "666", "2000-01-06", "2020-01-06", "6.6", "666.666", 6.6)`) - tk2.MustExec(fmt.Sprintf(`alter table t_part drop index %s`, idxName)) - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t_part") - tk2.MustQuery("select count(*) from t_part").Check(testkit.Rows("2")) - } - } -} - -func TestAmendForColumnChange(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test") - - tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = ON;") - tk2.MustExec("drop table if exists t1") - - // Add some different column types. - columnNames := []string{"c_int", "c_str", "c_datetime", "c_timestamp", "c_double", "c_decimal", "c_float"} - columnTypes := []string{"int", "varchar(40)", "datetime", "timestamp", "double", "decimal(12, 6)", "float"} - colChangeDDLs := []string{ - "alter table %s change column c_int c_int bigint", - "alter table %s modify column c_str varchar(55)", - "alter table %s modify column c_datetime datetime", - "alter table %s modify column c_timestamp timestamp", - "alter table %s modify column c_double double default NULL", - "alter table %s modify column c_int bigint(20) default 100", - "alter table %s change column c_float c_float float", - "alter table %s modify column c_int bigint(20)", - } - amendSucc := []bool{ - true, - true, - true, - true, - true, - false, - true, - true, - } - colChangeFunc := func(part bool, i int) string { - var sql string - sql = colChangeDDLs[i] - if part { - sql = fmt.Sprintf(sql, "t_part") - } else { - sql = fmt.Sprintf(sql, "t") - } - return sql - } - - for i := 0; i < len(colChangeDDLs); i++ { - // Create table and prepare some data. - tk2.MustExec("drop table if exists t") - tk2.MustExec("drop table if exists t_part") - tk2.MustExec(createTable(false, columnNames, columnTypes)) - tk2.MustExec(createTable(true, columnNames, columnTypes)) - tk2.MustExec(`insert into t values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - tk2.MustExec(`insert into t_part values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t_part values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - - // Start a pessimistic transaction, the amend should succeed for common table. - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - tk2.MustExec(colChangeFunc(false, i)) - if amendSucc[i] { - tk.MustExec("commit") - } else { - require.Error(t, tk.ExecToErr("commit")) - } - tk2.MustExec("admin check table t") - if amendSucc[i] { - tk2.MustQuery("select count(*) from t").Check(testkit.Rows("3")) - } else { - tk2.MustQuery("select count(*) from t").Check(testkit.Rows("2")) - } - - // Start a pessimistic transaction for partition table, the amend should fail. - if i == 5 { - // alter table t_part modify column c_int bigint(20) default 100 - // Unsupported modify column: can't change the partitioning column, since it would require reorganize all partitions - // Skip this case - continue - } - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t_part values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - tk2.MustExec(colChangeFunc(true, i)) - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t_part") - tk2.MustQuery("select count(*) from t_part").Check(testkit.Rows("2")) - } -} - func TestPessimisticAutoCommitTxn(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) From f600fc694f98b30f90c5723ae29c6dbb9da6dbbf Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Fri, 6 Jan 2023 18:56:21 +0800 Subject: [PATCH 2/6] executor: reset the related session vars for both INSERT and REPLACE (#40354) close pingcap/tidb#40351 --- executor/executor.go | 2 ++ executor/insert.go | 2 -- executor/writetest/write_test.go | 20 ++++++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 1679ed9e57e12..ae7a7383f5983 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -2186,6 +2186,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { vars.PrevFoundInBinding = vars.FoundInBinding vars.FoundInBinding = false vars.DurationWaitTS = 0 + vars.CurrInsertBatchExtraCols = nil + vars.CurrInsertValues = chunk.Row{} return } diff --git a/executor/insert.go b/executor/insert.go index d0b09e302860c..1a4eb27d3c6f6 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -345,8 +345,6 @@ func (e *InsertExec) Close() error { defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } defer e.memTracker.ReplaceBytesUsed(0) - e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{} - e.ctx.GetSessionVars().CurrInsertBatchExtraCols = e.ctx.GetSessionVars().CurrInsertBatchExtraCols[0:0:0] e.setMessage() if e.SelectExec != nil { return e.SelectExec.Close() diff --git a/executor/writetest/write_test.go b/executor/writetest/write_test.go index 32939a1b16033..f6846a944a486 100644 --- a/executor/writetest/write_test.go +++ b/executor/writetest/write_test.go @@ -4318,3 +4318,23 @@ func TestIssueInsertPrefixIndexForNonUTF8Collation(t *testing.T) { tk.MustExec("insert into t3 select 'abc '") tk.MustGetErrCode("insert into t3 select 'abc d'", 1062) } + +func TestMutipleReplaceAndInsertInOneSession(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t_securities(id bigint not null auto_increment primary key, security_id varchar(8), market_id smallint, security_type int, unique key uu(security_id, market_id))") + tk.MustExec(`insert into t_securities (security_id, market_id, security_type) values ("1", 2, 7), ("7", 1, 7) ON DUPLICATE KEY UPDATE security_type = VALUES(security_type)`) + tk.MustExec(`replace into t_securities (security_id, market_id, security_type) select security_id+1, 1, security_type from t_securities where security_id="7";`) + tk.MustExec(`INSERT INTO t_securities (security_id, market_id, security_type) values ("1", 2, 7), ("7", 1, 7) ON DUPLICATE KEY UPDATE security_type = VALUES(security_type)`) + + tk.MustQuery("select * from t_securities").Sort().Check(testkit.Rows("1 1 2 7", "2 7 1 7", "3 8 1 7")) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec(`insert into t_securities (security_id, market_id, security_type) values ("1", 2, 7), ("7", 1, 7) ON DUPLICATE KEY UPDATE security_type = VALUES(security_type)`) + tk2.MustExec(`insert into t_securities (security_id, market_id, security_type) select security_id+2, 1, security_type from t_securities where security_id="7";`) + tk2.MustExec(`INSERT INTO t_securities (security_id, market_id, security_type) values ("1", 2, 7), ("7", 1, 7) ON DUPLICATE KEY UPDATE security_type = VALUES(security_type)`) + + tk2.MustQuery("select * from t_securities").Sort().Check(testkit.Rows("1 1 2 7", "2 7 1 7", "3 8 1 7", "8 9 1 7")) +} From f9f72688fb1e378b752108ffcd2e691ac1bf6ebd Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 6 Jan 2023 19:28:21 +0800 Subject: [PATCH 3/6] *: fix static pruning partition table in disaggregated tiflash mode (#40238) close pingcap/tidb#40239 --- domain/domain.go | 6 ++-- executor/mpp_gather.go | 4 ++- executor/tiflashtest/tiflash_test.go | 24 +++++++++++++++ kv/mpp.go | 3 +- planner/core/find_best_task.go | 3 +- planner/core/fragment.go | 45 +++++++++++++++++++--------- 6 files changed, 65 insertions(+), 20 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 34eb4a80742e6..5be4c1f6f028a 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1427,7 +1427,7 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error { // WatchTiFlashComputeNodeChange create a routine to watch if the topology of tiflash_compute node is changed. // TODO: tiflashComputeNodeKey is not put to etcd yet(finish this when AutoScaler is done) // -// store cache will only be invalidated every 30 seconds. +// store cache will only be invalidated every n seconds. func (do *Domain) WatchTiFlashComputeNodeChange() error { var watchCh clientv3.WatchChan if do.etcdClient != nil { @@ -1468,8 +1468,8 @@ func (do *Domain) WatchTiFlashComputeNodeChange() error { case tikv.Storage: logCount++ s.GetRegionCache().InvalidateTiFlashComputeStores() - if logCount == 60 { - // Print log every 60*duration seconds. + if logCount == 6 { + // Print log every 6*duration seconds. logutil.BgLogger().Debug("tiflash_compute store cache invalied, will update next query", zap.Bool("watched", watched)) logCount = 0 } diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index eee019bd0de47..eba5498f8869d 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -81,7 +81,9 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { for _, mppTask := range pf.ExchangeSender.Tasks { if mppTask.PartitionTableIDs != nil { err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs) - } else { + } else if !mppTask.IsDisaggregatedTiFlashStaticPrune { + // If isDisaggregatedTiFlashStaticPrune is true, it means this TableScan is under PartitionUnoin, + // tableID in TableScan is already the physical table id of this partition, no need to update again. err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, []int64{mppTask.TableID}) } if err != nil { diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index ca52fb48fd788..e8cd94d889188 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1308,4 +1308,28 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) { failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode)) defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery") tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;") + + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + tk.MustExec("create table t1(c1 int, c2 int) partition by hash(c1) partitions 3") + tk.MustExec("insert into t1 values(1, 1), (2, 2), (3, 3)") + tk.MustExec("alter table t1 set tiflash replica 1") + tb = external.GetTableByName(t, tk, "test", "t1") + err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + tk.MustQuery("explain select * from t1 where c1 < 2").Check(testkit.Rows( + "PartitionUnion_10 9970.00 root ", + "├─TableReader_15 3323.33 root data:ExchangeSender_14", + "│ └─ExchangeSender_14 3323.33 mpp[tiflash] ExchangeType: PassThrough", + "│ └─Selection_13 3323.33 mpp[tiflash] lt(test.t1.c1, 2)", + "│ └─TableFullScan_12 10000.00 mpp[tiflash] table:t1, partition:p0 keep order:false, stats:pseudo", + "├─TableReader_19 3323.33 root data:ExchangeSender_18", + "│ └─ExchangeSender_18 3323.33 mpp[tiflash] ExchangeType: PassThrough", + "│ └─Selection_17 3323.33 mpp[tiflash] lt(test.t1.c1, 2)", + "│ └─TableFullScan_16 10000.00 mpp[tiflash] table:t1, partition:p1 keep order:false, stats:pseudo", + "└─TableReader_23 3323.33 root data:ExchangeSender_22", + " └─ExchangeSender_22 3323.33 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection_21 3323.33 mpp[tiflash] lt(test.t1.c1, 2)", + " └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 keep order:false, stats:pseudo")) + // tk.MustQuery("select * from t1 where c1 < 2").Check(testkit.Rows("1 1")) } diff --git a/kv/mpp.go b/kv/mpp.go index 32e9186506067..de0a8e8654528 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -42,7 +42,8 @@ type MPPTask struct { MppQueryID MPPQueryID TableID int64 // physical table id - PartitionTableIDs []int64 + PartitionTableIDs []int64 + IsDisaggregatedTiFlashStaticPrune bool } // ToPB generates the pb structure. diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index aff1c29997fbd..37e58a6e09327 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2001,8 +2001,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid if ts.KeepOrder { return invalidTask, nil } - if prop.MPPPartitionTp != property.AnyType || ts.isPartition { + if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. + // But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table. ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 7e86696ccc4d6..2496d463f2e24 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -21,6 +21,7 @@ import ( "unsafe" "github.com/pingcap/errors" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -375,17 +376,30 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic var allPartitionsIDs []int64 var err error splitedRanges, _ := distsql.SplitRangesAcrossInt64Boundary(ts.Ranges, false, false, ts.Table.IsCommonHandle) + // True when: + // 0. Is disaggregated tiflash. because in non-disaggregated tiflash, we dont use mpp for static pruning. + // 1. Is partition table. + // 2. Dynamic prune is not used. + var isDisaggregatedTiFlashStaticPrune bool if ts.Table.GetPartitionInfo() != nil { + isDisaggregatedTiFlashStaticPrune = config.GetGlobalConfig().DisaggregatedTiFlash && + !e.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() + tmp, _ := e.is.TableByID(ts.Table.ID) tbl := tmp.(table.PartitionedTable) - var partitions []table.PhysicalTable - partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames) - if err != nil { - return nil, errors.Trace(err) + if !isDisaggregatedTiFlashStaticPrune { + var partitions []table.PhysicalTable + partitions, err = partitionPruning(e.ctx, tbl, ts.PartitionInfo.PruningConds, ts.PartitionInfo.PartitionNames, ts.PartitionInfo.Columns, ts.PartitionInfo.ColumnNames) + if err != nil { + return nil, errors.Trace(err) + } + req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions) + } else { + singlePartTbl := tbl.GetPartition(ts.physicalTableID) + req, err = e.constructMPPBuildTaskForNonPartitionTable(singlePartTbl.GetPhysicalID(), ts.Table.IsCommonHandle, splitedRanges) } - req, allPartitionsIDs, err = e.constructMPPBuildTaskReqForPartitionedTable(ts, splitedRanges, partitions) } else { - req, err = e.constructMPPBuildTaskForNonPartitionTable(ts, splitedRanges) + req, err = e.constructMPPBuildTaskForNonPartitionTable(ts.Table.ID, ts.Table.IsCommonHandle, splitedRanges) } if err != nil { return nil, errors.Trace(err) @@ -403,12 +417,15 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic tasks := make([]*kv.MPPTask, 0, len(metas)) for _, meta := range metas { - task := &kv.MPPTask{Meta: meta, - ID: AllocMPPTaskID(e.ctx), - StartTs: e.startTS, - MppQueryID: e.mppQueryID, - TableID: ts.Table.ID, - PartitionTableIDs: allPartitionsIDs} + task := &kv.MPPTask{ + Meta: meta, + ID: AllocMPPTaskID(e.ctx), + StartTs: e.startTS, + MppQueryID: e.mppQueryID, + TableID: ts.Table.ID, + PartitionTableIDs: allPartitionsIDs, + IsDisaggregatedTiFlashStaticPrune: isDisaggregatedTiFlashStaticPrune, + } tasks = append(tasks, task) } return tasks, nil @@ -435,8 +452,8 @@ func (e *mppTaskGenerator) constructMPPBuildTaskReqForPartitionedTable(ts *Physi return &kv.MPPBuildTasksRequest{PartitionIDAndRanges: partitionIDAndRanges}, allPartitionsIDs, nil } -func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(ts *PhysicalTableScan, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) { - kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{ts.Table.ID}, ts.Table.IsCommonHandle, splitedRanges, nil) +func (e *mppTaskGenerator) constructMPPBuildTaskForNonPartitionTable(tid int64, isCommonHandle bool, splitedRanges []*ranger.Range) (*kv.MPPBuildTasksRequest, error) { + kvRanges, err := distsql.TableHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{tid}, isCommonHandle, splitedRanges, nil) if err != nil { return nil, errors.Trace(err) } From b906bf9934e32cd99bfd0d7b1792ae93c30efbdf Mon Sep 17 00:00:00 2001 From: Jk Xu <54522439+Dousir9@users.noreply.github.com> Date: Fri, 6 Jan 2023 23:34:21 +0800 Subject: [PATCH 4/6] planner: fix typo (#40367) --- planner/core/exhaust_physical_plans.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index dfd8d9572db81..205a2d8242b4b 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2530,7 +2530,7 @@ func (p *baseLogicalPlan) exhaustPhysicalPlans(_ *property.PhysicalProperty) ([] } // canPushToCop checks if it can be pushed to some stores. For TiKV, it only checks datasource. -// For TiFlash, it will check whether the operator is supported, but note that the check might be inaccrute. +// For TiFlash, it will check whether the operator is supported, but note that the check might be inaccrate. func (p *baseLogicalPlan) canPushToCop(storeTp kv.StoreType) bool { return p.canPushToCopImpl(storeTp, false) } From a9d8bfe6ae00596c843b3a8f95b8c6b435172179 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sat, 7 Jan 2023 00:00:21 +0800 Subject: [PATCH 5/6] session: fix data race in the LazyTxn.LockKeys (#40350) close pingcap/tidb#40355 --- DEPS.bzl | 8 ++++---- go.mod | 4 ++-- go.sum | 8 ++++---- kv/interface_mock_test.go | 7 +++++++ kv/kv.go | 4 ++++ session/txn.go | 29 ++++++++++++++++++----------- sessionctx/BUILD.bazel | 1 + sessiontxn/BUILD.bazel | 2 ++ store/driver/txn/txn_driver.go | 6 ++++++ 9 files changed, 48 insertions(+), 21 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index fe7d110931a2c..bd54bdd5bf2ab 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3527,16 +3527,16 @@ def go_deps(): name = "com_github_tiancaiamao_gp", build_file_proto_mode = "disable", importpath = "github.com/tiancaiamao/gp", - sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=", - version = "v0.0.0-20221221095600-1a473d1f9b4b", + sum = "h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=", + version = "v0.0.0-20221230034425-4025bc8a4d4a", ) go_repository( name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=", - version = "v2.0.4-0.20221226080148-018c59dbd837", + sum = "h1:cPtMXTExqjzk8L40qhrgB/mXiBXKP5LRU0vwjtI2Xxo=", + version = "v2.0.4", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/go.mod b/go.mod index 46b0536b128bd..c1b97c5d08b56 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837 + github.com/tikv/client-go/v2 v2.0.4 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 @@ -221,7 +221,7 @@ require ( github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b // indirect + github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect diff --git a/go.sum b/go.sum index 4a6a635b40ebc..e2c220f5e11f1 100644 --- a/go.sum +++ b/go.sum @@ -933,10 +933,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo= -github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837 h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ= -github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837/go.mod h1:ptS8K+VBrEH2gIS3JxaiFSSLfDDyuS2xcdLozOtBWBw= +github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= +github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +github.com/tikv/client-go/v2 v2.0.4 h1:cPtMXTExqjzk8L40qhrgB/mXiBXKP5LRU0vwjtI2Xxo= +github.com/tikv/client-go/v2 v2.0.4/go.mod h1:v52O5zDtv2BBus4lm5yrSQhxGW4Z4RaXWfg0U1Kuyqo= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 164e777c6ef4a..561c0aa12baaf 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -52,6 +52,13 @@ func (t *mockTxn) LockKeys(_ context.Context, _ *LockCtx, _ ...Key) error { return nil } +func (t *mockTxn) LockKeysFunc(_ context.Context, _ *LockCtx, fn func(), _ ...Key) error { + if fn != nil { + fn() + } + return nil +} + func (t *mockTxn) SetOption(opt int, val interface{}) { t.opts[opt] = val } diff --git a/kv/kv.go b/kv/kv.go index 38243aa13db08..4c855c0938308 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -222,6 +222,10 @@ type Transaction interface { // LockKeys tries to lock the entries with the keys in KV store. // Will block until all keys are locked successfully or an error occurs. LockKeys(ctx context.Context, lockCtx *LockCtx, keys ...Key) error + // LockKeysFunc tries to lock the entries with the keys in KV store. + // Will block until all keys are locked successfully or an error occurs. + // fn is called before LockKeys unlocks the keys. + LockKeysFunc(ctx context.Context, lockCtx *LockCtx, fn func(), keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt int, val interface{}) diff --git a/session/txn.go b/session/txn.go index 85f77f8078679..ed21d1c3560f5 100644 --- a/session/txn.go +++ b/session/txn.go @@ -20,7 +20,6 @@ import ( "fmt" "runtime/trace" "strings" - "sync" "sync/atomic" "time" @@ -36,6 +35,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sli" + "github.com/pingcap/tidb/util/syncutil" "github.com/pingcap/tipb/go-binlog" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -64,7 +64,7 @@ type LazyTxn struct { // The data in this session would be query by other sessions, so Mutex is necessary. // Since read is rare, the reader can copy-on-read to get a data snapshot. mu struct { - sync.RWMutex + syncutil.RWMutex txninfo.TxnInfo } @@ -428,6 +428,11 @@ func (txn *LazyTxn) RollbackMemDBToCheckpoint(savepoint *tikv.MemDBCheckpoint) { // LockKeys Wrap the inner transaction's `LockKeys` to record the status func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { + return txn.LockKeysFunc(ctx, lockCtx, nil, keys...) +} + +// LockKeysFunc Wrap the inner transaction's `LockKeys` to record the status +func (txn *LazyTxn) LockKeysFunc(ctx context.Context, lockCtx *kv.LockCtx, fn func(), keys ...kv.Key) error { failpoint.Inject("beforeLockKeys", func() {}) t := time.Now() @@ -438,15 +443,17 @@ func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...k txn.mu.TxnInfo.BlockStartTime.Valid = true txn.mu.TxnInfo.BlockStartTime.Time = t txn.mu.Unlock() - - err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) - - txn.mu.Lock() - defer txn.mu.Unlock() - txn.updateState(originState) - txn.mu.TxnInfo.BlockStartTime.Valid = false - txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len()) - return err + lockFunc := func() { + if fn != nil { + fn() + } + txn.mu.Lock() + defer txn.mu.Unlock() + txn.updateState(originState) + txn.mu.TxnInfo.BlockStartTime.Valid = false + txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len()) + } + return txn.Transaction.LockKeysFunc(ctx, lockCtx, lockFunc, keys...) } func (txn *LazyTxn) reset() { diff --git a/sessionctx/BUILD.bazel b/sessionctx/BUILD.bazel index 6cd2317cf8f01..800001fd426b3 100644 --- a/sessionctx/BUILD.bazel +++ b/sessionctx/BUILD.bazel @@ -33,6 +33,7 @@ go_test( ], embed = [":sessionctx"], flaky = True, + race = "on", deps = [ "//testkit/testsetup", "@com_github_stretchr_testify//require", diff --git a/sessiontxn/BUILD.bazel b/sessiontxn/BUILD.bazel index e484defb5b0c1..a92e5a81dd92e 100644 --- a/sessiontxn/BUILD.bazel +++ b/sessiontxn/BUILD.bazel @@ -27,6 +27,8 @@ go_test( "txn_rc_tso_optimize_test.go", ], flaky = True, + race = "on", + shard_count = 2, deps = [ ":sessiontxn", "//domain", diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index f48e7ad328d86..7dd386b539c73 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -76,6 +76,12 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput return txn.extractKeyErr(err) } +func (txn *tikvTxn) LockKeysFunc(ctx context.Context, lockCtx *kv.LockCtx, fn func(), keysInput ...kv.Key) error { + keys := toTiKVKeys(keysInput) + err := txn.KVTxn.LockKeysFunc(ctx, lockCtx, fn, keys...) + return txn.extractKeyErr(err) +} + func (txn *tikvTxn) Commit(ctx context.Context) error { err := txn.KVTxn.Commit(ctx) return txn.extractKeyErr(err) From f7c87c883307d1dc6f9051a3a246272657492e9d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 7 Jan 2023 17:30:21 +0800 Subject: [PATCH 6/6] build(deps): bump golang.org/x/crypto from 0.1.0 to 0.5.0 (#40379) --- DEPS.bzl | 20 ++++++++++---------- go.mod | 10 +++++----- go.sum | 20 ++++++++++---------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index bd54bdd5bf2ab..5f54c4579a545 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4383,8 +4383,8 @@ def go_deps(): name = "org_golang_x_crypto", build_file_proto_mode = "disable_global", importpath = "golang.org/x/crypto", - sum = "h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=", - version = "v0.1.0", + sum = "h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=", + version = "v0.5.0", ) go_repository( name = "org_golang_x_exp", @@ -4433,8 +4433,8 @@ def go_deps(): name = "org_golang_x_net", build_file_proto_mode = "disable_global", importpath = "golang.org/x/net", - sum = "h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=", - version = "v0.4.0", + sum = "h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=", + version = "v0.5.0", ) go_repository( name = "org_golang_x_oauth2", @@ -4454,22 +4454,22 @@ def go_deps(): name = "org_golang_x_sys", build_file_proto_mode = "disable_global", importpath = "golang.org/x/sys", - sum = "h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=", - version = "v0.3.0", + sum = "h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=", + version = "v0.4.0", ) go_repository( name = "org_golang_x_term", build_file_proto_mode = "disable_global", importpath = "golang.org/x/term", - sum = "h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=", - version = "v0.3.0", + sum = "h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=", + version = "v0.4.0", ) go_repository( name = "org_golang_x_text", build_file_proto_mode = "disable_global", importpath = "golang.org/x/text", - sum = "h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=", - version = "v0.5.0", + sum = "h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=", + version = "v0.6.0", ) go_repository( name = "org_golang_x_time", diff --git a/go.mod b/go.mod index c1b97c5d08b56..7078c38cae11b 100644 --- a/go.mod +++ b/go.mod @@ -111,12 +111,12 @@ require ( go.uber.org/multierr v1.9.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e - golang.org/x/net v0.4.0 + golang.org/x/net v0.5.0 golang.org/x/oauth2 v0.3.0 golang.org/x/sync v0.1.0 - golang.org/x/sys v0.3.0 - golang.org/x/term v0.3.0 - golang.org/x/text v0.5.0 + golang.org/x/sys v0.4.0 + golang.org/x/term v0.4.0 + golang.org/x/text v0.6.0 golang.org/x/time v0.3.0 golang.org/x/tools v0.2.0 google.golang.org/api v0.74.0 @@ -242,7 +242,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect - golang.org/x/crypto v0.1.0 // indirect + golang.org/x/crypto v0.5.0 // indirect golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect diff --git a/go.sum b/go.sum index e2c220f5e11f1..fe66c249e5c60 100644 --- a/go.sum +++ b/go.sum @@ -1087,8 +1087,8 @@ golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= -golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= +golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= +golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1194,8 +1194,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1320,12 +1320,12 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220909162455-aba9fc2a8ff2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI= -golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= +golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= +golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1335,8 +1335,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=