Skip to content

Commit

Permalink
pickfirst: Implement Happy Eyeballs (#7725)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Nov 12, 2024
1 parent 60c70a4 commit e2b98f9
Show file tree
Hide file tree
Showing 4 changed files with 369 additions and 85 deletions.
17 changes: 14 additions & 3 deletions balancer/pickfirst/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,18 @@
// Package internal contains code internal to the pickfirst package.
package internal

import "math/rand"
import (
rand "math/rand/v2"
"time"
)

// RandShuffle pseudo-randomizes the order of addresses.
var RandShuffle = rand.Shuffle
var (
// RandShuffle pseudo-randomizes the order of addresses.
RandShuffle = rand.Shuffle
// TimeAfterFunc allows mocking the timer for testing connection delay
// related functionality.
TimeAfterFunc = func(d time.Duration, f func()) func() {
timer := time.AfterFunc(d, f)
return func() { timer.Stop() }
}
)
181 changes: 127 additions & 54 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"fmt"
"net"
"sync"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
Expand Down Expand Up @@ -59,8 +60,13 @@ var (
Name = "pick_first_leaf"
)

// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "
const (
// TODO: change to pick-first when this becomes the default pick_first policy.
logPrefix = "[pick-first-leaf-lb %p] "
// connectionDelayInterval is the time to wait for during the happy eyeballs
// pass before starting the next connection attempt.
connectionDelayInterval = 250 * time.Millisecond
)

type ipAddrFamily int

Expand All @@ -76,11 +82,12 @@ type pickfirstBuilder struct{}

func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cancelConnectionTimer: func() {},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
Expand Down Expand Up @@ -115,8 +122,9 @@ type scData struct {
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
lastErr error
state connectivity.State
lastErr error
connectionFailedInFirstPass bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
Expand Down Expand Up @@ -148,10 +156,11 @@ type pickfirstBalancer struct {
mu sync.Mutex
state connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
cancelConnectionTimer func()
}

// ResolverError is called by the ClientConn when the name resolver produces
Expand Down Expand Up @@ -186,6 +195,7 @@ func (b *pickfirstBalancer) resolverErrorLocked(err error) {
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
b.mu.Lock()
defer b.mu.Unlock()
b.cancelConnectionTimer()
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
Expand Down Expand Up @@ -239,12 +249,8 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// Not de-duplicating would result in attempting to connect to the same
// SubConn multiple times in the same pass. We don't want this.
newAddrs = deDupAddresses(newAddrs)

newAddrs = interleaveAddresses(newAddrs)

// Since we have a new set of addresses, we are again at first pass.
b.firstPass = true

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
Expand All @@ -269,11 +275,11 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.requestConnectionLocked()
b.startFirstPassLocked()
} else if b.state == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.requestConnectionLocked()
b.startFirstPassLocked()
}
return nil
}
Expand All @@ -288,6 +294,7 @@ func (b *pickfirstBalancer) Close() {
b.mu.Lock()
defer b.mu.Unlock()
b.closeSubConnsLocked()
b.cancelConnectionTimer()
b.state = connectivity.Shutdown
}

Expand All @@ -297,12 +304,21 @@ func (b *pickfirstBalancer) Close() {
func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle && b.addressList.currentAddress() == b.addressList.first() {
b.firstPass = true
b.requestConnectionLocked()
if b.state == connectivity.Idle {
b.startFirstPassLocked()
}
}

func (b *pickfirstBalancer) startFirstPassLocked() {
b.firstPass = true
b.numTF = 0
// Reset the connection attempt record for existing SubConns.
for _, sd := range b.subConns.Values() {
sd.(*scData).connectionFailedInFirstPass = false
}
b.requestConnectionLocked()
}

func (b *pickfirstBalancer) closeSubConnsLocked() {
for _, sd := range b.subConns.Values() {
sd.(*scData).subConn.Shutdown()
Expand Down Expand Up @@ -413,6 +429,7 @@ func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address)
// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
// becomes ready, which means that all other subConn must be shutdown.
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
b.cancelConnectionTimer()
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.subConn != selected.subConn {
Expand Down Expand Up @@ -456,30 +473,69 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
switch scd.state {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
return
case connectivity.TransientFailure:
// Try the next address.
// The SubConn is being re-used and failed during a previous pass
// over the addressList. It has not completed backoff yet.
// Mark it as having failed and try the next address.
scd.connectionFailedInFirstPass = true
lastErr = scd.lastErr
continue
case connectivity.Ready:
// Should never happen.
b.logger.Errorf("Requesting a connection even though we have a READY SubConn")
case connectivity.Shutdown:
// Should never happen.
b.logger.Errorf("SubConn with state SHUTDOWN present in SubConns map")
case connectivity.Connecting:
// Wait for the SubConn to report success or failure.
// Wait for the connection attempt to complete or the timer to fire
// before attempting the next address.
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
return

}
return
}

// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
// first pass.
b.endFirstPassLocked(lastErr)
// first pass if possible.
b.endFirstPassIfPossibleLocked(lastErr)
}

func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
b.cancelConnectionTimer()
if !b.addressList.hasNext() {
return
}
curAddr := b.addressList.currentAddress()
cancelled := false // Access to this is protected by the balancer's mutex.
closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
b.mu.Lock()
defer b.mu.Unlock()
// If the scheduled task is cancelled while acquiring the mutex, return.
if cancelled {
return
}
if b.logger.V(2) {
b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
}
if b.addressList.increment() {
b.requestConnectionLocked()
}
})
// Access to the cancellation callback held by the balancer is guarded by
// the balancer's mutex, so it's safe to set the boolean from the callback.
b.cancelConnectionTimer = sync.OnceFunc(func() {
cancelled = true
closeFn()
})
}

func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionFailedInFirstPass = true
}
sd.state = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
Expand Down Expand Up @@ -545,17 +601,20 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
sd.lastErr = newState.ConnectionError
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. We ignore such updates.

if curAddr := b.addressList.currentAddress(); !equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
return
}
if b.addressList.increment() {
b.requestConnectionLocked()
return
// a pass over the previous address list. Happy Eyeballs will also
// cause out of order updates to arrive.

if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
b.cancelConnectionTimer()
if b.addressList.increment() {
b.requestConnectionLocked()
return
}
}
// End of the first pass.
b.endFirstPassLocked(newState.ConnectionError)

// End the first pass if we've seen a TRANSIENT_FAILURE from all
// SubConns once.
b.endFirstPassIfPossibleLocked(newState.ConnectionError)
}
return
}
Expand All @@ -580,9 +639,22 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
}
}

func (b *pickfirstBalancer) endFirstPassLocked(lastErr error) {
// endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the
// addresses are tried and their SubConns have reported a failure.
func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
// An optimization to avoid iterating over the entire SubConn map.
if b.addressList.isValid() {
return
}
// Connect() has been called on all the SubConns. The first pass can be
// ended if all the SubConns have reported a failure.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if !sd.connectionFailedInFirstPass {
return
}
}
b.firstPass = false
b.numTF = 0
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
Expand Down Expand Up @@ -654,15 +726,6 @@ func (al *addressList) currentAddress() resolver.Address {
return al.addresses[al.idx]
}

// first returns the first address in the list. If the list is empty, it returns
// an empty address instead.
func (al *addressList) first() resolver.Address {
if len(al.addresses) == 0 {
return resolver.Address{}
}
return al.addresses[0]
}

func (al *addressList) reset() {
al.idx = 0
}
Expand All @@ -685,6 +748,16 @@ func (al *addressList) seekTo(needle resolver.Address) bool {
return false
}

// hasNext returns whether incrementing the addressList will result in moving
// past the end of the list. If the list has already moved past the end, it
// returns false.
func (al *addressList) hasNext() bool {
if !al.isValid() {
return false
}
return al.idx+1 < len(al.addresses)
}

// equalAddressIgnoringBalAttributes returns true is a and b are considered
// equal. This is different from the Equal method on the resolver.Address type
// which considers all fields to determine equality. Here, we only consider
Expand Down
Loading

0 comments on commit e2b98f9

Please sign in to comment.