Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: comet peering improvements #119

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1eafe2d
initial push
czarcas7ic Jun 28, 2024
e1bc6f1
add reserve pool logic
czarcas7ic Jun 28, 2024
90841fd
fix panic
czarcas7ic Jun 28, 2024
730c54a
inc the reserve size
czarcas7ic Jun 28, 2024
a9245da
add logging
czarcas7ic Jun 29, 2024
991cc95
use mutex for vars
czarcas7ic Jun 29, 2024
3bf281e
wg for log
czarcas7ic Jun 29, 2024
591a18f
more logs
czarcas7ic Jun 29, 2024
d725468
clarify some lines
czarcas7ic Jun 29, 2024
7e2dae1
more prints for debug
czarcas7ic Jun 29, 2024
dd55bfc
clean up for check
czarcas7ic Jun 29, 2024
acae143
attempt to get correct log
czarcas7ic Jun 29, 2024
c512728
remove the go routine just for fun
czarcas7ic Jun 29, 2024
2034c1f
attempt loop fix
czarcas7ic Jun 29, 2024
b0dc06a
add back concurrency
czarcas7ic Jun 29, 2024
4348ab0
mutex for logs
czarcas7ic Jun 29, 2024
4c76ae5
prints
czarcas7ic Jun 29, 2024
79e3ba5
remove inner go routine
czarcas7ic Jun 29, 2024
1494c8e
fix race condition
czarcas7ic Jun 29, 2024
013e701
mutex
czarcas7ic Jun 29, 2024
e9881b7
test without concurrency again
czarcas7ic Jun 29, 2024
845524b
remove break
czarcas7ic Jun 29, 2024
e3aae12
place break in correct location
czarcas7ic Jun 29, 2024
616a5b0
try concurrency one more time
czarcas7ic Jun 29, 2024
3a474a4
populate dial attempts (might revert)
czarcas7ic Jun 29, 2024
eadf0a0
store dialString (might revert)
czarcas7ic Jun 29, 2024
a77c49d
Revert "store dialString (might revert)"
czarcas7ic Jun 29, 2024
12eb426
Revert "populate dial attempts (might revert)"
czarcas7ic Jun 29, 2024
f0f8477
clean up
czarcas7ic Jun 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libs/rand/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,9 @@ func cRandBytes(numBytes int) []byte {
}
return b
}

// Shuffle pseudo-randomizes the order of n elements using r's own random
// source. swap must exchange the elements at indexes i and j.
//
// r's lock is held for the whole call, so swap must not call back into r.
// Drawing from r's source (rather than the package-level math/rand functions)
// keeps the result consistent with r's other methods and with r's seed.
// NOTE(review): assumes the Rand struct keeps its source in a field named
// `rand` of type *mrand.Rand — confirm against the struct definition.
func (r *Rand) Shuffle(n int, swap func(i, j int)) {
	r.Lock()
	defer r.Unlock()
	r.rand.Shuffle(n, swap)
}
23 changes: 17 additions & 6 deletions p2p/pex/addrbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type AddrBook interface {
Empty() bool

// Pick an address to dial
PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress
PickAddress(biasTowardsNewAddrs int, filter func(*knownAddress) bool) *p2p.NetAddress

// Mark address
MarkGood(p2p.ID)
Expand Down Expand Up @@ -269,7 +269,7 @@ func (a *addrBook) Empty() bool {
// and determines how biased we are to pick an address from a new bucket.
// PickAddress returns nil if the AddrBook is empty or if we try to pick
// from an empty bucket.
func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
func (a *addrBook) PickAddress(biasTowardsNewAddrs int, filter func(*knownAddress) bool) *p2p.NetAddress {
a.mtx.Lock()
defer a.mtx.Unlock()

Expand Down Expand Up @@ -306,14 +306,25 @@ func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
}
}
// pick a random index and loop over the map to return that index
randIndex := a.rand.Intn(len(bucket))

// Create a slice of known addresses from the bucket
addresses := make([]*knownAddress, 0, len(bucket))
for _, ka := range bucket {
if randIndex == 0 {
addresses = append(addresses, ka)
}

// Shuffle the addresses
a.rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})

// Iterate through the shuffled addresses and apply the filter until we find a suitable address
for _, ka := range addresses {
if filter == nil || filter(ka) {
return ka.Addr
}
randIndex--
}

return nil
}

Expand Down
14 changes: 7 additions & 7 deletions p2p/pex/addrbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestAddrBookPickAddress(t *testing.T) {
book.SetLogger(log.TestingLogger())
assert.Zero(t, book.Size())

addr := book.PickAddress(50)
addr := book.PickAddress(50, nil)
assert.Nil(t, addr, "expected no address")

randAddrs := randNetAddressPairs(t, 1)
Expand All @@ -38,22 +38,22 @@ func TestAddrBookPickAddress(t *testing.T) {
require.NoError(t, err)

// pick an address when we only have new address
addr = book.PickAddress(0)
addr = book.PickAddress(0, nil)
assert.NotNil(t, addr, "expected an address")
addr = book.PickAddress(50)
addr = book.PickAddress(50, nil)
assert.NotNil(t, addr, "expected an address")
addr = book.PickAddress(100)
addr = book.PickAddress(100, nil)
assert.NotNil(t, addr, "expected an address")

// pick an address when we only have old address
book.MarkGood(addrSrc.addr.ID)
addr = book.PickAddress(0)
addr = book.PickAddress(0, nil)
assert.NotNil(t, addr, "expected an address")
addr = book.PickAddress(50)
addr = book.PickAddress(50, nil)
assert.NotNil(t, addr, "expected an address")

// in this case, nNew==0 but we biased 100% to new, so we return nil
addr = book.PickAddress(100)
addr = book.PickAddress(100, nil)
assert.Nil(t, addr, "did not expected an address")
}

Expand Down
148 changes: 118 additions & 30 deletions p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,41 +473,122 @@ func (r *Reactor) ensurePeers() {
newBias := cmtmath.MinInt(out, 8)*10 + 10

toDial := make(map[p2p.ID]*p2p.NetAddress)
reserve := make(map[p2p.ID]*p2p.NetAddress)
// Try maxAttempts times to pick numToDial addresses to dial
maxAttempts := numToDial * 3
maxToDialAttempts := numToDial * 3

for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
try := r.book.PickAddress(newBias)
if try == nil {
continue
// Calculate reserve size as 50% of numToDial with a minimum of 10
reserveSize := cmtmath.MaxInt(numToDial/2, 10)
maxReserveAttempts := reserveSize * 3

filter := func(ka *knownAddress) bool {
attempts, lastDialedTime := r.dialAttemptsInfo(ka.Addr)
if r.IsTooEarlyToDial(ka.Addr, attempts, lastDialedTime) {
return false
}
if r.IsMaxAttemptsToDial(ka.Addr, attempts) {
r.book.MarkBad(ka.Addr, defaultBanTime)
return false
}
if r.Switch.IsDialingOrExistingAddress(ka.Addr) {
return false
}
if _, selected := toDial[ka.Addr.ID]; selected {
return false
}
if _, selected := reserve[ka.Addr.ID]; selected {
return false
}
if _, selected := toDial[try.ID]; selected {
return true
}

var (
successCount int
errorCount int
)

for i := 0; i < maxToDialAttempts && len(toDial) < numToDial; i++ {
prospectivePeer := r.book.PickAddress(newBias, filter)
if prospectivePeer == nil {
continue
}
if r.Switch.IsDialingOrExistingAddress(try) {
toDial[prospectivePeer.ID] = prospectivePeer
}

// Add extra peers to the reserve
for i := 0; i < maxReserveAttempts && len(reserve) < reserveSize; i++ {
prospectivePeer := r.book.PickAddress(newBias, filter)
if prospectivePeer == nil {
continue
}
// TODO: consider moving some checks from toDial into here
// so we don't even consider dialing peers that we want to wait
// before dialing again, or have dialed too many times already
toDial[try.ID] = try
reserve[prospectivePeer.ID] = prospectivePeer
}

toDialCount := len(toDial)
reserveCount := len(reserve)

var wg sync.WaitGroup
var mu sync.Mutex

// Dial picked addresses
for _, addr := range toDial {
wg.Add(1)
go func(addr *p2p.NetAddress) {
defer wg.Done()
err := r.dialPeer(addr)
if err != nil {
mu.Lock()
errorCount++
mu.Unlock()
switch err.(type) {
case errMaxAttemptsToDial, errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Debug(err.Error(), "addr", addr)
}
// Attempt to dial reserve peers if there was an error
mu.Lock()
for id, reserveAddr := range reserve {
if reserveAddr != nil {
delete(reserve, id) // Remove from reserve
mu.Unlock() // Unlock before dialing
err := r.dialPeer(reserveAddr)
mu.Lock() // Re-lock after dialing
if err != nil {
errorCount++
switch err.(type) {
case errMaxAttemptsToDial, errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", reserveAddr)
default:
r.Logger.Debug(err.Error(), "addr", reserveAddr)
}
} else {
successCount++
break
}
}
}
mu.Unlock()
} else {
mu.Lock()
successCount++
mu.Unlock()
}
}(addr)
}

// Log the summary in a separate goroutine
go func() {
wg.Wait()
r.Logger.Info(
"Dialing summary",
"toDialCount", toDialCount,
"reserveCount", reserveCount,
"successCount", successCount,
"errorCount", errorCount,
)
}()

if r.book.NeedMoreAddrs() {
// Check if banned nodes can be reinstated
r.book.ReinstateBadPeers()
Expand Down Expand Up @@ -537,32 +618,16 @@ func (r *Reactor) ensurePeers() {
// dialAttemptsInfo looks up the dial bookkeeping stored for addr, keyed by
// addr.DialString().
//
// It returns the recorded number of attempts and the time of the last
// attempt. When nothing is stored for addr it returns 0 and the zero
// time.Time.
func (r *Reactor) dialAttemptsInfo(addr *p2p.NetAddress) (attempts int, lastDialed time.Time) {
	_attempts, ok := r.attemptsToDial.Load(addr.DialString())
	if !ok {
		// No record yet: report explicit zero values.
		return 0, time.Time{}
	}
	atd := _attempts.(_attemptsToDial)
	return atd.number, atd.lastDialed
}

func (r *Reactor) dialPeer(addr *p2p.NetAddress) error {
attempts, lastDialed := r.dialAttemptsInfo(addr)
if !r.Switch.IsPeerPersistent(addr) && attempts > maxAttemptsToDial {
r.book.MarkBad(addr, defaultBanTime)
return errMaxAttemptsToDial{}
}

// exponential backoff if it's not our first attempt to dial given address
if attempts > 0 {
jitter := time.Duration(cmtrand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
backoffDuration := jitter + ((1 << uint(attempts)) * time.Second)
backoffDuration = r.maxBackoffDurationForPeer(addr, backoffDuration)
sinceLastDialed := time.Since(lastDialed)
if sinceLastDialed < backoffDuration {
return errTooEarlyToDial{backoffDuration, lastDialed}
}
}

err := r.Switch.DialPeerWithAddress(addr)
if err != nil {
attempts, _ := r.dialAttemptsInfo(addr)
if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
return err
}
Expand All @@ -573,7 +638,8 @@ func (r *Reactor) dialPeer(addr *p2p.NetAddress) error {
// NOTE: addr is removed from addrbook in markAddrInBookBasedOnErr
r.attemptsToDial.Delete(addr.DialString())
default:
r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
newAttempts := attempts + 1
r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{newAttempts, time.Now()})
}
return fmt.Errorf("dialing failed (attempts: %d): %w", attempts+1, err)
}
Expand Down Expand Up @@ -756,10 +822,32 @@ func (r *Reactor) attemptDisconnects() {

// markAddrInBookBasedOnErr updates book's record for addr according to the
// concrete type of err: a p2p.ErrSwitchAuthenticationFailure marks the address
// bad for defaultBanTime; any other error is recorded as an attempt.
func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
	// TODO: detect more "bad peer" scenarios
	// TODO, blacklist peer if fmt.Errorf("peer is on a different network. Got %v, expected %v", other.Network, info.Network)
	if _, authFailed := err.(p2p.ErrSwitchAuthenticationFailure); authFailed {
		book.MarkBad(addr, defaultBanTime)
		return
	}
	book.MarkAttempt(addr)
}

// IsTooEarlyToDial reports whether addr is still inside its backoff window.
//
// With no previous attempts it always returns false. Otherwise the window is
// 2^attempts seconds plus up to one second of random jitter, passed through
// r.maxBackoffDurationForPeer; the method returns true (and logs at debug
// level) when less than that has elapsed since lastDialedTime.
func (r *Reactor) IsTooEarlyToDial(addr *p2p.NetAddress, attempts int, lastDialedTime time.Time) bool {
	// First dial of this address: there is nothing to back off from.
	if attempts <= 0 {
		return false
	}

	jitter := time.Duration(cmtrand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
	backoff := r.maxBackoffDurationForPeer(addr, jitter+((1<<uint(attempts))*time.Second))

	if elapsed := time.Since(lastDialedTime); elapsed >= backoff {
		return false
	}

	r.Logger.Debug("Skipping peer due to backoff", "addr", addr, "backoffDuration", backoff)
	return true
}

// IsMaxAttemptsToDial reports whether addr has used up its dial attempts:
// true only when the switch does not consider addr persistent and attempts
// exceeds maxAttemptsToDial.
func (r *Reactor) IsMaxAttemptsToDial(addr *p2p.NetAddress, attempts int) bool {
	return !r.Switch.IsPeerPersistent(addr) && attempts > maxAttemptsToDial
}
2 changes: 1 addition & 1 deletion test/fuzz/p2p/addrbook/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Fuzz(data []byte) int {

// Also, make sure PickAddress always returns a non-nil address.
bias := rand.Intn(100) //nolint:gosec
if p := addrBook.PickAddress(bias); p == nil {
if p := addrBook.PickAddress(bias, nil); p == nil {
panic(fmt.Sprintf("picked a nil address (bias: %d, addrBook size: %v)",
bias, addrBook.Size()))
}
Expand Down
Loading