Skip to content

Commit

Permalink
catchup: pause catchup if ledger lagging behind (#5794)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Oct 26, 2023
1 parent 8c54c22 commit f5d901a
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 130 deletions.
39 changes: 30 additions & 9 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Ledger interface {
Block(basics.Round) (bookkeeping.Block, error)
BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
IsWritingCatchpointDataFile() bool
IsBehindCommittingDeltas() bool
Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
WaitMem(r basics.Round) chan struct{}
Expand All @@ -86,10 +87,10 @@ type Service struct {
deadlineTimeout time.Duration
blockValidationPool execpool.BacklogPool

// suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file or flushing accounts. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// and resume from there without stopping the catchup timer.
suspendForCatchpointWriting bool
suspendForLedgerOps bool

// The channel gets closed when the initial sync is complete. This allows for other services to avoid
// the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
Expand Down Expand Up @@ -494,11 +495,26 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
return
}

// if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks.
start := time.Now()
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout {
time.Sleep(100 * time.Millisecond)
}

// if ledger is still busy after s.deadlineTimeout timeout then abort the current pipelinedFetch invocation.

// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
// could resume with the catchup.
if s.ledger.IsWritingCatchpointDataFile() {
s.log.Info("Catchup is stopping due to catchpoint file being written")
s.suspendForCatchpointWriting = true
s.suspendForLedgerOps = true
return
}

// if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure.
if s.ledger.IsBehindCommittingDeltas() {
s.log.Info("Catchup is stopping due to too many non-flushed account changes")
s.suspendForLedgerOps = true
return
}

Expand Down Expand Up @@ -555,10 +571,10 @@ func (s *Service) periodicSync() {
sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
continue
case <-s.syncNow:
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() {
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() {
continue
}
s.suspendForCatchpointWriting = false
s.suspendForLedgerOps = false
s.log.Info("Immediate resync triggered; resyncing")
s.sync()
case <-time.After(sleepDuration):
Expand All @@ -575,7 +591,12 @@ func (s *Service) periodicSync() {
// keep the existing sleep duration and try again later.
continue
}
s.suspendForCatchpointWriting = false
// if the ledger has too many non-flushed account changes, skip
if s.ledger.IsBehindCommittingDeltas() {
continue
}

s.suspendForLedgerOps = false
s.log.Info("It's been too long since our ledger advanced; resyncing")
s.sync()
case cert := <-s.unmatchedPendingCertificates:
Expand Down Expand Up @@ -630,7 +651,7 @@ func (s *Service) sync() {
initSync := false

// if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file.
if !s.suspendForCatchpointWriting {
if !s.suspendForLedgerOps {
// in that case, don't change the timer so that the "timer" would keep running.
s.syncStartNS.Store(0)

Expand All @@ -641,7 +662,7 @@ func (s *Service) sync() {
}
}

elapsedTime := time.Now().Sub(start)
elapsedTime := time.Since(start)
s.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.CatchupStopEvent, telemetryspec.CatchupStopEventDetails{
StartRound: uint64(pr),
EndRound: uint64(s.ledger.LastRound()),
Expand Down
58 changes: 58 additions & 0 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,18 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool {
return false
}

func (m *mockedLedger) IsBehindCommittingDeltas() bool {
return false
}

type mockedBehindDeltasLedger struct {
mockedLedger
}

func (m *mockedBehindDeltasLedger) IsBehindCommittingDeltas() bool {
return true
}

func testingenvWithUpgrade(
t testing.TB,
numBlocks,
Expand Down Expand Up @@ -1126,3 +1138,49 @@ func TestDownloadBlocksToSupportStateProofs(t *testing.T) {
lookback = lookbackForStateproofsSupport(&topBlk)
assert.Equal(t, uint64(0), lookback)
}

// TestServiceLedgerUnavailable checks a local ledger that is unavailable cannot catchup up to remote round
func TestServiceLedgerUnavailable(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedBehindDeltasLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 2
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
s.log = &periodicSyncLogger{Logger: logging.Base()}
s.deadlineTimeout = 2 * time.Second

s.testStart()
defer s.Stop()
s.sync()
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}
37 changes: 1 addition & 36 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {

// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
type blockingTracker struct {
emptyTracker
postCommitUnlockedEntryLock chan struct{}
postCommitUnlockedReleaseLock chan struct{}
postCommitEntryLock chan struct{}
Expand All @@ -783,36 +784,12 @@ type blockingTracker struct {
shouldLockPostCommitUnlocked atomic.Bool
}

// loadFromDisk is not implemented in the blockingTracker.
func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
return nil
}

// newBlock is not implemented in the blockingTracker.
func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
}

// committedUpTo in the blockingTracker just stores the committed round.
func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
bt.committedUpToRound.Store(int64(committedRnd))
return committedRnd, basics.Round(0)
}

// produceCommittingTask is not used by the blockingTracker
func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}

// prepareCommit, is not used by the blockingTracker
func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
return nil
}

// commitRound is not used by the blockingTracker
func (bt *blockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error {
return nil
}

// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
if bt.alwaysLock.Load() || dcc.catchpointFirstStage || bt.shouldLockPostCommit.Load() {
Expand All @@ -829,18 +806,6 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// control functions are not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) {
}

// close is not used by the blockingTracker
func (bt *blockingTracker) close() {
}

func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down
6 changes: 6 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,12 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round {
return l.trackers.getDbRound()
}

// IsBehindCommittingDeltas indicates if the ledger is behind expected number of in-memory deltas.
// It intended to slow down the catchup service when deltas overgrow some limit.
func (l *Ledger) IsBehindCommittingDeltas() bool {
return l.trackers.isBehindCommittingDeltas(l.Latest())
}

// DebuggerLedger defines the minimal set of method required for creating a debug balances.
type DebuggerLedger = eval.LedgerForCowBase

Expand Down
78 changes: 57 additions & 21 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-algorand/config"
Expand Down Expand Up @@ -175,6 +176,8 @@ type trackerRegistry struct {

// accountsWriting provides synchronization around the background writing of account balances.
accountsWriting sync.WaitGroup
// accountsCommitting is set when trackers registry writing accounts into DB.
accountsCommitting atomic.Bool

// dbRound is always exactly accountsRound(),
// cached to avoid SQL queries.
Expand All @@ -196,8 +199,16 @@ type trackerRegistry struct {
lastFlushTime time.Time

cfg config.Local

// maxAccountDeltas is a maximum number of in-memory deltas stored by trackers.
// When exceeded trackerRegistry will attempt to flush, and its Available() method will return false.
// Too many in-memory deltas could cause the node to run out of memory.
maxAccountDeltas uint64
}

// defaultMaxAccountDeltas is a default value for maxAccountDeltas.
const defaultMaxAccountDeltas = 256

// deferredCommitRange is used during the calls to produceCommittingTask, and used as a data structure
// to syncronize the various trackers and create a uniformity around which rounds need to be persisted
// next.
Expand Down Expand Up @@ -285,26 +296,18 @@ func (dcc deferredCommitContext) newBase() basics.Round {
return dcc.oldBase + basics.Round(dcc.offset)
}

var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker")
var errMissingAccountUpdateTracker = errors.New("trackers replay : called without a valid accounts update tracker")

func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) {
tr.mu.Lock()
defer tr.mu.Unlock()
tr.dbs = l.trackerDB()
tr.log = l.trackerLog()

err = tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) {
ar, err := tx.MakeAccountsReader()
if err != nil {
return err
}

tr.dbRound, err = ar.AccountsRound()
return err
})

if err != nil {
return err
tr.maxAccountDeltas = defaultMaxAccountDeltas
if cfg.MaxAcctLookback > tr.maxAccountDeltas {
tr.maxAccountDeltas = cfg.MaxAcctLookback + 1
tr.log.Infof("maxAccountDeltas was overridden to %d because of MaxAcctLookback=%d: this combination might use lots of RAM. To preserve some blocks in blockdb consider using MaxBlockHistoryLookback config option instead of MaxAcctLookback", tr.maxAccountDeltas, cfg.MaxAcctLookback)
}

tr.ctx, tr.ctxCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -333,24 +336,38 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack
}

func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error {
var dbRound basics.Round
err := tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) {
ar, err0 := tx.MakeAccountsReader()
if err0 != nil {
return err0
}

dbRound, err0 = ar.AccountsRound()
return err0
})
if err != nil {
return err
}

tr.mu.RLock()
dbRound := tr.dbRound
tr.dbRound = dbRound
tr.mu.RUnlock()

for _, lt := range tr.trackers {
err := lt.loadFromDisk(l, dbRound)
if err != nil {
err0 := lt.loadFromDisk(l, dbRound)
if err0 != nil {
// find the tracker name.
trackerName := reflect.TypeOf(lt).String()
return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err)
return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err0)
}
}

err := tr.replay(l)
if err != nil {
err = fmt.Errorf("initializeTrackerCaches failed : %w", err)
if err0 := tr.replay(l); err0 != nil {
return fmt.Errorf("trackers replay failed : %w", err0)
}
return err

return nil
}

func (tr *trackerRegistry) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
Expand Down Expand Up @@ -456,6 +473,20 @@ func (tr *trackerRegistry) waitAccountsWriting() {
tr.accountsWriting.Wait()
}

func (tr *trackerRegistry) isBehindCommittingDeltas(latest basics.Round) bool {
tr.mu.RLock()
dbRound := tr.dbRound
tr.mu.RUnlock()

numDeltas := uint64(latest.SubSaturate(dbRound))
if numDeltas < tr.maxAccountDeltas {
return false
}

// there is a large number of deltas check if commitSyncer is not writing accounts
return tr.accountsCommitting.Load()
}

func (tr *trackerRegistry) close() {
if tr.ctxCancel != nil {
tr.ctxCancel()
Expand Down Expand Up @@ -562,6 +593,11 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
start := time.Now()
ledgerCommitroundCount.Inc(nil)
err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
tr.accountsCommitting.Store(true)
defer func() {
tr.accountsCommitting.Store(false)
}()

aw, err := tx.MakeAccountsWriter()
if err != nil {
return err
Expand Down
Loading

0 comments on commit f5d901a

Please sign in to comment.