From 5e92041afcfcb64a930368e420139f985d424f2f Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 26 Jun 2024 10:46:27 -0400 Subject: [PATCH 1/5] node: register block listeners after fast catchup * #6039 discovered blockNotifier preserves state between ledger reloads that breaks assumptions on how trackers work * Made node to clear and re-register block listeners explicitly on fast catchup. * Also removed unused blockListeners argument from data.Ledger --- catchup/fetcher_test.go | 3 +-- catchup/pref_test.go | 4 ++-- daemon/algod/api/server/v2/test/helpers.go | 2 +- data/common_test.go | 2 +- data/datatest/fabricateLedger.go | 2 +- data/ledger.go | 3 +-- data/txHandler_test.go | 18 +++++++++--------- ledger/ledger.go | 5 +++++ ledger/notifier.go | 6 ++++++ node/assemble_test.go | 4 ++-- node/follower_node.go | 2 +- node/node.go | 14 +++++++------- node/node_test.go | 2 +- rpcs/blockService_test.go | 2 +- 14 files changed, 39 insertions(+), 30 deletions(-) diff --git a/catchup/fetcher_test.go b/catchup/fetcher_test.go index dab8da4688..e219852bf4 100644 --- a/catchup/fetcher_test.go +++ b/catchup/fetcher_test.go @@ -66,8 +66,7 @@ func buildTestLedger(t *testing.T, blk bookkeeping.Block) (ledger *data.Ledger, cfg := config.GetDefaultLocal() cfg.Archival = true ledger, err = data.LoadLedger( - log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, - nil, cfg, + log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, cfg, ) if err != nil { t.Fatal("couldn't build ledger", err) diff --git a/catchup/pref_test.go b/catchup/pref_test.go index 38b2a9d16e..a72ed855ec 100644 --- a/catchup/pref_test.go +++ b/catchup/pref_test.go @@ -62,7 +62,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) { for i := 0; i < b.N; i++ { inMem := true prefix := b.Name() + "empty" + strconv.Itoa(i) - local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg) + local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg) require.NoError(b, err) // Make Service @@ -150,7 +150,7 @@ func benchenv(t testing.TB, numAccounts, numBlocks int) (ledger, emptyLedger *da cfg := config.GetDefaultLocal() cfg.Archival = true prefix := t.Name() + "empty" - emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg) + emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg) require.NoError(t, err) ledger, err = datatest.FabricateLedger(logging.TestingLog(t), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks)) diff --git a/daemon/algod/api/server/v2/test/helpers.go b/daemon/algod/api/server/v2/test/helpers.go index ad028fc8ae..5ba5256d63 100644 --- a/daemon/algod/api/server/v2/test/helpers.go +++ b/daemon/algod/api/server/v2/test/helpers.go @@ -313,7 +313,7 @@ func testingenvWithBalances(t testing.TB, minMoneyAtStart, maxMoneyAtStart, numA const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, nil, cfg) + ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, cfg) if err != nil { panic(err) } diff --git a/data/common_test.go b/data/common_test.go index 6079f4a226..6f947067fa 100644 --- a/data/common_test.go +++ b/data/common_test.go @@ -121,7 +121,7 @@ func testingenv(t testing.TB, numAccounts, numTxs int, offlineAccounts bool) (*L const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, cfg) if err != nil { panic(err) } diff --git a/data/datatest/fabricateLedger.go b/data/datatest/fabricateLedger.go index 9ad7bed4fc..cfde34af8a 100644 --- a/data/datatest/fabricateLedger.go +++ b/data/datatest/fabricateLedger.go @@ -38,7 +38,7 @@ func FabricateLedger(log logging.Logger, ledgerName string, accounts []account.P const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, nil, cfg) + ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, cfg) if err != nil { return nil, err } diff --git a/data/ledger.go b/data/ledger.go index fa3b958373..579d324d0b 100644 --- a/data/ledger.go +++ b/data/ledger.go @@ -81,7 +81,7 @@ type roundSeed struct { func LoadLedger[T string | ledger.DirsAndPrefix]( log logging.Logger, dir T, memory bool, genesisProto protocol.ConsensusVersion, genesisBal bookkeeping.GenesisBalances, genesisID string, genesisHash crypto.Digest, - blockListeners []ledgercore.BlockListener, cfg config.Local, + cfg config.Local, ) (*Ledger, error) { if genesisBal.Balances == nil { genesisBal.Balances = make(map[basics.Address]basics.AccountData) @@ -115,7 +115,6 @@ func LoadLedger[T string | ledger.DirsAndPrefix]( } l.Ledger = ll - l.RegisterBlockListeners(blockListeners) return l, nil } diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 896fbb161d..d395779f33 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -109,7 +109,7 @@ func BenchmarkTxHandlerProcessing(b *testing.B) { cfg.Archival = true cfg.TxBacklogReservedCapacityPerPeer = 1 cfg.IncomingConnectionsLimit = 10 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(b, err) defer ledger.Close() @@ -1027,7 +1027,7 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { cfg.Archival = true cfg.EnableTxBacklogRateLimiting = false cfg.TxIncomingFilteringFlags = 3 // txFilterRawMsg + txFilterCanonical - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -1196,7 +1196,7 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t cfg := config.GetDefaultLocal() cfg.Archival = true cfg.EnableTxBacklogRateLimiting = false - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -1641,7 +1641,7 @@ func (g *txGenerator) makeLedger(tb testing.TB, cfg config.Local, log logging.Lo ledgerName := fmt.Sprintf("%s-in_mem-w_inv=%d", namePrefix, ivrString) ledgerName = strings.Replace(ledgerName, "#", "-", 1) const inMem = true - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(tb, err) return ledger } @@ -2183,7 +2183,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall cfg := config.GetDefaultLocal() cfg.Archival = true cfg.TxPoolSize = config.MaxTxGroupSize + 1 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -2419,7 +2419,7 @@ func TestTxHandlerRestartWithBacklogAndTxPool(t *testing.T) { //nolint:parallelt const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Ledger.Close() @@ -2524,7 +2524,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) { cfg.TxBacklogServiceRateWindowSeconds = 1 cfg.TxBacklogAppTxPerSecondRate = 3 cfg.TxBacklogSize = 3 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -2636,7 +2636,7 @@ func TestTxHandlerAppRateLimiter(t *testing.T) { cfg.TxBacklogAppTxRateLimiterMaxSize = 100 cfg.TxBacklogServiceRateWindowSeconds = 1 cfg.TxBacklogAppTxPerSecondRate = 3 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -2705,7 +2705,7 @@ func TestTxHandlerCapGuard(t *testing.T) { cfg.IncomingConnectionsLimit = 1 cfg.TxBacklogSize = 3 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() diff --git a/ledger/ledger.go b/ledger/ledger.go index 2cc1b36ee1..f9425250e5 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -426,6 +426,11 @@ func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) { l.notifier.register(listeners) } +// ClearBlockListeners removes all listeners block listeners. +func (l *Ledger) ClearBlockListeners() { + l.notifier.clear() +} + // RegisterVotersCommitListener registers a listener that will be called when a // commit is about to cover a round. func (l *Ledger) RegisterVotersCommitListener(listener ledgercore.VotersCommitListener) { diff --git a/ledger/notifier.go b/ledger/notifier.go index f97e1c77e6..e895cd98ed 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -100,6 +100,12 @@ func (bn *blockNotifier) register(listeners []ledgercore.BlockListener) { bn.listeners = append(bn.listeners, listeners...) } +func (bn *blockNotifier) clear() { + bn.mu.Lock() + defer bn.mu.Unlock() + bn.listeners = nil +} + func (bn *blockNotifier) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { bn.mu.Lock() defer bn.mu.Unlock() diff --git a/node/assemble_test.go b/node/assemble_test.go index 51ff7d8edc..d2bf4dd7f7 100644 --- a/node/assemble_test.go +++ b/node/assemble_test.go @@ -83,7 +83,7 @@ func BenchmarkAssembleBlock(b *testing.B) { const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(b, err) l := ledger @@ -212,7 +212,7 @@ func TestAssembleBlockTransactionPoolBehind(t *testing.T) { const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) l := ledger diff --git a/node/follower_node.go b/node/follower_node.go index 8483f14679..d00e44d5c1 100644 --- a/node/follower_node.go +++ b/node/follower_node.go @@ -116,7 +116,7 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo DBFilePrefix: config.LedgerFilenamePrefix, ResolvedGenesisDirs: node.genesisDirs, } - node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg) + node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg) if err != nil { log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err) return nil, err diff --git a/node/node.go b/node/node.go index 6c77b4fbc0..afb572e020 100644 --- a/node/node.go +++ b/node/node.go @@ -223,7 +223,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd DBFilePrefix: config.LedgerFilenamePrefix, ResolvedGenesisDirs: node.genesisDirs, } - node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg) + node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg) if err != nil { log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err) return nil, err @@ -246,12 +246,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.transactionPool = pools.MakeTransactionPool(node.ledger.Ledger, cfg, node.log, node) - blockListeners := []ledgercore.BlockListener{ - node.transactionPool, - node, - } - - node.ledger.RegisterBlockListeners(blockListeners) + node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node.transactionPool, node}) txHandlerOpts := data.TxHandlerOpts{ TxPool: node.transactionPool, ExecutionPool: node.lowPriorityCryptoVerificationPool, @@ -443,6 +438,7 @@ func (node *AlgorandFullNode) Stop() { node.blockService.Stop() node.ledgerService.Stop() } + node.ledger.ClearBlockListeners() node.catchupBlockAuth.Quit() node.log.Debug("crypto worker pools are stopping") node.highPriorityCryptoVerificationPool.Shutdown() @@ -1200,6 +1196,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo node.txPoolSyncerService.Stop() node.blockService.Stop() node.ledgerService.Stop() + node.ledger.ClearBlockListeners() prevNodeCancelFunc := node.cancelCtx @@ -1211,7 +1208,10 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo return } defer node.mu.Unlock() + // start + // catchpoint service reloads the ledger, need to re-register the listeners + node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node.transactionPool, node}) node.transactionPool.Reset() node.catchupService.Start() node.agreementService.Start() diff --git a/node/node_test.go b/node/node_test.go index dabb7958a5..54f2e1e6cc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -168,7 +168,7 @@ func setupFullNodes(t *testing.T, proto protocol.ConsensusVersion, verificationP cfg, err := config.LoadConfigFromDisk(rootDirectory) require.NoError(t, err) cfg.Archival = true - _, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), nil, cfg) + _, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), cfg) require.NoError(t, err) } diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index 3aab7c4abb..7b1f756f08 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -520,7 +520,7 @@ func makeLedger(t *testing.T, namePostfix string) *data.Ledger { prefix := t.Name() + namePostfix ledger, err := data.LoadLedger( log, prefix, inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, - nil, cfg, + cfg, ) require.NoError(t, err) return ledger From 26581155ab39500595ec5791699cb5a129a0d458 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 26 Jun 2024 13:11:53 -0400 Subject: [PATCH 2/5] CR: rename clear() -> clearListeners() --- ledger/ledger.go | 2 +- ledger/notifier.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ledger/ledger.go b/ledger/ledger.go index f9425250e5..211a272d35 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -428,7 +428,7 @@ func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) { // ClearBlockListeners removes all listeners block listeners. func (l *Ledger) ClearBlockListeners() { - l.notifier.clear() + l.notifier.clearListeners() } // RegisterVotersCommitListener registers a listener that will be called when a diff --git a/ledger/notifier.go b/ledger/notifier.go index e895cd98ed..58071e24fb 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -100,7 +100,7 @@ func (bn *blockNotifier) register(listeners []ledgercore.BlockListener) { bn.listeners = append(bn.listeners, listeners...) } -func (bn *blockNotifier) clear() { +func (bn *blockNotifier) clearListeners() { bn.mu.Lock() defer bn.mu.Unlock() bn.listeners = nil From 1d71aac28a291fe3f162f51c3cf195108b527e2d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 26 Jun 2024 13:12:15 -0400 Subject: [PATCH 3/5] clear/register in follower node --- node/follower_node.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/node/follower_node.go b/node/follower_node.go index d00e44d5c1..a4549bc856 100644 --- a/node/follower_node.go +++ b/node/follower_node.go @@ -122,11 +122,7 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo return nil, err } - blockListeners := []ledgercore.BlockListener{ - node, - } - - node.ledger.RegisterBlockListeners(blockListeners) + node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node}) if cfg.IsGossipServer() { rpcs.MakeHealthService(node.net) @@ -218,6 +214,7 @@ func (node *AlgorandFollowerNode) Stop() { node.catchupService.Stop() node.blockService.Stop() } + node.ledger.ClearBlockListeners() node.catchupBlockAuth.Quit() node.lowPriorityCryptoVerificationPool.Shutdown() node.cryptoPool.Shutdown() @@ -407,6 +404,7 @@ func (node *AlgorandFollowerNode) SetCatchpointCatchupMode(catchpointCatchupMode node.net.ClearHandlers() node.catchupService.Stop() node.blockService.Stop() + node.ledger.ClearBlockListeners() prevNodeCancelFunc := node.cancelCtx @@ -427,6 +425,7 @@ func (node *AlgorandFollowerNode) SetCatchpointCatchupMode(catchpointCatchupMode } // start + node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node}) node.catchupService.Start() node.blockService.Start() From c5723fa95c2d248a479e9decfe3efef80b334b54 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 26 Jun 2024 14:38:50 -0400 Subject: [PATCH 4/5] make ledger to preserve block listeners --- ledger/ledger.go | 13 ++++++++----- ledger/ledger_test.go | 35 +++++++++++++++++++++++++++++++++++ node/follower_node.go | 3 --- node/node.go | 4 ---- 4 files changed, 43 insertions(+), 12 deletions(-) diff --git a/ledger/ledger.go b/ledger/ledger.go index 211a272d35..9a5cbf1df3 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -210,6 +210,10 @@ func (l *Ledger) reloadLedger() error { l.trackerMu.Lock() defer l.trackerMu.Unlock() + // save block listeners to recover them later + blockListeners := make([]ledgercore.BlockListener, 0, len(l.notifier.listeners)) + blockListeners = append(blockListeners, l.notifier.listeners...) + // close the trackers. l.trackers.close() @@ -256,6 +260,10 @@ func (l *Ledger) reloadLedger() error { return err } + // restore block listeners since l.notifier might not survive a reload + l.notifier.clearListeners() + l.notifier.register(blockListeners) + // post-init actions if trackerDBInitParams.VacuumOnStartup || l.cfg.OptimizeAccountsDatabaseOnStartup { err = l.accts.vacuumDatabase(context.Background()) @@ -426,11 +434,6 @@ func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) { l.notifier.register(listeners) } -// ClearBlockListeners removes all listeners block listeners. -func (l *Ledger) ClearBlockListeners() { - l.notifier.clearListeners() -} - // RegisterVotersCommitListener registers a listener that will be called when a // commit is about to cover a round. func (l *Ledger) RegisterVotersCommitListener(listener ledgercore.VotersCommitListener) { diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index 2a9666688b..968e6d8b21 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -3422,5 +3422,40 @@ func TestLedgerRetainMinOffCatchpointInterval(t *testing.T) { } }() } +} + +type testBlockListener struct { + id int +} + +func (t *testBlockListener) OnNewBlock(bookkeeping.Block, ledgercore.StateDelta) {} + +// TestLedgerRegisterBlockListeners ensures that the block listeners survive reloadLedger +func TestLedgerRegisterBlockListeners(t *testing.T) { + partitiontest.PartitionTest(t) + + genBalances, _, _ := ledgertesting.NewTestGenesis() + var genHash crypto.Digest + crypto.RandBytes(genHash[:]) + cfg := config.GetDefaultLocal() + l := newSimpleLedgerFull(t, genBalances, protocol.ConsensusCurrentVersion, genHash, cfg) + defer l.Close() + l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{1}, &testBlockListener{2}}) + l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{3}}) + + require.Equal(t, 3, len(l.notifier.listeners)) + var ids []int + for _, bl := range l.notifier.listeners { + ids = append(ids, bl.(*testBlockListener).id) + } + require.Equal(t, []int{1, 2, 3}, ids) + + l.reloadLedger() + + ids = nil + for _, bl := range l.notifier.listeners { + ids = append(ids, bl.(*testBlockListener).id) + } + require.Equal(t, []int{1, 2, 3}, ids) } diff --git a/node/follower_node.go b/node/follower_node.go index a4549bc856..e475b25481 100644 --- a/node/follower_node.go +++ b/node/follower_node.go @@ -214,7 +214,6 @@ func (node *AlgorandFollowerNode) Stop() { node.catchupService.Stop() node.blockService.Stop() } - node.ledger.ClearBlockListeners() node.catchupBlockAuth.Quit() node.lowPriorityCryptoVerificationPool.Shutdown() node.cryptoPool.Shutdown() @@ -404,7 +403,6 @@ func (node *AlgorandFollowerNode) SetCatchpointCatchupMode(catchpointCatchupMode node.net.ClearHandlers() node.catchupService.Stop() node.blockService.Stop() - node.ledger.ClearBlockListeners() prevNodeCancelFunc := node.cancelCtx @@ -425,7 +423,6 @@ func (node *AlgorandFollowerNode) SetCatchpointCatchupMode(catchpointCatchupMode } // start - node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node}) node.catchupService.Start() node.blockService.Start() diff --git a/node/node.go b/node/node.go index afb572e020..58160ff3e0 100644 --- a/node/node.go +++ b/node/node.go @@ -438,7 +438,6 @@ func (node *AlgorandFullNode) Stop() { node.blockService.Stop() node.ledgerService.Stop() } - node.ledger.ClearBlockListeners() node.catchupBlockAuth.Quit() node.log.Debug("crypto worker pools are stopping") node.highPriorityCryptoVerificationPool.Shutdown() @@ -1196,7 +1195,6 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo node.txPoolSyncerService.Stop() node.blockService.Stop() node.ledgerService.Stop() - node.ledger.ClearBlockListeners() prevNodeCancelFunc := node.cancelCtx @@ -1210,8 +1208,6 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo defer node.mu.Unlock() // start - // catchpoint service reloads the ledger, need to re-register the listeners - node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node.transactionPool, node}) node.transactionPool.Reset() node.catchupService.Start() node.agreementService.Start() From 2c7d6b9ce844069c8e5ebebcfee07d0fe095b880 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 26 Jun 2024 16:56:56 -0400 Subject: [PATCH 5/5] remove clearListeners --- ledger/ledger.go | 3 ++- ledger/notifier.go | 6 ------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/ledger/ledger.go b/ledger/ledger.go index 9a5cbf1df3..7459c23037 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -261,7 +261,6 @@ func (l *Ledger) reloadLedger() error { } // restore block listeners since l.notifier might not survive a reload - l.notifier.clearListeners() l.notifier.register(blockListeners) // post-init actions @@ -431,6 +430,8 @@ func (l *Ledger) Close() { // RegisterBlockListeners registers listeners that will be called when a // new block is added to the ledger. func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) { + l.trackerMu.RLock() + defer l.trackerMu.RUnlock() l.notifier.register(listeners) } diff --git a/ledger/notifier.go b/ledger/notifier.go index 58071e24fb..f97e1c77e6 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -100,12 +100,6 @@ func (bn *blockNotifier) register(listeners []ledgercore.BlockListener) { bn.listeners = append(bn.listeners, listeners...) } -func (bn *blockNotifier) clearListeners() { - bn.mu.Lock() - defer bn.mu.Unlock() - bn.listeners = nil -} - func (bn *blockNotifier) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { bn.mu.Lock() defer bn.mu.Unlock()