Skip to content

Commit

Permalink
ringhash: handle config updates properly (#5557)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Aug 4, 2022
1 parent 946dde0 commit f9409d3
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 45 deletions.
34 changes: 16 additions & 18 deletions xds/internal/balancer/ringhash/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package ringhash

import (
"fmt"
"math"
"sort"
"strconv"
Expand Down Expand Up @@ -64,12 +63,12 @@ type ringEntry struct {
//
// To pick from a ring, a binary search will be done for the given target hash,
// and first item with hash >= given hash will be returned.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
//
// Must be called with a non-empty subConns map.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) *ring {
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
normalizedWeights, minWeight, err := normalizeWeights(subConns)
if err != nil {
return nil, err
}
normalizedWeights, minWeight := normalizeWeights(subConns)

// Normalized weights for {3,3,4} is {0.3,0.3,0.4}.

// Scale up the size of the ring such that the least-weighted host gets a
Expand Down Expand Up @@ -106,30 +105,29 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*r
for i, ii := range items {
ii.idx = i
}
return &ring{items: items}, nil
return &ring{items: items}
}

// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) {
keys := subConns.Keys()
if len(keys) == 0 {
return nil, 0, fmt.Errorf("number of subconns is 0")
}
//
// Must be called with a non-empty subConns map.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) {
var weightSum uint32
keys := subConns.Keys()
for _, a := range keys {
weightSum += getWeightAttribute(a)
}
if weightSum == 0 {
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
}
weightSumF := float64(weightSum)
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
nw := float64(getWeightAttribute(a)) / weightSumF
// getWeightAttribute() returns 1 if the weight attribute is not found
// on the address. And since this function is guaranteed to be called
// with a non-empty subConns map, weightSum is guaranteed to be
// non-zero. So, we need not worry about divide a by zero error here.
nw := float64(getWeightAttribute(a)) / float64(weightSum)
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
Expand All @@ -142,7 +140,7 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float
// where an address is added and then removed, the RPCs will still pick the
// same old SubConn.
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr })
return ret, min, nil
return ret, min
}

// pick does a binary search. It returns the item with smallest index i that
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s) TestRingNew(t *testing.T) {
for _, min := range []uint64{3, 4, 6, 8} {
for _, max := range []uint64{20, 8} {
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
r, _ := newRing(testSubConnMap, min, max)
r := newRing(testSubConnMap, min, max)
totalCount := len(r.items)
if totalCount < int(min) || totalCount > int(max) {
t.Fatalf("unexpect size %v, want min %v, max %v", totalCount, min, max)
Expand Down Expand Up @@ -82,7 +82,7 @@ func equalApproximately(x, y float64) bool {
}

func (s) TestRingPick(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
e := r.pick(h)
Expand All @@ -100,7 +100,7 @@ func (s) TestRingPick(t *testing.T) {
}

func (s) TestRingNext(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)

for _, e := range r.items {
ne := r.next(e)
Expand Down
44 changes: 24 additions & 20 deletions xds/internal/balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,29 +259,22 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {

func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
if b.config == nil {
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
b.config = newConfig
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}

// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
if b.updateAddresses(s.ResolverState.Addresses) {
// If addresses were updated, no matter whether it resulted in SubConn
// creation/deletion, or just weight update, we will need to regenerate
// the ring.
var err error
b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
if err != nil {
b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err))
return balancer.ErrBadResolverState
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
// If addresses were updated, whether it resulted in SubConn
// creation/deletion, or just weight update, we need to regenerate the ring
// and send a new picker.
regenerateRing := b.updateAddresses(s.ResolverState.Addresses)

// If the ring configuration has changed, we need to regenerate the ring and
// send a new picker.
if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize {
regenerateRing = true
}
b.config = newConfig

// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also records this as an resolver error, so when
Expand All @@ -291,6 +284,17 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}

if regenerateRing {
// Ring creation is guaranteed to not fail because we call newRing()
// with a non-empty subConns map.
b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

// Successful resolution; clear resolver error and return nil.
b.resolverErr = nil
return nil
}

Expand Down
42 changes: 38 additions & 4 deletions xds/internal/balancer/ringhash/ringhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,40 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestUpdateClientConnState_NewRingSize tests the scenario where the ringhash
// LB policy receives new configuration which specifies new values for the ring
// min and max sizes. The test verifies that a new ring is created and a new
// picker is sent to the ClientConn.
func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) {
origMinRingSize, origMaxRingSize := 1, 10 // Configured from `testConfig` in `setupTest`
newMinRingSize, newMaxRingSize := 20, 100

addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}}
cc, b, p1 := setupTest(t, addrs)
ring1 := p1.(*picker).ring
if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize)
}

if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)},
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}

var ring2 *ring
select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for a picker update after a configuration update")
case p2 := <-cc.NewPickerCh:
ring2 = p2.(*picker).ring
}
if ringSize := len(ring2.items); ringSize < newMinRingSize || ringSize > newMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, newMinRingSize, newMaxRingSize)
}
}

func (s) TestOneSubConn(t *testing.T) {
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]}
cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1})
Expand Down Expand Up @@ -320,7 +354,7 @@ func (s) TestAddrWeightChange(t *testing.T) {

if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: wantAddrs},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
Expand All @@ -336,7 +370,7 @@ func (s) TestAddrWeightChange(t *testing.T) {
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
}},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
Expand All @@ -359,7 +393,7 @@ func (s) TestAddrWeightChange(t *testing.T) {
resolver.Address{Addr: testBackendAddrStrs[1]},
weightedroundrobin.AddrInfo{Weight: 2}),
}},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
Expand Down Expand Up @@ -505,7 +539,7 @@ func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
Expand Down

0 comments on commit f9409d3

Please sign in to comment.