From a6a1a8880db6f596c18c7e754e48eb6c28fc2770 Mon Sep 17 00:00:00 2001 From: nisdas Date: Thu, 28 Mar 2024 14:45:08 +0800 Subject: [PATCH 1/8] maximize it --- .../sync/initial-sync/blocks_fetcher.go | 45 ++++++++++++++++--- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 0390a92438c4..f4e133c3c3f6 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -336,6 +336,10 @@ func (f *blocksFetcher) fetchBlocksFromPeer( Count: count, Step: 1, } + bestPeers := f.hasSufficientBandwith(peers, req.Count) + // We append the best peers to the front so that higher capacity + // peers are dialed first. + peers = append(bestPeers, peers...) for i := 0; i < len(peers); i++ { p := peers[i] blocks, err := f.requestBlocks(ctx, req, p) @@ -472,7 +476,7 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e } // fetchBlobsFromPeer fetches blocks from a single randomly selected peer. -func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID) ([]blocks2.BlockWithROBlobs, error) { +func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID, peers []peer.ID) ([]blocks2.BlockWithROBlobs, error) { ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer") defer span.End() if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch { @@ -487,13 +491,28 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl if req == nil { return bwb, nil } - // Request blobs from the same peer that gave us the blob batch. - blobs, err := f.requestBlobs(ctx, req, pid) - if err != nil { - return nil, errors.Wrap(err, "could not request blobs by range") + // We dial the initial peer first to ensure that we get the desired set of blobs. + wantedPeers := append([]peer.ID{pid}, peers...) + bestPeers := f.hasSufficientBandwith(wantedPeers, req.Count) + // We append the best peers to the front so that higher capacity + // peers are dialed first. + peers = append(bestPeers, peers...) + for i := 0; i < len(peers); i++ { + p := peers[i] + blobs, err := f.requestBlobs(ctx, req, p) + if err != nil { + log.WithField("peer", p).WithError(err).Debug("Could not request blocks by range from peer") + continue + } + f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p) + robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) + if err != nil { + log.WithField("peer", p).WithError(err).Debug("invalid BeaconBlocksByRange response") + continue + } + return robs, err } - f.p2p.Peers().Scorers().BlockProviderScorer().Touch(pid) - return verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) + return nil, errNoPeersAvailable } // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams. @@ -606,6 +625,18 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error { return nil } +func (f *blocksFetcher) hasSufficientBandwith(peers []peer.ID, count uint64) []peer.ID { + filteredPeers := []peer.ID{} + for _, p := range peers { + if f.rateLimiter.Remaining(p.String()) < int64(count) { + continue + } + copiedP := p + filteredPeers = append(filteredPeers, copiedP) + } + return filteredPeers +} + // Determine how long it will take for us to have the required number of blocks allowed by our rate limiter. // We do this by calculating the duration till the rate limiter can request these blocks without exceeding // the provided bandwidth limits per peer. From 505b2122e429b3507063ca328030fc07404c634c Mon Sep 17 00:00:00 2001 From: nisdas Date: Thu, 28 Mar 2024 15:06:56 +0800 Subject: [PATCH 2/8] fix it --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 3 ++- beacon-chain/sync/initial-sync/blocks_fetcher_utils.go | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index f4e133c3c3f6..520531a7c082 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -312,7 +312,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers) if response.err == nil { - bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid) + bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers) if err != nil { response.err = err } @@ -491,6 +491,7 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl if req == nil { return bwb, nil } + peers = f.filterPeers(ctx, peers, peersPercentagePerRequest) // We dial the initial peer first to ensure that we get the desired set of blobs. wantedPeers := append([]peer.ID{pid}, peers...) bestPeers := f.hasSufficientBandwith(wantedPeers, req.Count) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 5610a7ed0adc..dcdaf161ab94 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -280,7 +280,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot } // We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import // the blocks. - bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid) + bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid}) if err != nil { return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer") } @@ -302,7 +302,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa if err != nil { return nil, errors.Wrap(err, "received invalid blocks in findAncestor") } - bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid) + bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid}) if err != nil { return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor") } From 6b5e9f3f9dbc06429e21e441819e84841e64a7ca Mon Sep 17 00:00:00 2001 From: nisdas Date: Thu, 28 Mar 2024 15:07:56 +0800 Subject: [PATCH 3/8] lint --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 520531a7c082..162d95c55637 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -629,7 +629,7 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error { func (f *blocksFetcher) hasSufficientBandwith(peers []peer.ID, count uint64) []peer.ID { filteredPeers := []peer.ID{} for _, p := range peers { - if f.rateLimiter.Remaining(p.String()) < int64(count) { + if uint64(f.rateLimiter.Remaining(p.String())) < count { continue } copiedP := p From 268d3038a3fdf1fac1ac91a324027658909b0bb1 Mon Sep 17 00:00:00 2001 From: nisdas Date: Thu, 28 Mar 2024 16:24:26 +0800 Subject: [PATCH 4/8] add test --- .../sync/initial-sync/blocks_fetcher_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 43e062127b5c..8936d8272ac4 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -12,6 +12,7 @@ import ( libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" @@ -1166,3 +1167,22 @@ func TestBatchLimit(t *testing.T) { assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit())) } + +func TestBlockFetcher_HasSufficientBandwidth(t *testing.T) { + bf := newBlocksFetcher(context.Background(), &blocksFetcherConfig{}) + currCap := bf.rateLimiter.Capacity() + wantedAmt := currCap - 100 + bf.rateLimiter.Add(peer.ID("a").String(), wantedAmt) + bf.rateLimiter.Add(peer.ID("c").String(), wantedAmt) + bf.rateLimiter.Add(peer.ID("f").String(), wantedAmt) + bf.rateLimiter.Add(peer.ID("d").String(), wantedAmt) + + receivedPeers := bf.hasSufficientBandwith([]peer.ID{"a", "b", "c", "d", "e", "f"}, 110) + for _, p := range receivedPeers { + switch p { + case "a", "c", "f", "d": + t.Errorf("peer has exceeded capacity: %s", p) + } + } + assert.Equal(t, 2, len(receivedPeers)) +} From c5d3d540fe60c74fb2ff51c5753b0b8ccaa3a438 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Thu, 28 Mar 2024 16:24:47 +0800 Subject: [PATCH 5/8] Update beacon-chain/sync/initial-sync/blocks_fetcher.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Radosław Kapka --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 162d95c55637..add1a099f6c7 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -508,7 +508,7 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p) robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) if err != nil { - log.WithField("peer", p).WithError(err).Debug("invalid BeaconBlocksByRange response") + log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlocksByRange response") continue } return robs, err From 5a40e3f9a60bc6f5dd03b80379b6e64f2c9446f3 Mon Sep 17 00:00:00 2001 From: nisdas Date: Thu, 28 Mar 2024 16:34:04 +0800 Subject: [PATCH 6/8] logs --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index add1a099f6c7..abb1f2a8deed 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -502,13 +502,13 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl p := peers[i] blobs, err := f.requestBlobs(ctx, req, p) if err != nil { - log.WithField("peer", p).WithError(err).Debug("Could not request blocks by range from peer") + log.WithField("peer", p).WithError(err).Debug("Could not request blobs by range from peer") continue } f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p) robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) if err != nil { - log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlocksByRange response") + log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response") continue } return robs, err From 038cfc51f14dea6b9d1aaa3f038bdfbfbd5fbc5e Mon Sep 17 00:00:00 2001 From: nisdas Date: Fri, 29 Mar 2024 13:21:13 +0800 Subject: [PATCH 7/8] kasey's review --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 11 ++++++----- beacon-chain/sync/initial-sync/blocks_fetcher_test.go | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index abb1f2a8deed..18ea07a81062 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -336,7 +336,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer( Count: count, Step: 1, } - bestPeers := f.hasSufficientBandwith(peers, req.Count) + bestPeers := f.hasSufficientBandwidth(peers, req.Count) // We append the best peers to the front so that higher capacity // peers are dialed first. peers = append(bestPeers, peers...) @@ -494,10 +494,11 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl peers = f.filterPeers(ctx, peers, peersPercentagePerRequest) // We dial the initial peer first to ensure that we get the desired set of blobs. wantedPeers := append([]peer.ID{pid}, peers...) - bestPeers := f.hasSufficientBandwith(wantedPeers, req.Count) + bestPeers := f.hasSufficientBandwidth(wantedPeers, req.Count) // We append the best peers to the front so that higher capacity - // peers are dialed first. - peers = append(bestPeers, peers...) + // peers are dialed first. If all of them fail, we fallback to the + // initial peer we wanted to request blobs from. + peers = append(bestPeers, pid) for i := 0; i < len(peers); i++ { p := peers[i] blobs, err := f.requestBlobs(ctx, req, p) @@ -626,7 +627,7 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error { return nil } -func (f *blocksFetcher) hasSufficientBandwith(peers []peer.ID, count uint64) []peer.ID { +func (f *blocksFetcher) hasSufficientBandwidth(peers []peer.ID, count uint64) []peer.ID { filteredPeers := []peer.ID{} for _, p := range peers { if uint64(f.rateLimiter.Remaining(p.String())) < count { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 8936d8272ac4..0852d8f7f5c5 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -1177,7 +1177,7 @@ func TestBlockFetcher_HasSufficientBandwidth(t *testing.T) { bf.rateLimiter.Add(peer.ID("f").String(), wantedAmt) bf.rateLimiter.Add(peer.ID("d").String(), wantedAmt) - receivedPeers := bf.hasSufficientBandwith([]peer.ID{"a", "b", "c", "d", "e", "f"}, 110) + receivedPeers := bf.hasSufficientBandwidth([]peer.ID{"a", "b", "c", "d", "e", "f"}, 110) for _, p := range receivedPeers { switch p { case "a", "c", "f", "d": From dcd76125276ed27cf9479bd95a4a18ffe3f75b89 Mon Sep 17 00:00:00 2001 From: nisdas Date: Sat, 30 Mar 2024 22:04:31 +0800 Subject: [PATCH 8/8] kasey's review --- beacon-chain/sync/initial-sync/blocks_fetcher.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 18ea07a81062..429fa833ec3d 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -340,6 +340,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer( // We append the best peers to the front so that higher capacity // peers are dialed first. peers = append(bestPeers, peers...) + peers = dedupPeers(peers) for i := 0; i < len(peers); i++ { p := peers[i] blocks, err := f.requestBlocks(ctx, req, p) @@ -659,3 +660,18 @@ func timeToWait(wanted, rem, capacity int64, timeTillEmpty time.Duration) time.D expectedTime := int64(timeTillEmpty) * blocksNeeded / currentNumBlks return time.Duration(expectedTime) } + +// deduplicates the provided peer list. +func dedupPeers(peers []peer.ID) []peer.ID { + newPeerList := make([]peer.ID, 0, len(peers)) + peerExists := make(map[peer.ID]bool) + + for i := range peers { + if peerExists[peers[i]] { + continue + } + newPeerList = append(newPeerList, peers[i]) + peerExists[peers[i]] = true + } + return newPeerList +}