Merge branch 'master' of https://github.com/pingcap/tidb into lightning-network

buchuitoudegou committed Dec 8, 2022
2 parents d4f422d + c65a93a commit a0d2ad9
Showing 88 changed files with 6,496 additions and 5,566 deletions.
16 changes: 12 additions & 4 deletions DEPS.bzl
@@ -450,6 +450,14 @@ def go_deps():
sum = "h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=",
version = "v0.3.4",
)
go_repository(
name = "com_github_cloudfoundry_gosigar",
build_file_proto_mode = "disable",
importpath = "github.com/cloudfoundry/gosigar",
sum = "h1:T3MoGdugg1vdHn8Az7wDn7cZ4+QCjZph+eXf2CjSjo4=",
version = "v1.3.4",
)

go_repository(
name = "com_github_cloudykit_fastprinter",
build_file_proto_mode = "disable_global",
@@ -3519,8 +3527,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:Nr2EhvqkOE9xFyU7LV9c9EbsgN3OzVALdbfobK7Fmn4=",
version = "v2.0.3-0.20221205084317-ad59ca833a78",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
)
go_repository(
name = "com_github_tikv_pd_client",
@@ -4438,8 +4446,8 @@ def go_deps():
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sum = "h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=",
version = "v0.2.0",
sum = "h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_term",
23 changes: 23 additions & 0 deletions br/pkg/restore/client.go
@@ -988,6 +988,29 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
backupCol.Name, backupCol.FieldType.String())
}
}

if backupTi.Name.L == sysUserTableName {
// check whether the table in the cluster is missing columns that exist in the backup data
clusterColMap := make(map[string]*model.ColumnInfo)
for i := range ti.Columns {
col := ti.Columns[i]
clusterColMap[col.Name.L] = col
}
// order can be different
for i := range backupTi.Columns {
col := backupTi.Columns[i]
clusterCol := clusterColMap[col.Name.L]
if clusterCol == nil {
log.Error("missing column in cluster data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"missing column in cluster data, table: %s, col: %s %s",
table.Info.Name.O,
col.Name, col.FieldType.String())
}
}
}
}
return nil
}
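The essence of the new compatibility check, as a minimal standalone sketch (the types and helper below are hypothetical, not the BR code): index the cluster's columns by name, then require every column recorded in the backup to be present, so a reordered schema still passes while a missing column fails.

package main

import "fmt"

type column struct{ Name, Type string }

// missingColumns returns the backup columns that the cluster table lacks.
// Column order is irrelevant because lookups go through a name-keyed map.
func missingColumns(cluster, backup []column) []column {
    clusterCols := make(map[string]column, len(cluster))
    for _, c := range cluster {
        clusterCols[c.Name] = c
    }
    var missing []column
    for _, c := range backup {
        if _, ok := clusterCols[c.Name]; !ok {
            missing = append(missing, c)
        }
    }
    return missing
}

func main() {
    cluster := []column{{"Host", "char(255)"}, {"User", "char(32)"}}
    backup := []column{{"User", "char(32)"}, {"Host", "char(255)"}, {"Password_reuse_time", "smallint(5)"}}
    fmt.Println(missingColumns(cluster, backup)) // [{Password_reuse_time smallint(5)}]
}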
10 changes: 10 additions & 0 deletions br/pkg/restore/client_test.go
@@ -203,6 +203,16 @@ func TestCheckSysTableCompatibility(t *testing.T) {
Info: mockedUserTI,
}})
require.NoError(t, err)
userTI.Columns = userTI.Columns[:len(userTI.Columns)-1]

// user table in the cluster has fewer columns (should fail)
mockedUserTI = userTI.Clone()
mockedUserTI.Columns = append(mockedUserTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))

// column order mismatch(success)
mockedUserTI = userTI.Clone()
9 changes: 7 additions & 2 deletions br/pkg/storage/s3.go
@@ -354,13 +354,18 @@ func NewS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3St
)
}
c := s3.New(ses, s3CliConfigs...)
// s3manager.GetBucketRegionWithClient will set credential anonymous, which works with s3.
// we need reassign credential to be compatible with minio authentication.
confCred := ses.Config.Credentials
setCredOpt := func(req *request.Request) {
// s3manager.GetBucketRegionWithClient will set credential anonymous, which works with s3.
// we need reassign credential to be compatible with minio authentication.
if confCred != nil {
req.Config.Credentials = confCred
}
// s3manager.GetBucketRegionWithClient uses path-style addressing by default.
// we need to set S3ForcePathStyle from our config when an endpoint is specified.
if qs.Endpoint != "" {
req.Config.S3ForcePathStyle = ses.Config.S3ForcePathStyle
}
}
region, err := s3manager.GetBucketRegionWithClient(context.Background(), c, qs.Bucket, setCredOpt)
if err != nil {
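For background, a minimal standalone sketch of the pattern being adjusted here (assuming plain aws-sdk-go v1; the MinIO-style endpoint and bucket name below are hypothetical): GetBucketRegionWithClient probes the bucket with anonymous credentials and virtual-hosted addressing, so a per-request option puts the session's credentials and path-style setting back for endpoints that require them.

package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/request"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
    ses := session.Must(session.NewSession(&aws.Config{
        Endpoint:         aws.String("http://127.0.0.1:9000"), // hypothetical MinIO-style endpoint
        S3ForcePathStyle: aws.Bool(true),
    }))
    cli := s3.New(ses)
    keepAuth := func(req *request.Request) {
        // undo the anonymous-credential override and keep path-style addressing
        req.Config.Credentials = ses.Config.Credentials
        req.Config.S3ForcePathStyle = ses.Config.S3ForcePathStyle
    }
    // against a reachable endpoint this returns the bucket's region
    region, err := s3manager.GetBucketRegionWithClient(context.Background(), cli, "bucket", keepAuth)
    fmt.Println(region, err)
}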
51 changes: 29 additions & 22 deletions br/pkg/storage/s3_test.go
@@ -314,32 +314,35 @@ func TestS3Storage(t *testing.T) {
{
name: "no region",
s3: &backuppb.S3{
Region: "",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
Region: "",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
},
errReturn: false,
sendCredential: true,
},
{
name: "wrong region",
s3: &backuppb.S3{
Region: "us-east-2",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
Region: "us-east-2",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
},
errReturn: true,
sendCredential: true,
},
{
name: "right region",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
Region: "us-west-2",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
},
errReturn: false,
sendCredential: true,
@@ -353,6 +356,7 @@
SecretAccessKey: "cd",
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
},
errReturn: false,
sendCredential: true,
@@ -365,30 +369,33 @@
SecretAccessKey: "cd",
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
},
errReturn: false,
sendCredential: true,
},
{
name: "no secret access key",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: s.URL,
AccessKey: "ab",
Bucket: "bucket",
Prefix: "prefix",
Region: "us-west-2",
Endpoint: s.URL,
AccessKey: "ab",
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
},
errReturn: false,
sendCredential: true,
},
{
name: "no secret access key",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: s.URL,
AccessKey: "ab",
Bucket: "bucket",
Prefix: "prefix",
Region: "us-west-2",
Endpoint: s.URL,
AccessKey: "ab",
Bucket: "bucket",
Prefix: "prefix",
ForcePathStyle: true,
},
errReturn: false,
sendCredential: false,
1 change: 1 addition & 0 deletions br/pkg/streamhelper/BUILD.bazel
@@ -71,6 +71,7 @@ go_test(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//kv",
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/advancer.go
@@ -382,15 +382,17 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()

threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(ctx); err != nil {
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(ctx, func(ctx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(ctx, threshold)
err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
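A minimal sketch of the bounded-tick pattern introduced above (the helper and names are illustrative, not the advancer's API): each tick runs under a child context whose deadline is derived from the tick interval, so a stuck downstream call surfaces as context.DeadlineExceeded instead of wedging the loop, which is what the new TestBlocked below relies on.

package main

import (
    "context"
    "fmt"
    "time"
)

// tickOnce runs one round of work under a per-tick deadline.
func tickOnce(ctx context.Context, timeout time.Duration, work func(context.Context) error) error {
    cx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    return work(cx)
}

func main() {
    err := tickOnce(context.Background(), 100*time.Millisecond, func(cx context.Context) error {
        select {
        case <-time.After(time.Hour): // simulate a stuck store RPC
            return nil
        case <-cx.Done():
            return cx.Err()
        }
    })
    fmt.Println(err) // context deadline exceeded
}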
36 changes: 36 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
@@ -9,8 +9,11 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
@@ -219,3 +222,36 @@ func TestTaskRangesWithSplit(t *testing.T) {
shouldFinishInTime(t, 10*time.Second, "second advancing", func() { require.NoError(t, adv.OnTick(ctx)) })
require.Greater(t, env.getCheckpoint(), fstCheckpoint)
}

func TestBlocked(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
c := createFakeCluster(t, 4, true)
ctx := context.Background()
req := require.New(t)
c.splitAndScatter("0012", "0034", "0048")
marked := false
for _, s := range c.stores {
s.clientMu.Lock()
s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error {
// Block the handler forever: receiving from a nil channel never returns.
// This mimics TiKV going down or being too busy to respond.
<-(chan struct{})(nil)
return nil
}
s.clientMu.Unlock()
marked = true
}
req.True(marked, "failed to mark the cluster: ")
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.UpdateConfigWith(func(c *config.Config) {
// ... So the tick timeout would be 100ms
c.TickDuration = 10 * time.Millisecond
})
var err error
shouldFinishInTime(t, time.Second, "ticking", func() {
err = adv.OnTick(ctx)
})
req.ErrorIs(errors.Cause(err), context.DeadlineExceeded)
}
17 changes: 12 additions & 5 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -77,10 +77,11 @@ type fakeStore struct {
id uint64
regions map[uint64]*region

clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
onGetRegionCheckpoint func(*logbackup.GetLastFlushTSOfRegionRequest) error
}

type fakeCluster struct {
@@ -133,7 +134,7 @@ func (t trivialFlushStream) Recv() (*logbackup.SubscribeFlushEventResponse, erro
return &item, nil
default:
}
return nil, t.cx.Err()
return nil, status.Error(codes.Canceled, t.cx.Err().Error())
}
}

@@ -184,6 +185,12 @@ func (f *fakeStore) SetSupportFlushSub(b bool) {
}

func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
if f.onGetRegionCheckpoint != nil {
err := f.onGetRegionCheckpoint(in)
if err != nil {
return nil, err
}
}
resp := &logbackup.GetLastFlushTSOfRegionResponse{
Checkpoints: []*logbackup.RegionCheckpoint{},
}
6 changes: 6 additions & 0 deletions br/pkg/streamhelper/config/advancer_conf.go
@@ -78,3 +78,9 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold / 5
}

// TickTimeout returns the max duration for each tick.
func (conf Config) TickTimeout() time.Duration {
// If a tick blocks 10x the interval of ticking, we may need to break it and retry.
return 10 * conf.TickDuration
}
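As a small standalone illustration (not the BR config package), the rule above ties the per-tick deadline to the configured tick interval; with the 10ms TickDuration used by TestBlocked this yields a 100ms budget.

package main

import (
    "fmt"
    "time"
)

// tickTimeout mirrors the rule above: allow a tick ten times the tick interval
// before giving up on it.
func tickTimeout(tickDuration time.Duration) time.Duration {
    return 10 * tickDuration
}

func main() {
    fmt.Println(tickTimeout(10 * time.Millisecond)) // 100ms, matching the TestBlocked setup
}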
10 changes: 6 additions & 4 deletions br/pkg/streamhelper/flush_subscriber.go
@@ -211,20 +211,22 @@ func (s *subscription) connect(ctx context.Context, dialer LogBackupService) {

func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) error {
log.Info("[log backup subscription manager] Adding subscription.", zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt))
s.clearError()
// We should shut down the background task first.
// Once it yields an error while shutting down, that error won't be carried into the next run.
s.close()
s.clearError()

c, err := dialer.GetLogBackupClient(ctx, s.storeID)
if err != nil {
return err
return errors.Annotate(err, "failed to get log backup client")
}
cx, cancel := context.WithCancel(ctx)
cli, err := c.SubscribeFlushEvent(cx, &logbackup.SubscribeFlushEventRequest{
ClientId: uuid.NewString(),
})
if err != nil {
cancel()
return err
return errors.Annotate(err, "failed to subscribe events")
}
s.cancel = cancel
s.background = spawnJoinable(func() { s.listenOver(cli) })
@@ -249,7 +251,7 @@ func (s *subscription) listenOver(cli eventStream) {
msg, err := cli.Recv()
if err != nil {
log.Info("[log backup flush subscriber] Listen stopped.", zap.Uint64("store", storeID), logutil.ShortError(err))
if err == io.EOF || err == context.Canceled {
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
return
}
s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
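A minimal standalone sketch (not the subscriber code) of why the extra codes.Canceled case is needed: once the fake store's Recv returns a gRPC status error rather than the raw context error, a plain equality check against context.Canceled no longer matches, so the status code has to be inspected as well.

package main

import (
    "context"
    "fmt"
    "io"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// isCleanShutdown mirrors the condition above: a stream ending with io.EOF,
// a local context cancellation, or a gRPC Canceled status is a normal stop.
func isCleanShutdown(err error) bool {
    return err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled
}

func main() {
    grpcErr := status.Error(codes.Canceled, context.Canceled.Error())
    fmt.Println(grpcErr == context.Canceled) // false: it is a gRPC status error, not the context sentinel
    fmt.Println(isCleanShutdown(grpcErr))    // true: recognized via status.Code
}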