Skip to content

Commit

Permalink
lightning: do resolve conflict job when other lightning has local dup…
Browse files Browse the repository at this point in the history
…es (#41157)

close #40923
  • Loading branch information
lichunzhu authored Feb 9, 2023
1 parent d16f4c0 commit 7255868
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 12 deletions.
19 changes: 8 additions & 11 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type tableMetaMgr interface {
UpdateTableStatus(ctx context.Context, status metaStatus) error
UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error
CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (
needChecksum bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error)
otherHasDupe bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error)
FinishTable(ctx context.Context) error
}

Expand Down Expand Up @@ -370,7 +370,7 @@ func (m *dbTableMetaMgr) UpdateTableStatus(ctx context.Context, status metaStatu
}

func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (
needChecksum bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error,
otherHasDupe bool, needRemoteDupe bool, baseTotalChecksum *verify.KVChecksum, err error,
) {
conn, err := m.session.Conn(ctx)
if err != nil {
Expand All @@ -393,7 +393,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
taskHasDuplicates bool
)
newStatus := metaStatusChecksuming
needChecksum = true
otherHasDupe = false
needRemoteDupe = true
err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
Expand Down Expand Up @@ -423,9 +423,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
return err
}

if taskHasDuplicates {
needChecksum = false
}
otherHasDupe = otherHasDupe || taskHasDuplicates

// skip finished meta
if status >= metaStatusFinished {
Expand All @@ -436,7 +434,6 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
if status >= metaStatusChecksuming {
newStatus = status
needRemoteDupe = status == metaStatusChecksuming
needChecksum = needChecksum && needRemoteDupe
return nil
}

Expand All @@ -445,7 +442,6 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks

if status < metaStatusChecksuming {
newStatus = metaStatusChecksumSkipped
needChecksum = false
needRemoteDupe = false
break
} else if status == metaStatusChecksuming {
Expand Down Expand Up @@ -475,12 +471,13 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks
return false, false, nil, err
}

if needChecksum {
if !otherHasDupe && needRemoteDupe {
ck := verify.MakeKVChecksum(totalBytes, totalKvs, totalChecksum)
baseTotalChecksum = &ck
}
log.FromContext(ctx).Info("check table checksum", zap.String("table", m.tr.tableName),
zap.Bool("checksum", needChecksum), zap.String("new_status", newStatus.String()))
zap.Bool("otherHasDupe", otherHasDupe), zap.Bool("needRemoteDupe", needRemoteDupe),
zap.String("new_status", newStatus.String()))
return
}

Expand Down Expand Up @@ -1073,7 +1070,7 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum
}

func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) {
return true, true, &verify.KVChecksum{}, nil
return false, true, &verify.KVChecksum{}, nil
}

func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,11 +794,19 @@ func (tr *TableRestore) postProcess(
}
hasDupe = hasLocalDupe
}
failpoint.Inject("SlowDownCheckDupe", func(v failpoint.Value) {
sec := v.(int)
tr.logger.Warn("start to sleep several seconds before checking other dupe",
zap.Int("seconds", sec))
time.Sleep(time.Duration(sec) * time.Second)
})

needChecksum, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe)
otherHasDupe, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe)
if err != nil {
return false, err
}
needChecksum := !otherHasDupe && needRemoteDupe
hasDupe = hasDupe || otherHasDupe

if needRemoteDupe && rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
opts := &kv.SessionOptions{
Expand Down
41 changes: 41 additions & 0 deletions br/tests/lightning_duplicate_resolution_incremental/config1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[lightning]
task-info-schema-name = 'lightning_task_info_dupe_resolve_incremental'
index-concurrency = 10
table-concurrency = 10

[tikv-importer]
backend = "local"
on-duplicate = "replace"
duplicate-resolution = "remove"
incremental-import = true

[checkpoint]
enable = true
schema = "tidb_lightning_checkpoint_dupe_resolve_incremental1"
driver = "mysql"

[[mydumper.files]]
pattern = '(?i).*(-schema-trigger|-schema-post)\.sql$'
type = 'ignore'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$'
schema = '$1'
type = 'schema-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$'
schema = '$1'
table = '$2'
type = 'table-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)\.0\.sql$'
schema = '$1'
table = '$2'
key = '0'
type = 'sql'

[post-restore]
analyze = false
checksum = "optional"
41 changes: 41 additions & 0 deletions br/tests/lightning_duplicate_resolution_incremental/config2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[lightning]
task-info-schema-name = 'lightning_task_info_dupe_resolve_incremental'
index-concurrency = 10
table-concurrency = 10

[tikv-importer]
backend = "local"
on-duplicate = "replace"
duplicate-resolution = "remove"
incremental-import = true

[checkpoint]
enable = true
schema = "tidb_lightning_checkpoint_dupe_resolve_incremental2"
driver = "mysql"

[[mydumper.files]]
pattern = '(?i).*(-schema-trigger|-schema-post)\.sql$'
type = 'ignore'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$'
schema = '$1'
type = 'schema-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$'
schema = '$1'
table = '$2'
type = 'table-schema'

[[mydumper.files]]
pattern = '(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)\.1\.sql$'
schema = '$1'
table = '$2'
key = '1'
type = 'sql'

[post-restore]
analyze = false
checksum = "optional"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create schema dup_resolve_detect;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
create table ta (
id varchar(11) not null primary key nonclustered, -- use varchar here to make sure _tidb_rowid will be generated
name varchar(20) not null,
size bigint not null,
unique key uni_name(name)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
insert into ta values (3, '3c49f3bd', 6194643990092531757);
insert into ta values (13, '1da87b44', 3724743701402246028);
insert into ta values (6, '8b080186', 4840750639653607661);
insert into ta values (1, 'c83c0e6a', 5057094372111243649);
insert into ta values (12, 'dd73baf5', 2295098755414696158);
insert into ta values (4, '1cf99fa1', 2520784525406914042);
insert into ta values (11, 'b238a0e6', 3314555604794199537);
insert into ta values (10, 'a489c47a', 7706822128523578708);
insert into ta values (10, '9a54941e', 4969369552499069659);
insert into ta values (2, 'e7c90179', 1305347797378229715);
insert into ta values (9, '75e0344a', 500154046394880294);
insert into ta values (9, 'c3e8fc36', 5880042654284780409);
insert into ta values (6, 'd6835599', 2703142091339420770);
insert into ta values (5, 'c4a9c3a3', 6725275961959702206);
insert into ta values (14, 'eb1ab0dd', 5442878220607642694);
insert into ta values (7, '78e166f4', 7062852002089313920);
insert into ta values (8, '20986b65', 5485014514564267319);
insert into ta values (8, '9bd4d7a9', 9085469020413045798);
insert into ta values (15, 'd4aa9a8a', 546189610059969690);
insert into ta values (7, 'a7870c06', 3615729521258364152);
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
insert into ta values (111, 'bcf4e75f', 3304674741328415661);
insert into ta values (112, 'c08078e9', 7464585077725645791);
insert into ta values (113, 'ca05b4b2', 1280363363179468054);
insert into ta values (114, '8a094c96', 107578474892900608);
insert into ta values (115, 'f38efac2', 5273601814057696410);
insert into ta values (116, '5bf0cb56', 7276272767003446282);
insert into ta values (117, 'c8836b45', 653431702983792793);
insert into ta values (118, '7470ba67', 5617407618564683998);
insert into ta values (119, '466e1e95', 6827370124386922419);
insert into ta values (120, '41df97f3', 2296443172527920942);
insert into ta values (121, 'bd644f43', 6038622426427289955);
insert into ta values (122, '96aeb918', 1496857236328804363);
insert into ta values (123, '232448f7', 1199921720244646472);
insert into ta values (124, 'd296d6e4', 5705035255191089143);
insert into ta values (125, '194ec1d8', 6895413645725179445);
insert into ta values (126, 'a53238ec', 1527836891202149330);
62 changes: 62 additions & 0 deletions br/tests/lightning_duplicate_resolution_incremental/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

check_cluster_version 5 2 0 'duplicate detection' || exit 0

LOG_FILE1="$TEST_DIR/lightning-duplicate-resolution1.log"
LOG_FILE2="$TEST_DIR/lightning-duplicate-resolution2.log"

# let lightning run a bit slow to avoid some table in the first lightning finish too fast.
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownCheckDupe=return(10)"
run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_incremental.sorted1" \
--enable-checkpoint=1 --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config1.toml" &

counter=0
while [ $counter -lt 10 ]; do
if grep -Fq "start to sleep several seconds before checking other dupe" "$LOG_FILE1"; then
echo "lightning 1 already starts waiting for dupe"
break
fi
((counter += 1))
echo "waiting for lightning 1 starts"
sleep 1
done

if [ $counter -ge 10 ]; then
echo "fail to wait for lightning 1 starts"
exit 1
fi

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_incremental.sorted2" \
--enable-checkpoint=1 --log-file "$LOG_FILE2" --config "tests/$TEST_NAME/config2.toml" &

wait

export GO_FAILPOINTS=""

# Ensure table is consistent.
run_sql 'admin check table dup_resolve_detect.ta'

# Check data correctness
run_sql 'select count(*), sum(id) from dup_resolve_detect.ta where id < 100'
check_contains 'count(*): 10'
check_contains 'sum(id): 80'

run_sql 'select count(*), sum(id) from dup_resolve_detect.ta where id > 100'
check_contains 'count(*): 16'
check_contains 'sum(id): 1896'

0 comments on commit 7255868

Please sign in to comment.