Skip to content

Commit

Permalink
xds: Outlier Detection configuration in Cluster Resolver Balancer (#5371
Browse files Browse the repository at this point in the history
)

xds: Outlier Detection configuration in Cluster Resolver Balancer
  • Loading branch information
zasweq authored Jun 17, 2022
1 parent 1dabf54 commit 29d9970
Show file tree
Hide file tree
Showing 10 changed files with 816 additions and 29 deletions.
16 changes: 16 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()

// RegisterOutlierDetectionBalancerForTesting registers the Outlier
// Detection Balancer for testing purposes, regardless of the Outlier
// Detection environment variable.
//
// TODO: Remove this function once the Outlier Detection env var is removed.
RegisterOutlierDetectionBalancerForTesting func()

// UnregisterOutlierDetectionBalancerForTesting unregisters the Outlier
// Detection Balancer for testing purposes. This is needed because there is
// no way to unregister the Outlier Detection Balancer after registering it
// solely for testing purposes using
// RegisterOutlierDetectionBalancerForTesting().
//
// TODO: Remove this function once the Outlier Detection env var is removed.
UnregisterOutlierDetectionBalancerForTesting func()
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc
return provider, nil
}

func outlierDetectionToConfig(od *xdsresource.OutlierDetection) *outlierdetection.LBConfig { // Already validated - no need to return error
func outlierDetectionToConfig(od *xdsresource.OutlierDetection) outlierdetection.LBConfig { // Already validated - no need to return error
if od == nil {
// "If the outlier_detection field is not set in the Cluster message, a
// "no-op" outlier_detection config will be generated, with interval set
// to the maximum possible value and all other fields unset." - A50
return &outlierdetection.LBConfig{
return outlierdetection.LBConfig{
Interval: 1<<63 - 1,
}
}
Expand Down Expand Up @@ -308,7 +308,7 @@ func outlierDetectionToConfig(od *xdsresource.OutlierDetection) *outlierdetectio
}
}

return &outlierdetection.LBConfig{
return outlierdetection.LBConfig{
Interval: od.Interval,
BaseEjectionTime: od.BaseEjectionTime,
MaxEjectionTime: od.MaxEjectionTime,
Expand Down
15 changes: 8 additions & 7 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
Expand Down Expand Up @@ -250,7 +251,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -306,7 +307,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -462,7 +463,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -496,7 +497,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -549,7 +550,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -599,7 +600,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -677,7 +678,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
SubjectAltNameMatchers: testSANMatchers,
},
}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down
26 changes: 13 additions & 13 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var (
ServerURI: "self_server",
CredsType: "self_creds",
}
noopODLBCfg = &outlierdetection.LBConfig{
noopODLBCfg = outlierdetection.LBConfig{
Interval: 1<<63 - 1,
}
)
Expand Down Expand Up @@ -215,7 +215,7 @@ func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState {

// edsCCS is a helper function to construct a good update passed from the
// cdsBalancer to the edsBalancer.
func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *internalserviceconfig.BalancerConfig, odConfig *outlierdetection.LBConfig) balancer.ClientConnState {
func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *internalserviceconfig.BalancerConfig, odConfig outlierdetection.LBConfig) balancer.ClientConnState {
discoveryMechanism := clusterresolver.DiscoveryMechanism{
Type: clusterresolver.DiscoveryMechanismTypeEDS,
Cluster: service,
Expand Down Expand Up @@ -421,7 +421,7 @@ func (s) TestHandleClusterUpdate(t *testing.T) {
FailurePercentageMinimumHosts: 5,
FailurePercentageRequestVolume: 50,
}},
wantCCS: edsCCS(serviceName, nil, false, nil, &outlierdetection.LBConfig{
wantCCS: edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{
Interval: 10 * time.Second,
BaseEjectionTime: 30 * time.Second,
MaxEjectionTime: 300 * time.Second,
Expand Down Expand Up @@ -506,7 +506,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -591,7 +591,7 @@ func (s) TestResolverError(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -640,7 +640,7 @@ func (s) TestUpdateSubConnState(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -675,7 +675,7 @@ func (s) TestCircuitBreaking(t *testing.T) {
// the service's counter with the new max requests.
var maxRequests uint32 = 1
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests}
wantCCS := edsCCS(clusterName, &maxRequests, false, nil, nil)
wantCCS := edsCCS(clusterName, &maxRequests, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -708,7 +708,7 @@ func (s) TestClose(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -779,7 +779,7 @@ func (s) TestExitIdle(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -846,7 +846,7 @@ func (s) TestOutlierDetectionToConfig(t *testing.T) {
tests := []struct {
name string
od *xdsresource.OutlierDetection
odLBCfgWant *outlierdetection.LBConfig
odLBCfgWant outlierdetection.LBConfig
}{
// "if the outlier_detection field is not set in the Cluster resource,
// a "no-op" outlier_detection config will be generated in the
Expand Down Expand Up @@ -876,7 +876,7 @@ func (s) TestOutlierDetectionToConfig(t *testing.T) {
FailurePercentageMinimumHosts: 5,
FailurePercentageRequestVolume: 50,
},
odLBCfgWant: &outlierdetection.LBConfig{
odLBCfgWant: outlierdetection.LBConfig{
Interval: 10 * time.Second,
BaseEjectionTime: 30 * time.Second,
MaxEjectionTime: 300 * time.Second,
Expand Down Expand Up @@ -909,7 +909,7 @@ func (s) TestOutlierDetectionToConfig(t *testing.T) {
FailurePercentageMinimumHosts: 5,
FailurePercentageRequestVolume: 50,
},
odLBCfgWant: &outlierdetection.LBConfig{
odLBCfgWant: outlierdetection.LBConfig{
Interval: 10 * time.Second,
BaseEjectionTime: 30 * time.Second,
MaxEjectionTime: 300 * time.Second,
Expand Down Expand Up @@ -939,7 +939,7 @@ func (s) TestOutlierDetectionToConfig(t *testing.T) {
FailurePercentageMinimumHosts: 5,
FailurePercentageRequestVolume: 50,
},
odLBCfgWant: &outlierdetection.LBConfig{
odLBCfgWant: outlierdetection.LBConfig{
Interval: 10 * time.Second,
BaseEjectionTime: 30 * time.Second,
MaxEjectionTime: 300 * time.Second,
Expand Down
123 changes: 121 additions & 2 deletions xds/internal/balancer/clusterresolver/clusterresolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
Expand All @@ -53,7 +62,7 @@ var (
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: "endpoint1"}},
ID: internal.LocalityID{Zone: "zone"},
ID: xdsinternal.LocalityID{Zone: "zone"},
Priority: 1,
Weight: 100,
},
Expand Down Expand Up @@ -130,6 +139,18 @@ func (f *fakeChildBalancer) Close() {}

func (f *fakeChildBalancer) ExitIdle() {}

func (f *fakeChildBalancer) waitForClientConnStateChangeVerifyBalancerConfig(ctx context.Context, wantCCS balancer.ClientConnState) error {
ccs, err := f.clientConnState.Receive(ctx)
if err != nil {
return err
}
gotCCS := ccs.(balancer.ClientConnState)
if diff := cmp.Diff(gotCCS, wantCCS, cmpopts.IgnoreFields(balancer.ClientConnState{}, "ResolverState")); diff != "" {
return fmt.Errorf("received unexpected ClientConnState, diff (-got +want): %v", diff)
}
return nil
}

func (f *fakeChildBalancer) waitForClientConnStateChange(ctx context.Context) error {
_, err := f.clientConnState.Receive(ctx)
if err != nil {
Expand Down Expand Up @@ -500,3 +521,101 @@ func newLBConfigWithOneEDS(edsServiceName string) *LBConfig {
}},
}
}

func newLBConfigWithOneEDSAndOutlierDetection(edsServiceName string, odCfg outlierdetection.LBConfig) *LBConfig {
lbCfg := newLBConfigWithOneEDS(edsServiceName)
lbCfg.DiscoveryMechanisms[0].OutlierDetection = odCfg
return lbCfg
}

// TestOutlierDetection tests the Balancer Config sent down to the child
// priority balancer when Outlier Detection is turned on. The Priority
// Configuration sent downward should have a top level Outlier Detection Policy
// for each priority.
func (s) TestOutlierDetection(t *testing.T) {
oldOutlierDetection := envconfig.XDSOutlierDetection
envconfig.XDSOutlierDetection = true
internal.RegisterOutlierDetectionBalancerForTesting()
defer func() {
envconfig.XDSOutlierDetection = oldOutlierDetection
}()

edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Update Cluster Resolver with Client Conn State with Outlier Detection
// configuration present. This is what will be passed down to this balancer,
// as CDS Balancer gets the Cluster Update and converts the Outlier
// Detection data to an Outlier Detection configuration and sends it to this
// level.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDSAndOutlierDetection(testEDSServcie, noopODCfg),
}); err != nil {
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}

// Invoke EDS Callback - causes child balancer to be built and then
// UpdateClientConnState called on it with Outlier Detection as a direct
// child.
xdsC.InvokeWatchEDSCallback("", defaultEndpointsUpdate, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}

localityID := xdsinternal.LocalityID{Zone: "zone"}
// The priority configuration generated should have Outlier Detection as a
// direct child due to Outlier Detection being turned on.
pCfgWant := &priority.LBConfig{
Children: map[string]*priority.Child{
"priority-0-0": {
Config: &internalserviceconfig.BalancerConfig{
Name: outlierdetection.Name,
Config: &outlierdetection.LBConfig{
Interval: 1<<63 - 1,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: clusterimpl.Name,
Config: &clusterimpl.LBConfig{
Cluster: testClusterName,
EDSServiceName: "test-eds-service-name",
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedtarget.Name,
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(localityID.ToString): {
Weight: 100,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
},
},
},
},
},
IgnoreReresolutionRequests: true,
},
},
Priorities: []string{"priority-0-0"},
}

if err := edsLB.waitForClientConnStateChangeVerifyBalancerConfig(ctx, balancer.ClientConnState{
BalancerConfig: pCfgWant,
}); err != nil {
t.Fatalf("EDS impl got unexpected update: %v", err)
}
}
Loading

0 comments on commit 29d9970

Please sign in to comment.