Skip to content

Commit

Permalink
pkg/lightning : remove get_regions call in physical backend (#46202) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 23, 2023
1 parent bfd8c74 commit c07735c
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 36 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1694,7 +1694,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
needSplit = true
})
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, needSplit, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
}
Expand Down
34 changes: 2 additions & 32 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand Down Expand Up @@ -67,17 +66,15 @@ var (
func (local *local) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
batchCnt int,
) error {
for i := 0; i < len(ranges); i += batchCnt {
batch := ranges[i:]
if len(batch) > batchCnt {
batch = batch[:batchCnt]
}
if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -91,26 +88,20 @@ func (local *local) SplitAndScatterRegionInBatches(
func (local *local) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
) error {
if len(ranges) == 0 {
return nil
}

db, err := local.g.GetDB()
if err != nil {
return errors.Trace(err)
}
var err error

minKey := codec.EncodeBytes([]byte{}, ranges[0].start)
maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end)

scatterRegions := make([]*split.RegionInfo, 0)
var retryKeys [][]byte
waitTime := splitRegionBaseBackOffTime
skippedKeys := 0
for i := 0; i < splitRetryTimes; i++ {
log.FromContext(ctx).Info("split and scatter region",
logutil.Key("minKey", minKey),
Expand Down Expand Up @@ -172,16 +163,6 @@ func (local *local) SplitAndScatterRegionByRanges(
return nil
}

var tableRegionStats map[uint64]int64
if tableInfo != nil {
tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID)
if err != nil {
log.FromContext(ctx).Warn("fetch table region size statistics failed",
zap.String("table", tableInfo.Name), zap.Error(err))
tableRegionStats, err = make(map[uint64]int64), nil
}
}

regionMap := make(map[uint64]*split.RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
Expand Down Expand Up @@ -291,15 +272,6 @@ func (local *local) SplitAndScatterRegionByRanges(
}
sendLoop:
for regionID, keys := range splitKeyMap {
// if region not in tableRegionStats, that means this region is newly split, so
// we can skip split it again.
regionSize, ok := tableRegionStats[regionID]
if !ok {
log.FromContext(ctx).Warn("region stats not found", zap.Uint64("region", regionID))
}
if len(keys) == 1 && regionSize < regionSplitSize {
skippedKeys++
}
select {
case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
case <-ctx.Done():
Expand Down Expand Up @@ -335,11 +307,9 @@ func (local *local) SplitAndScatterRegionByRanges(
scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
if scatterCount == len(scatterRegions) {
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.FromContext(ctx).Info("waiting for scattering regions timeout",
zap.Int("skipped_keys", skippedKeys),
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)),
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie
split.ScanRegionAttemptTimes = backup
}()
}
err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
if len(errPat) == 0 {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -650,7 +650,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) {
})
}

err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
err := local.SplitAndScatterRegionInBatches(ctx, ranges, true, 4)
require.NoError(t, err)

rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
Expand Down Expand Up @@ -747,7 +747,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) {
start = e
}

err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
err := local.SplitAndScatterRegionByRanges(ctx, ranges, true)
require.NoError(t, err)

startKey := codec.EncodeBytes([]byte{}, rangeKeys[0])
Expand Down

0 comments on commit c07735c

Please sign in to comment.