Commit 2e6b628
Merge branch 'master' into will/dynamic-parallel-catchup
winder authored Oct 28, 2023
2 parents cc632b3 + f5d901a commit 2e6b628
Showing 92 changed files with 2,244 additions and 1,673 deletions.
31 changes: 17 additions & 14 deletions catchup/catchpointService.go
@@ -24,6 +24,7 @@ import (
 
     "github.com/algorand/go-deadlock"
 
+    "github.com/algorand/go-algorand/agreement"
     "github.com/algorand/go-algorand/config"
     "github.com/algorand/go-algorand/data/basics"
     "github.com/algorand/go-algorand/data/bookkeeping"
@@ -370,6 +371,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
 
     attemptsCount := 0
     var blk *bookkeeping.Block
+    var cert *agreement.Certificate
     // check to see if the current ledger might have this block. If so, we should try this first instead of downloading anything.
     if ledgerBlock, err := cs.ledger.Block(blockRound); err == nil {
         blk = &ledgerBlock
@@ -384,7 +386,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
         blockDownloadDuration := time.Duration(0)
         if blk == nil {
             var stop bool
-            blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
+            blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(blockRound, uint64(attemptsCount))
             if stop {
                 return err
             } else if blk == nil {
@@ -462,7 +464,7 @@ func (cs *CatchpointCatchupService) processStageLatestBlockDownload() (err error
         return cs.abort(fmt.Errorf("processStageLatestBlockDownload failed when calling StoreBalancesRound : %v", err))
     }
 
-    err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk)
+    err = cs.ledgerAccessor.StoreFirstBlock(cs.ctx, blk, cert)
     if err != nil {
         if attemptsCount <= cs.config.CatchupBlockDownloadRetryAttempts {
             // try again.
@@ -542,6 +544,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
     prevBlock := &topBlock
     blocksFetched := uint64(1) // we already got the first block in the previous step.
     var blk *bookkeeping.Block
+    var cert *agreement.Certificate
     for retryCount := uint64(1); blocksFetched <= lookback; {
         if err := cs.ctx.Err(); err != nil {
             return cs.stopOrAbort()
@@ -564,7 +567,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
         blockDownloadDuration := time.Duration(0)
         if blk == nil {
             var stop bool
-            blk, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
+            blk, cert, blockDownloadDuration, psp, stop, err = cs.fetchBlock(topBlock.Round()-basics.Round(blocksFetched), retryCount)
             if stop {
                 return err
             } else if blk == nil {
@@ -624,7 +627,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
         }
 
         // all good, persist and move on.
-        err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk)
+        err = cs.ledgerAccessor.StoreBlock(cs.ctx, blk, cert)
         if err != nil {
             cs.log.Warnf("processStageBlocksDownload failed to store downloaded staging block for round %d", blk.Round())
             cs.updateBlockRetrievalStatistics(-1, -1)
@@ -649,17 +652,17 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
 // fetchBlock uses the internal peer selector blocksDownloadPeerSelector to pick a peer and then attempt to fetch the block requested from that peer.
 // The method return stop=true if the caller should exit the current operation
 // If the method return a nil block, the caller is expected to retry the operation, increasing the retry counter as needed.
-func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
+func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
     psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
     if err != nil {
         if err == errPeerSelectorNoPeerPoolsAvailable {
             cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
             // this is a possible on startup, since the network package might have yet to retrieve the list of peers.
             time.Sleep(noPeersAvailableSleepInterval)
-            return nil, time.Duration(0), psp, false, nil
+            return nil, nil, time.Duration(0), psp, false, nil
         }
         err = fmt.Errorf("fetchBlock: unable to obtain a list of peers to retrieve the latest block from : %w", err)
-        return nil, time.Duration(0), psp, true, cs.abort(err)
+        return nil, nil, time.Duration(0), psp, true, cs.abort(err)
     }
     peer := psp.Peer
 
@@ -669,26 +672,26 @@ func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount ui
         cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
         if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
             // try again.
-            return nil, time.Duration(0), psp, false, nil
+            return nil, nil, time.Duration(0), psp, false, nil
         }
-        return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
+        return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock: recurring non-HTTP peer was provided by the peer selector"))
     }
     fetcher := makeUniversalBlockFetcher(cs.log, cs.net, cs.config)
-    blk, _, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
+    blk, cert, downloadDuration, err = fetcher.fetchBlock(cs.ctx, round, httpPeer)
     if err != nil {
         if cs.ctx.Err() != nil {
-            return nil, time.Duration(0), psp, true, cs.stopOrAbort()
+            return nil, nil, time.Duration(0), psp, true, cs.stopOrAbort()
         }
         if retryCount <= uint64(cs.config.CatchupBlockDownloadRetryAttempts) {
             // try again.
             cs.log.Infof("Failed to download block %d on attempt %d out of %d. %v", round, retryCount, cs.config.CatchupBlockDownloadRetryAttempts, err)
             cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)
-            return nil, time.Duration(0), psp, false, nil
+            return nil, nil, time.Duration(0), psp, false, nil
         }
-        return nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
+        return nil, nil, time.Duration(0), psp, true, cs.abort(fmt.Errorf("fetchBlock failed after multiple blocks download attempts"))
     }
     // success
-    return blk, downloadDuration, psp, false, nil
+    return blk, cert, downloadDuration, psp, false, nil
 }
 
 // processStageLedgerDownload is the fifth catchpoint catchup stage. It completes the catchup process, swap the new tables and restart the node functionality.
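
Note: the change above threads an *agreement.Certificate alongside each downloaded block — fetchBlock now also returns the certificate that the universal block fetcher retrieves, and StoreFirstBlock/StoreBlock persist it together with the block. The standalone sketch below shows only that fetch-as-a-pair, store-as-a-pair pattern; the Block and Certificate types and the fetch/store helpers are simplified stand-ins, not the go-algorand APIs.

// Sketch: a fetched block travels with its agreement certificate so both can be
// persisted together. All names here are illustrative stand-ins.
package main

import (
    "errors"
    "fmt"
)

type Block struct{ Round uint64 }
type Certificate struct{ Round uint64 }

// fetch returns a block together with the certificate that attests to it,
// mirroring the new shape of fetchBlock's return values.
func fetch(round uint64) (*Block, *Certificate, error) {
    return &Block{Round: round}, &Certificate{Round: round}, nil
}

// store persists the pair; rejecting a mismatched pair stands in for the
// validation a real ledger accessor would perform.
func store(blk *Block, cert *Certificate) error {
    if blk == nil || cert == nil || blk.Round != cert.Round {
        return errors.New("block and certificate must match")
    }
    fmt.Printf("stored block %d with its certificate\n", blk.Round)
    return nil
}

func main() {
    blk, cert, err := fetch(100)
    if err == nil {
        err = store(blk, cert)
    }
    if err != nil {
        panic(err)
    }
}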
70 changes: 44 additions & 26 deletions catchup/service.go
@@ -68,17 +68,18 @@ type Ledger interface {
     Block(basics.Round) (bookkeeping.Block, error)
     BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
     IsWritingCatchpointDataFile() bool
+    IsBehindCommittingDeltas() bool
     Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
     AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
     WaitMem(r basics.Round) chan struct{}
 }
 
 // Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
 type Service struct {
-    syncStartNS int64 // at top of struct to keep 64 bit aligned for atomic.* ops
     // disableSyncRound, provided externally, is the first round we will _not_ fetch from the network
     // any round >= disableSyncRound will not be fetched. If set to 0, it will be disregarded.
-    disableSyncRound uint64
+    disableSyncRound atomic.Uint64
+    syncStartNS      atomic.Int64
     cfg              config.Local
     ledger           Ledger
     ctx              context.Context
@@ -92,15 +93,15 @@ type Service struct {
     prevBlockFetchTime  time.Time
     blockValidationPool execpool.BacklogPool
 
-    // suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the
-    // catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
+    // suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the
+    // catchpoint file or flushing accounts. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
     // and resume from there without stopping the catchup timer.
-    suspendForCatchpointWriting bool
+    suspendForLedgerOps bool
 
     // The channel gets closed when the initial sync is complete. This allows for other services to avoid
     // the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
     InitialSyncDone              chan struct{}
-    initialSyncNotified          uint32
+    initialSyncNotified          atomic.Uint32
     protocolErrorLogged          bool
     unmatchedPendingCertificates <-chan PendingUnmatchedCertificate
     // This channel signals periodSync to attempt catchup immediately. This allows us to start fetching rounds from
@@ -146,7 +147,7 @@ func MakeService(log logging.Logger, config config.Local, net network.GossipNode
 // Start the catchup service
 func (s *Service) Start() {
     s.ctx, s.cancel = context.WithCancel(context.Background())
-    atomic.StoreUint32(&s.initialSyncNotified, 0)
+    s.initialSyncNotified.Store(0)
     s.InitialSyncDone = make(chan struct{})
     s.workers.Add(1)
     go s.periodicSync()
@@ -156,7 +157,7 @@ func (s *Service) Start() {
 func (s *Service) Stop() {
     s.cancel()
     s.workers.Wait()
-    if atomic.CompareAndSwapUint32(&s.initialSyncNotified, 0, 1) {
+    if s.initialSyncNotified.CompareAndSwap(0, 1) {
         close(s.InitialSyncDone)
     }
 }
@@ -165,8 +166,8 @@ func (s *Service) Stop() {
 // or attempting to catchup after too-long waiting for next block.
 // Also returns a 2nd bool indicating if this is our initial sync
 func (s *Service) IsSynchronizing() (synchronizing bool, initialSync bool) {
-    synchronizing = atomic.LoadInt64(&s.syncStartNS) != 0
-    initialSync = atomic.LoadUint32(&s.initialSyncNotified) == 0
+    synchronizing = s.syncStartNS.Load() != 0
+    initialSync = s.initialSyncNotified.Load() == 0
     return
 }
 
@@ -186,25 +187,25 @@ func (s *Service) SetDisableSyncRound(rnd uint64) error {
     if basics.Round(rnd) < s.ledger.LastRound() {
         return ErrSyncRoundInvalid
     }
-    atomic.StoreUint64(&s.disableSyncRound, rnd)
+    s.disableSyncRound.Store(rnd)
     s.triggerSync()
     return nil
 }
 
 // UnsetDisableSyncRound removes any previously set disabled sync round
 func (s *Service) UnsetDisableSyncRound() {
-    atomic.StoreUint64(&s.disableSyncRound, 0)
+    s.disableSyncRound.Store(0)
     s.triggerSync()
 }
 
 // GetDisableSyncRound returns the disabled sync round
 func (s *Service) GetDisableSyncRound() uint64 {
-    return atomic.LoadUint64(&s.disableSyncRound)
+    return s.disableSyncRound.Load()
 }
 
 // SynchronizingTime returns the time we've been performing a catchup operation (0 if not currently catching up)
 func (s *Service) SynchronizingTime() time.Duration {
-    startNS := atomic.LoadInt64(&s.syncStartNS)
+    startNS := s.syncStartNS.Load()
     if startNS == 0 {
         return time.Duration(0)
     }
@@ -518,14 +519,26 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
         limitedParallelRequests = minParallelRequests
     }
 
     // TODO: Remove this. It's just for debugging.
     s.log.Infof("ROUND COMPLETE (%d): parallel requests (%d): active downloads (%d)", firstRound-1, limitedParallelRequests, len(completed))
+    // if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks.
+    start := time.Now()
+    for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout {
+        time.Sleep(100 * time.Millisecond)
+    }
+
+    // if ledger is still busy after s.deadlineTimeout timeout then abort the current pipelinedFetch invocation.
+
     // if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
     // could resume with the catchup.
     if s.ledger.IsWritingCatchpointDataFile() {
         s.log.Info("Catchup is stopping due to catchpoint file being written")
-        s.suspendForCatchpointWriting = true
+        s.suspendForLedgerOps = true
         return
     }
+
+    // if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure.
+    if s.ledger.IsBehindCommittingDeltas() {
+        s.log.Info("Catchup is stopping due to too many non-flushed account changes")
+        s.suspendForLedgerOps = true
+        return
+    }
 
@@ -582,10 +595,10 @@ func (s *Service) periodicSync() {
             sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
             continue
         case <-s.syncNow:
-            if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() {
+            if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() {
                 continue
             }
-            s.suspendForCatchpointWriting = false
+            s.suspendForLedgerOps = false
             s.log.Info("Immediate resync triggered; resyncing")
             s.sync()
         case <-time.After(sleepDuration):
@@ -602,7 +615,12 @@ func (s *Service) periodicSync() {
                 // keep the existing sleep duration and try again later.
                 continue
             }
-            s.suspendForCatchpointWriting = false
+            // if the ledger has too many non-flushed account changes, skip
+            if s.ledger.IsBehindCommittingDeltas() {
+                continue
+            }
+
+            s.suspendForLedgerOps = false
             s.log.Info("It's been too long since our ledger advanced; resyncing")
             s.sync()
         case cert := <-s.unmatchedPendingCertificates:
@@ -635,8 +653,8 @@ func (s *Service) sync() {
     start := time.Now()
 
     timeInNS := start.UnixNano()
-    if !atomic.CompareAndSwapInt64(&s.syncStartNS, 0, timeInNS) {
-        s.log.Infof("resuming previous sync from %d (now=%d)", atomic.LoadInt64(&s.syncStartNS), timeInNS)
+    if !s.syncStartNS.CompareAndSwap(0, timeInNS) {
+        s.log.Infof("resuming previous sync from %d (now=%d)", s.syncStartNS.Load(), timeInNS)
     }
 
     pr := s.ledger.LastRound()
@@ -657,18 +675,18 @@ func (s *Service) sync() {
     initSync := false
 
     // if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file.
-    if !s.suspendForCatchpointWriting {
+    if !s.suspendForLedgerOps {
         // in that case, don't change the timer so that the "timer" would keep running.
-        atomic.StoreInt64(&s.syncStartNS, 0)
+        s.syncStartNS.Store(0)
 
         // close the initial sync channel if not already close
-        if atomic.CompareAndSwapUint32(&s.initialSyncNotified, 0, 1) {
+        if s.initialSyncNotified.CompareAndSwap(0, 1) {
             close(s.InitialSyncDone)
             initSync = true
         }
     }
 
-    elapsedTime := time.Now().Sub(start)
+    elapsedTime := time.Since(start)
     s.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.CatchupStopEvent, telemetryspec.CatchupStopEventDetails{
         StartRound: uint64(pr),
         EndRound:   uint64(s.ledger.LastRound()),
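
Note: service.go migrates its lock-free counters from free functions over plain integer fields (atomic.StoreUint64, atomic.CompareAndSwapInt64, ...) to the typed atomics introduced in Go 1.19. Typed atomics carry their own 64-bit alignment guarantee, which is why the "at top of struct to keep 64 bit aligned" placement comment could be deleted. A self-contained sketch of the before/after pattern; the field names mirror the diff, but the struct is a stand-in, not the real catchup.Service:

// Sketch: Go 1.19 typed atomics replacing free atomic functions.
package main

import (
    "fmt"
    "sync/atomic"
)

type service struct {
    // atomic.Int64/Uint64 are 64-bit aligned wherever they sit in the struct,
    // so no manual field ordering is needed on 32-bit platforms.
    syncStartNS         atomic.Int64
    disableSyncRound    atomic.Uint64
    initialSyncNotified atomic.Uint32
}

func main() {
    s := &service{}

    // Old style (removed by the diff):
    //   atomic.StoreUint64(&s.disableSyncRound, 5)
    //   v := atomic.LoadUint64(&s.disableSyncRound)
    // New style:
    s.disableSyncRound.Store(5)
    fmt.Println(s.disableSyncRound.Load()) // 5

    // CompareAndSwap keeps the same close-once semantics used for InitialSyncDone.
    fmt.Println(s.initialSyncNotified.CompareAndSwap(0, 1)) // true
    fmt.Println(s.initialSyncNotified.CompareAndSwap(0, 1)) // false
}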
61 changes: 59 additions & 2 deletions catchup/service_test.go
@@ -21,7 +21,6 @@ import (
     "errors"
     "math/rand"
     "sync"
-    "sync/atomic"
     "testing"
     "time"
 
@@ -830,6 +829,18 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool {
     return false
 }
 
+func (m *mockedLedger) IsBehindCommittingDeltas() bool {
+    return false
+}
+
+type mockedBehindDeltasLedger struct {
+    mockedLedger
+}
+
+func (m *mockedBehindDeltasLedger) IsBehindCommittingDeltas() bool {
+    return true
+}
+
 func testingenvWithUpgrade(
     t testing.TB,
     numBlocks,
@@ -1085,7 +1096,7 @@ func TestSynchronizingTime(t *testing.T) {
     s := MakeService(logging.Base(), cfg, &httpTestPeerSource{}, ledger, &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
 
     require.Equal(t, time.Duration(0), s.SynchronizingTime())
-    atomic.StoreInt64(&s.syncStartNS, 1000000)
+    s.syncStartNS.Store(1000000)
     require.NotEqual(t, time.Duration(0), s.SynchronizingTime())
 }
 
@@ -1127,3 +1138,49 @@ func TestDownloadBlocksToSupportStateProofs(t *testing.T) {
     lookback = lookbackForStateproofsSupport(&topBlk)
     assert.Equal(t, uint64(0), lookback)
 }
+
+// TestServiceLedgerUnavailable checks a local ledger that is unavailable cannot catchup up to remote round
+func TestServiceLedgerUnavailable(t *testing.T) {
+    partitiontest.PartitionTest(t)
+
+    // Make Ledger
+    local := new(mockedBehindDeltasLedger)
+    local.blocks = append(local.blocks, bookkeeping.Block{})
+
+    remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
+    if err != nil {
+        t.Fatal(err)
+        return
+    }
+    numBlocks := 10
+    addBlocks(t, remote, blk, numBlocks)
+
+    // Create a network and block service
+    blockServiceConfig := config.GetDefaultLocal()
+    net := &httpTestPeerSource{}
+    ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
+
+    nodeA := basicRPCNode{}
+    nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
+    nodeA.start()
+    defer nodeA.stop()
+    rootURL := nodeA.rootURL()
+    net.addPeer(rootURL)
+
+    require.Equal(t, basics.Round(0), local.LastRound())
+    require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())
+
+    // Make Service
+    auth := &mockedAuthenticator{fail: false}
+    cfg := config.GetDefaultLocal()
+    cfg.CatchupParallelBlocks = 2
+    s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
+    s.log = &periodicSyncLogger{Logger: logging.Base()}
+    s.deadlineTimeout = 2 * time.Second
+
+    s.testStart()
+    defer s.Stop()
+    s.sync()
+    require.Greater(t, local.LastRound(), basics.Round(0))
+    require.Less(t, local.LastRound(), remote.LastRound())
+}
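
Note: the new Ledger.IsBehindCommittingDeltas check gives the catchup pipeline back-pressure. pipelinedFetch first waits up to s.deadlineTimeout for the ledger to finish flushing its in-memory deltas, then suspends via suspendForLedgerOps instead of buffering more blocks; TestServiceLedgerUnavailable above exercises that path through mockedBehindDeltasLedger. A reduced, runnable sketch of the wait-then-suspend flow follows; the ledgerStatus interface and fetchRound helper are illustrative stand-ins, not the go-algorand APIs.

// Sketch: pause while the ledger is busy, then suspend instead of fetching more.
package main

import (
    "fmt"
    "time"
)

type ledgerStatus interface {
    IsWritingCatchpointDataFile() bool
    IsBehindCommittingDeltas() bool
}

type busyLedger struct{}

func (busyLedger) IsWritingCatchpointDataFile() bool { return false }
func (busyLedger) IsBehindCommittingDeltas() bool    { return true }

// fetchRound returns false when the caller should suspend catchup until the
// ledger has flushed, mirroring the suspendForLedgerOps flag in the diff.
func fetchRound(l ledgerStatus, deadline time.Duration) bool {
    start := time.Now()
    // Pause to let in-flight downloads finish while the ledger is busy.
    for (l.IsWritingCatchpointDataFile() || l.IsBehindCommittingDeltas()) && time.Since(start) < deadline {
        time.Sleep(10 * time.Millisecond)
    }
    // Still busy after the deadline: abort this fetch and suspend.
    if l.IsWritingCatchpointDataFile() || l.IsBehindCommittingDeltas() {
        fmt.Println("catchup suspended: ledger busy committing deltas")
        return false
    }
    return true
}

func main() {
    if !fetchRound(busyLedger{}, 50*time.Millisecond) {
        fmt.Println("would retry after the ledger catches up")
    }
}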