Skip to content

Commit

Permalink
*: upgrade the PD client to adopt the latest refactor (#58440)
Browse files Browse the repository at this point in the history
ref #58439
  • Loading branch information
JmPotato authored Dec 23, 2024
1 parent 042a332 commit 14a469a
Show file tree
Hide file tree
Showing 82 changed files with 326 additions and 202 deletions.
36 changes: 18 additions & 18 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5763,13 +5763,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "7d3b6f6b755b027ba138d3069238f4a4e91d0d1f573de17cda00985616adc843",
strip_prefix = "github.com/pingcap/[email protected]20241120022153-92b0414aeed8",
sha256 = "92a67bcc499c06fd3d76cc153362540b22eaf1b09c4bda62a1599ce876b8ed78",
strip_prefix = "github.com/pingcap/[email protected]20241120071417-b5b7843d9037",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
],
)
go_repository(
Expand Down Expand Up @@ -6933,26 +6933,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "844684ee6ae7decc5cadcab3f95c526b66878f8401c71cf82af68ec0cc5257d5",
strip_prefix = "github.com/tikv/client-go/[email protected].20241209094930-06d7f4b9233b",
sha256 = "620533c3da6f02758df6e61310c72af9761e6f9ce7244e42962bbe882f5b4317",
strip_prefix = "github.com/tikv/client-go/[email protected].20241223070848-fd950fcf9fcc",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241223070848-fd950fcf9fcc.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241223070848-fd950fcf9fcc.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241223070848-fd950fcf9fcc.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241223070848-fd950fcf9fcc.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "52a62b6f6247ce31ee9d0a5dbde941ba3be3db74a713fd79643d015d98a15c5f",
strip_prefix = "github.com/tikv/pd/[email protected]20241111073742-238d4d79ea31",
sha256 = "e46a55f684df2acf06f1081a8a32750ef99bc05e9a1d6508f5c8e605bf954642",
strip_prefix = "github.com/tikv/pd/[email protected]20241220053006-461b86adc78d",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241220053006-461b86adc78d.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241220053006-461b86adc78d.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241220053006-461b86adc78d.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241220053006-461b86adc78d.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//opt",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -103,7 +103,7 @@ type CliEnv struct {
}

func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := c.Cache.PDClient().GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/conn/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ go_library(
"//pkg/util/engine",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//opt",
],
)
6 changes: 3 additions & 3 deletions br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/util/engine"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
Expand All @@ -34,7 +34,7 @@ type StoreMeta interface {
// GetAllStores gets all stores from pd.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error)
}

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
Expand All @@ -45,7 +45,7 @@ func GetAllTiKVStores(
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := pdClient.GetAllStores(ctx, opt.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/pdutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ go_library(
"@com_github_pingcap_log//:log",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//retry",
"@com_github_tikv_pd_client//opt",
"@com_github_tikv_pd_client//pkg/caller",
"@com_github_tikv_pd_client//pkg/retry",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_zap//:zap",
],
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/retry"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -154,11 +156,11 @@ func NewPdController(
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
}
pdClient, err := pd.NewClientWithContext(
ctx, pdAddrs, securityOption,
pd.WithGRPCDialOptions(maxCallMsgSize...),
ctx, caller.Component("br-pd-controller"), pdAddrs, securityOption,
opt.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
pd.WithCustomTimeoutOption(60*time.Second),
opt.WithCustomTimeoutOption(60*time.Second),
)
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/internal/rawkv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//rawkv",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//opt",
],
)

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/internal/rawkv/rawkv_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
)

// RawkvClient is the interface for rawkv.client
Expand All @@ -29,7 +29,7 @@ func NewRawkvClient(ctx context.Context, pdAddrs []string, security config.Secur
ctx,
pdAddrs,
security,
pd.WithCustomTimeoutOption(10*time.Second))
opt.WithCustomTimeoutOption(10*time.Second))
}

type KVPair struct {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//clients/router",
"@org_golang_x_exp//slices",
"@org_uber_go_goleak//:goleak",
],
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/snap_client/placement_rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
)

func generateTables() []*snapclient.CreatedTable {
Expand Down Expand Up @@ -104,8 +104,8 @@ func TestContextManagerOnlineNoStores(t *testing.T) {
require.NoError(t, err)
}

func generateRegions() []*pd.Region {
return []*pd.Region{
func generateRegions() []*router.Region {
return []*router.Region{
{
Meta: &metapb.Region{
Id: 0,
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ go_library(
"@com_github_pingcap_kvproto//pkg/tikvpb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//clients/router",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//opt",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/util/intest"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -210,7 +211,7 @@ func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionIn
logutil.Key("end", v.Region.EndKey),
zap.Uint64("id", v.Region.Id))
}
resp, err := c.client.ScatterRegions(ctx, regionsID, pd.WithSkipStoreLimit())
resp, err := c.client.ScatterRegions(ctx, regionsID, opt.WithSkipStoreLimit())
if err != nil {
return err
}
Expand Down
48 changes: 25 additions & 23 deletions br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -248,8 +250,8 @@ func (c *MockPDClientForSplit) ScanRegions(
_ context.Context,
key, endKey []byte,
limit int,
_ ...pd.GetRegionOption,
) ([]*pd.Region, error) {
_ ...opt.GetRegionOption,
) ([]*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -264,9 +266,9 @@ func (c *MockPDClientForSplit) ScanRegions(
}

regions := c.Regions.ScanRange(key, endKey, limit)
ret := make([]*pd.Region, 0, len(regions))
ret := make([]*router.Region, 0, len(regions))
for _, r := range regions {
ret = append(ret, &pd.Region{
ret = append(ret, &router.Region{
Meta: r.Meta,
Leader: r.Leader,
})
Expand All @@ -276,10 +278,10 @@ func (c *MockPDClientForSplit) ScanRegions(

func (c *MockPDClientForSplit) BatchScanRegions(
_ context.Context,
keyRanges []pd.KeyRange,
keyRanges []router.KeyRange,
limit int,
_ ...pd.GetRegionOption,
) ([]*pd.Region, error) {
_ ...opt.GetRegionOption,
) ([]*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -293,7 +295,7 @@ func (c *MockPDClientForSplit) BatchScanRegions(
c.scanRegions.beforeHook()
}

regions := make([]*pd.Region, 0, len(keyRanges))
regions := make([]*router.Region, 0, len(keyRanges))
var lastRegion *pdtypes.Region
for _, keyRange := range keyRanges {
if lastRegion != nil {
Expand All @@ -307,7 +309,7 @@ func (c *MockPDClientForSplit) BatchScanRegions(
rs := c.Regions.ScanRange(keyRange.StartKey, keyRange.EndKey, limit)
for _, r := range rs {
lastRegion = r
regions = append(regions, &pd.Region{
regions = append(regions, &router.Region{
Meta: r.Meta,
Leader: r.Leader,
})
Expand All @@ -316,13 +318,13 @@ func (c *MockPDClientForSplit) BatchScanRegions(
return regions, nil
}

func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error) {
func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...opt.GetRegionOption) (*router.Region, error) {
c.mu.Lock()
defer c.mu.Unlock()

for _, r := range c.Regions.Regions {
if r.Meta.Id == regionID {
return &pd.Region{
return &router.Region{
Meta: r.Meta,
Leader: r.Leader,
}, nil
Expand Down Expand Up @@ -370,7 +372,7 @@ func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64)
return newRegionNotFullyReplicatedErr(regionID)
}

func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -517,7 +519,7 @@ func (fpdh *FakePDHTTPClient) DeletePlacementRule(_ context.Context, groupID str
type FakePDClient struct {
pd.Client
stores []*metapb.Store
regions []*pd.Region
regions []*router.Region

notLeader bool
retryTimes *int
Expand All @@ -540,21 +542,21 @@ func NewFakePDClient(stores []*metapb.Store, notLeader bool, retryTime *int) *Fa
}
}

func (fpdc *FakePDClient) SetRegions(regions []*pd.Region) {
func (fpdc *FakePDClient) SetRegions(regions []*router.Region) {
fpdc.regions = regions
}

func (fpdc *FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
func (fpdc *FakePDClient) GetAllStores(context.Context, ...opt.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

func (fpdc *FakePDClient) ScanRegions(
ctx context.Context,
key, endKey []byte,
limit int,
opts ...pd.GetRegionOption,
) ([]*pd.Region, error) {
regions := make([]*pd.Region, 0, len(fpdc.regions))
opts ...opt.GetRegionOption,
) ([]*router.Region, error) {
regions := make([]*router.Region, 0, len(fpdc.regions))
fpdc.peerStoreId = fpdc.peerStoreId + 1
peerStoreId := (fpdc.peerStoreId + 1) / 2
for _, region := range fpdc.regions {
Expand All @@ -572,11 +574,11 @@ func (fpdc *FakePDClient) ScanRegions(

func (fpdc *FakePDClient) BatchScanRegions(
ctx context.Context,
ranges []pd.KeyRange,
ranges []router.KeyRange,
limit int,
opts ...pd.GetRegionOption,
) ([]*pd.Region, error) {
regions := make([]*pd.Region, 0, len(fpdc.regions))
opts ...opt.GetRegionOption,
) ([]*router.Region, error) {
regions := make([]*router.Region, 0, len(fpdc.regions))
fpdc.peerStoreId = fpdc.peerStoreId + 1
peerStoreId := (fpdc.peerStoreId + 1) / 2
for _, region := range fpdc.regions {
Expand Down Expand Up @@ -633,7 +635,7 @@ func (f *FakeSplitClient) AppendRegion(startKey, endKey []byte) {
})
}

func (f *FakeSplitClient) AppendPdRegion(region *pd.Region) {
func (f *FakeSplitClient) AppendPdRegion(region *router.Region) {
f.regions = append(f.regions, &RegionInfo{
Region: region.Meta,
Leader: region.Leader,
Expand Down
Loading

0 comments on commit 14a469a

Please sign in to comment.