Skip to content

Commit

Permalink
catchup: Dynamic parallel catchup (#5802)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Nov 9, 2023
1 parent 8e30dd4 commit 22b7096
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import (
// catchupPeersForSync and blockQueryPeerLimit bound peer usage during catchup;
// presumably the number of peers considered for sync and the per-block query
// fan-out respectively — TODO confirm against their use sites.
const catchupPeersForSync = 10
const blockQueryPeerLimit = 10

// uncapParallelDownloadRate is a simple threshold to detect whether or not the node is caught up.
// If a block is downloaded in less than this duration, it's assumed that the node is not caught up,
// and the block downloader is allowed to start N=parallelBlocks concurrent fetches.
const uncapParallelDownloadRate = time.Second

// catchupRetryLimit should be at least the number of relays.
const catchupRetryLimit = 500

Expand Down Expand Up @@ -85,6 +90,7 @@ type Service struct {
auth BlockAuthenticator
parallelBlocks uint64
deadlineTimeout time.Duration
prevBlockFetchTime time.Time
blockValidationPool execpool.BacklogPool

// suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the
Expand Down Expand Up @@ -448,9 +454,16 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo

// TODO the following code does not handle the following case: seedLookback upgrades during fetch
func (s *Service) pipelinedFetch(seedLookback uint64) {
parallelRequests := s.parallelBlocks
if parallelRequests < seedLookback {
parallelRequests = seedLookback
maxParallelRequests := s.parallelBlocks
if maxParallelRequests < seedLookback {
maxParallelRequests = seedLookback
}
minParallelRequests := seedLookback

// Start the limited requests at max(1, 'seedLookback')
limitedParallelRequests := uint64(1)
if limitedParallelRequests < seedLookback {
limitedParallelRequests = seedLookback
}

completed := make(map[basics.Round]chan bool)
Expand Down Expand Up @@ -480,7 +493,8 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
nextRound := firstRound

for {
for nextRound < firstRound+basics.Round(parallelRequests) {
// launch up to limitedParallelRequests concurrent block download goroutines.
for nextRound < firstRound+basics.Round(limitedParallelRequests) {
if s.roundIsNotSupported(nextRound) {
// Break out of the loop to avoid fetching
// blocks that we don't support. If there
Expand All @@ -504,6 +518,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
nextRound++
}

// wait for the first round to complete before starting the next download.
select {
case completedOK := <-completed[firstRound]:
delete(completed, firstRound)
Expand All @@ -514,6 +529,15 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
return
}

fetchTime := time.Now()
fetchDur := fetchTime.Sub(s.prevBlockFetchTime)
s.prevBlockFetchTime = fetchTime
if fetchDur < uncapParallelDownloadRate {
limitedParallelRequests = maxParallelRequests
} else {
limitedParallelRequests = minParallelRequests
}

// if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks.
start := time.Now()
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout {
Expand Down

0 comments on commit 22b7096

Please sign in to comment.