Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: NACK endpoint resources with zero weight #5560

Merged
merged 1 commit into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
return &v3endpointpb.ClusterLoadAssignment{
Expand Down
3 changes: 3 additions & 0 deletions xds/internal/testutils/protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uin
lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
}
}
if lbe.LoadBalancingWeight == nil {
lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 1}
}
lbEndPoints = append(lbEndPoints, lbe)
}

Expand Down
6 changes: 3 additions & 3 deletions xds/internal/xdsclient/controller/v2_eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s) TestEDSHandleResponse(t *testing.T) {
"not-goodEDSName": {Update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: "addr1:314"}},
Endpoints: []xdsresource.Endpoint{{Address: "addr1:314", Weight: 1}},
ID: internal.LocalityID{SubZone: "locality-1"},
Priority: 0,
Weight: 1,
Expand All @@ -140,13 +140,13 @@ func (s) TestEDSHandleResponse(t *testing.T) {
goodEDSName: {Update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: "addr1:314"}},
Endpoints: []xdsresource.Endpoint{{Address: "addr1:314", Weight: 1}},
ID: internal.LocalityID{SubZone: "locality-1"},
Priority: 1,
Weight: 1,
},
{
Endpoints: []xdsresource.Endpoint{{Address: "addr2:159"}},
Endpoints: []xdsresource.Endpoint{{Address: "addr2:159", Weight: 1}},
ID: internal.LocalityID{SubZone: "locality-2"},
Priority: 0,
Weight: 1,
Expand Down
16 changes: 12 additions & 4 deletions xds/internal/xdsclient/xdsresource/unmarshal_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,20 @@ func parseDropPolicy(dropPolicy *v3endpointpb.ClusterLoadAssignment_Policy_DropO
}
}

func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) []Endpoint {
func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) ([]Endpoint, error) {
endpoints := make([]Endpoint, 0, len(lbEndpoints))
for _, lbEndpoint := range lbEndpoints {
weight := lbEndpoint.GetLoadBalancingWeight().GetValue()
if weight == 0 {
return nil, fmt.Errorf("EDS response contains an endpoint with zero weight: %+v", lbEndpoint)
}
endpoints = append(endpoints, Endpoint{
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()),
Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(),
Weight: weight,
})
}
return endpoints
return endpoints, nil
}

func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.PrefixLogger) (EndpointsUpdate, error) {
Expand Down Expand Up @@ -134,9 +138,13 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.Pr
return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority)
}
localitiesWithPriority[lidStr] = true
endpoints, err := parseEndpoints(locality.GetLbEndpoints())
if err != nil {
return EndpointsUpdate{}, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wrap this err with more information (see other errors in this function).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parseEndpoints returns an error which has all the required information.

}
ret.Localities = append(ret.Localities, Locality{
ID: lid,
Endpoints: parseEndpoints(locality.GetLbEndpoints()),
Endpoints: endpoints,
Weight: locality.GetLoadBalancingWeight().GetValue(),
Priority: priority,
})
Expand Down
10 changes: 10 additions & 0 deletions xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ func (s) TestEDSParseRespProto(t *testing.T) {
want: EndpointsUpdate{},
wantErr: true,
},
{
name: "zero-endpoint-weight",
m: func() *v3endpointpb.ClusterLoadAssignment {
clab0 := newClaBuilder("test", nil)
clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, &addLocalityOptions{Weight: []uint32{0}})
return clab0.Build()
}(),
want: EndpointsUpdate{},
wantErr: true,
},
{
name: "duplicate-locality-in-the-same-priority",
m: func() *v3endpointpb.ClusterLoadAssignment {
Expand Down