lightning: check peers write stall when switch-mode is disabled #40228

Merged · 11 commits · Dec 30, 2022
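
In short: when Lightning's periodic switch-mode is disabled, TiKV stays in normal mode during the import, so ingesting many SSTs can trigger a write stall. This PR makes the local backend probe every peer of the target region before each ingest and back off while any store reports it is busy. Below is a minimal, self-contained sketch of that flow; the `peerProber` type and `waitNoWriteStall` helper are hypothetical stand-ins for the real client calls shown in the `local.go` diff further down.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// peerProber is a hypothetical stand-in for the import client: the real
// code probes a store by sending it an empty MultiIngest request.
type peerProber func(ctx context.Context, storeID uint64) (busy bool, err error)

// waitNoWriteStall mirrors the retry loop this PR adds to (*local).Ingest:
// keep probing all peers and sleeping until none of them reports a stall.
func waitNoWriteStall(ctx context.Context, peers []uint64, probe peerProber, sleep time.Duration) error {
	for {
		stalled := false
		for _, storeID := range peers {
			busy, err := probe(ctx, storeID)
			if err != nil {
				return err
			}
			if busy {
				stalled = true
				break
			}
		}
		if !stalled {
			return nil // safe to send the real ingest request now
		}
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	rounds := 0
	// Fake probe: the store reports busy for two rounds, then recovers.
	probe := func(ctx context.Context, storeID uint64) (bool, error) {
		rounds++
		return rounds <= 2, nil
	}
	err := waitNoWriteStall(context.Background(), []uint64{1}, probe, 10*time.Millisecond)
	fmt.Println("stall cleared:", err == nil)
}
```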
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -103,6 +103,7 @@ go_test(
"//br/pkg/lightning/glue",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/worker",
"//br/pkg/membuf",
"//br/pkg/mock",
"//br/pkg/pdutil",
17 changes: 11 additions & 6 deletions br/pkg/lightning/backend/local/engine_test.go
@@ -31,8 +31,17 @@ import (
"github.com/stretchr/testify/require"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
dir := t.TempDir()
db, err := pebble.Open(path.Join(dir, "test"), opt)
require.NoError(t, err)
tmpPath := filepath.Join(dir, "test.sst")
err = os.Mkdir(tmpPath, 0o755)
require.NoError(t, err)
return db, tmpPath
}

func TestIngestSSTWithClosedEngine(t *testing.T) {
opt := &pebble.Options{
MemTableSize: 1024 * 1024,
MaxConcurrentCompactions: 16,
@@ -41,11 +50,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
 		DisableWAL:               true,
 		ReadOnly:                 false,
 	}
-	db, err := pebble.Open(filepath.Join(dir, "test"), opt)
-	require.NoError(t, err)
-	tmpPath := filepath.Join(dir, "test.sst")
-	err = os.Mkdir(tmpPath, 0o755)
-	require.NoError(t, err)
+	db, tmpPath := makePebbleDB(t, opt)
 
 	_, engineUUID := backend.MakeUUID("ww", 0)
 	engineCtx, cancel := context.WithCancel(context.Background())
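The engine_test.go change is a pure refactor: the inline pebble setup in TestIngestSSTWithClosedEngine moves into a makePebbleDB helper so other tests in the package can reuse it. A hedged sketch of the intended reuse follows; the test name and body are illustrative, not the actual test added by this PR.

```go
package local

import (
	"testing"

	"github.com/cockroachdb/pebble"
)

// Illustrative only: shows how a later test can reuse makePebbleDB
// instead of repeating the TempDir/Open/Mkdir boilerplate.
func TestReusesSharedPebbleSetup(t *testing.T) {
	opt := &pebble.Options{MemTableSize: 1024 * 1024}
	db, tmpPath := makePebbleDB(t, opt) // db plus a "test.sst" scratch dir
	defer db.Close()
	_ = tmpPath // hand tmpPath to the engine under test as its SST dir
}
```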
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -91,6 +91,7 @@ const (
 	gRPCKeepAliveTime    = 10 * time.Minute
 	gRPCKeepAliveTimeout = 5 * time.Minute
 	gRPCBackOffMaxDelay  = 10 * time.Minute
+	writeStallSleepTime  = 10 * time.Second
 
 	// The max ranges count in a batch to split and scatter.
 	maxBatchSplitRanges = 4096
@@ -381,6 +382,12 @@ type local struct {
 
 	encBuilder       backend.EncodingBuilder
 	targetInfoGetter backend.TargetInfoGetter
+
+	// When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall.
+	// To avoid this, we should check for write stall before ingesting SSTs. Note that we
+	// must check both the leader and the followers on the client side, because followers
+	// do not check for write stall as long as the ingest command is accepted by the leader.
+	shouldCheckWriteStall bool
 }
 
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
@@ -503,6 +510,7 @@ func NewLocalBackend(
 		logger:           log.FromContext(ctx),
 		encBuilder:       NewEncodingBuilder(ctx),
 		targetInfoGetter: NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
+		shouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
 	}
 	if m, ok := metric.FromContext(ctx); ok {
 		local.metrics = m
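
Worth noting: the probe is gated on cfg.Cron.SwitchMode.Duration == 0, i.e. it only activates when the user has disabled Lightning's periodic switch-mode (switch-mode = '0' under [cron]). A minimal sketch of that gating, with cronConfig as a hypothetical stand-in for the real config type:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-in for Lightning's cron config; the real field is
// cfg.Cron.SwitchMode.Duration, as read in NewLocalBackend above.
type cronConfig struct {
	SwitchMode time.Duration // zero when the TOML sets switch-mode = '0'
}

// With switch-mode disabled, TiKV is never switched into import mode, so
// the client itself has to guard against write stall before ingesting.
func shouldProbeWriteStall(c cronConfig) bool {
	return c.SwitchMode == 0
}

func main() {
	fmt.Println(shouldProbeWriteStall(cronConfig{SwitchMode: 0})) // true
}
```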
@@ -1146,6 +1154,25 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *split.RegionInfo)
 		return resp, errors.Trace(err)
 	}
 
+	if local.shouldCheckWriteStall {
+		for {
+			maybeWriteStall, err := local.checkWriteStall(ctx, region)
+			if err != nil {
+				return nil, err
+			}
+			if !maybeWriteStall {
+				break
+			}
+			log.FromContext(ctx).Warn("ingest may cause write stall, sleep and retry",
+				zap.Duration("duration", writeStallSleepTime))
+			select {
+			case <-time.After(writeStallSleepTime):
+			case <-ctx.Done():
+				return nil, errors.Trace(ctx.Err())
+			}
+		}
+	}
+
 	req := &sst.MultiIngestRequest{
 		Context: reqCtx,
 		Ssts:    metas,
@@ -1154,6 +1181,23 @@
 	return resp, errors.Trace(err)
 }
+
+func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) {
+	for _, peer := range region.Region.GetPeers() {
+		cli, err := local.getImportClient(ctx, peer.StoreId)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{})
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		if resp.Error != nil && resp.Error.ServerIsBusy != nil {
+			return true, nil
+		}
+	}
+	return false, nil
+}
 
 func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
 	ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
 	curSize := uint64(0)
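
One design point worth spelling out: checkWriteStall probes with an empty MultiIngestRequest. Nothing is actually ingested, but, judging from the check above, a stalled store still fills in resp.Error.ServerIsBusy, which makes the empty request a cheap health probe that works on followers as well as the leader. A sketch of that contract against a fake client; the response types here are simplified stand-ins for the kvproto sst types.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the kvproto types: in the real code ServerIsBusy
// is a message pointer on the ingest response error, not a bool.
type ingestError struct{ ServerIsBusy bool }
type ingestResponse struct{ Error *ingestError }

// importClient models the one call checkWriteStall needs: an empty
// MultiIngest that ingests nothing but reports store pressure.
type importClient interface {
	MultiIngest(ctx context.Context) (*ingestResponse, error)
}

type fakeStore struct{ busy bool }

func (f fakeStore) MultiIngest(ctx context.Context) (*ingestResponse, error) {
	if f.busy {
		return &ingestResponse{Error: &ingestError{ServerIsBusy: true}}, nil
	}
	return &ingestResponse{}, nil
}

// anyPeerStalled mirrors the shape of checkWriteStall: report a stall as
// soon as any peer of the region says it is busy.
func anyPeerStalled(ctx context.Context, peers []importClient) (bool, error) {
	for _, cli := range peers {
		resp, err := cli.MultiIngest(ctx)
		if err != nil {
			return false, err
		}
		if resp.Error != nil && resp.Error.ServerIsBusy {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// The leader is fine, but one follower is stalled: ingest must still wait.
	peers := []importClient{fakeStore{}, fakeStore{busy: true}, fakeStore{}}
	stalled, _ := anyPeerStalled(context.Background(), peers)
	fmt.Println("write stall detected:", stalled) // true
}
```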