diff --git a/catchup/fetcher_test.go b/catchup/fetcher_test.go index dab8da4688..e219852bf4 100644 --- a/catchup/fetcher_test.go +++ b/catchup/fetcher_test.go @@ -66,8 +66,7 @@ func buildTestLedger(t *testing.T, blk bookkeeping.Block) (ledger *data.Ledger, cfg := config.GetDefaultLocal() cfg.Archival = true ledger, err = data.LoadLedger( - log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, - nil, cfg, + log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, cfg, ) if err != nil { t.Fatal("couldn't build ledger", err) diff --git a/catchup/pref_test.go b/catchup/pref_test.go index 38b2a9d16e..a72ed855ec 100644 --- a/catchup/pref_test.go +++ b/catchup/pref_test.go @@ -62,7 +62,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) { for i := 0; i < b.N; i++ { inMem := true prefix := b.Name() + "empty" + strconv.Itoa(i) - local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg) + local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg) require.NoError(b, err) // Make Service @@ -150,7 +150,7 @@ func benchenv(t testing.TB, numAccounts, numBlocks int) (ledger, emptyLedger *da cfg := config.GetDefaultLocal() cfg.Archival = true prefix := t.Name() + "empty" - emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg) + emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg) require.NoError(t, err) ledger, err = datatest.FabricateLedger(logging.TestingLog(t), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks)) diff --git a/daemon/algod/api/server/v2/test/helpers.go b/daemon/algod/api/server/v2/test/helpers.go index ad028fc8ae..5ba5256d63 100644 --- a/daemon/algod/api/server/v2/test/helpers.go +++ b/daemon/algod/api/server/v2/test/helpers.go @@ -313,7 +313,7 @@ func testingenvWithBalances(t testing.TB, minMoneyAtStart, maxMoneyAtStart, numA const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, nil, cfg) + ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, cfg) if err != nil { panic(err) } diff --git a/data/common_test.go b/data/common_test.go index 6079f4a226..6f947067fa 100644 --- a/data/common_test.go +++ b/data/common_test.go @@ -121,7 +121,7 @@ func testingenv(t testing.TB, numAccounts, numTxs int, offlineAccounts bool) (*L const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, cfg) if err != nil { panic(err) } diff --git a/data/datatest/fabricateLedger.go b/data/datatest/fabricateLedger.go index 9ad7bed4fc..cfde34af8a 100644 --- a/data/datatest/fabricateLedger.go +++ b/data/datatest/fabricateLedger.go @@ -38,7 +38,7 @@ func FabricateLedger(log logging.Logger, ledgerName string, accounts []account.P const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, nil, cfg) + ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, cfg) if err != nil { return nil, err } diff --git a/data/ledger.go b/data/ledger.go index fa3b958373..579d324d0b 100644 --- a/data/ledger.go +++ b/data/ledger.go @@ -81,7 +81,7 @@ type roundSeed struct { func LoadLedger[T string | ledger.DirsAndPrefix]( log logging.Logger, dir T, memory bool, genesisProto protocol.ConsensusVersion, genesisBal bookkeeping.GenesisBalances, genesisID string, genesisHash crypto.Digest, - blockListeners []ledgercore.BlockListener, cfg config.Local, + cfg config.Local, ) (*Ledger, error) { if genesisBal.Balances == nil { genesisBal.Balances = make(map[basics.Address]basics.AccountData) @@ -115,7 +115,6 @@ func LoadLedger[T string | ledger.DirsAndPrefix]( } l.Ledger = ll - l.RegisterBlockListeners(blockListeners) return l, nil } diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 896fbb161d..d395779f33 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -109,7 +109,7 @@ func BenchmarkTxHandlerProcessing(b *testing.B) { cfg.Archival = true cfg.TxBacklogReservedCapacityPerPeer = 1 cfg.IncomingConnectionsLimit = 10 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(b, err) defer ledger.Close() @@ -1027,7 +1027,7 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { cfg.Archival = true cfg.EnableTxBacklogRateLimiting = false cfg.TxIncomingFilteringFlags = 3 // txFilterRawMsg + txFilterCanonical - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -1196,7 +1196,7 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t cfg := config.GetDefaultLocal() cfg.Archival = true cfg.EnableTxBacklogRateLimiting = false - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -1641,7 +1641,7 @@ func (g *txGenerator) makeLedger(tb testing.TB, cfg config.Local, log logging.Lo ledgerName := fmt.Sprintf("%s-in_mem-w_inv=%d", namePrefix, ivrString) ledgerName = strings.Replace(ledgerName, "#", "-", 1) const inMem = true - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(tb, err) return ledger } @@ -2183,7 +2183,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall cfg := config.GetDefaultLocal() cfg.Archival = true cfg.TxPoolSize = config.MaxTxGroupSize + 1 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -2419,7 +2419,7 @@ func TestTxHandlerRestartWithBacklogAndTxPool(t *testing.T) { //nolint:parallelt const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Ledger.Close() @@ -2524,7 +2524,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) { cfg.TxBacklogServiceRateWindowSeconds = 1 cfg.TxBacklogAppTxPerSecondRate = 3 cfg.TxBacklogSize = 3 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -2636,7 +2636,7 @@ func TestTxHandlerAppRateLimiter(t *testing.T) { cfg.TxBacklogAppTxRateLimiterMaxSize = 100 cfg.TxBacklogServiceRateWindowSeconds = 1 cfg.TxBacklogAppTxPerSecondRate = 3 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() @@ -2705,7 +2705,7 @@ func TestTxHandlerCapGuard(t *testing.T) { cfg.IncomingConnectionsLimit = 1 cfg.TxBacklogSize = 3 - ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) defer ledger.Close() diff --git a/ledger/ledger.go b/ledger/ledger.go index 2cc1b36ee1..7459c23037 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -210,6 +210,10 @@ func (l *Ledger) reloadLedger() error { l.trackerMu.Lock() defer l.trackerMu.Unlock() + // save block listeners to recover them later + blockListeners := make([]ledgercore.BlockListener, 0, len(l.notifier.listeners)) + blockListeners = append(blockListeners, l.notifier.listeners...) + // close the trackers. l.trackers.close() @@ -256,6 +260,9 @@ func (l *Ledger) reloadLedger() error { return err } + // restore block listeners since l.notifier might not survive a reload + l.notifier.register(blockListeners) + // post-init actions if trackerDBInitParams.VacuumOnStartup || l.cfg.OptimizeAccountsDatabaseOnStartup { err = l.accts.vacuumDatabase(context.Background()) @@ -423,6 +430,8 @@ func (l *Ledger) Close() { // RegisterBlockListeners registers listeners that will be called when a // new block is added to the ledger. func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) { + l.trackerMu.RLock() + defer l.trackerMu.RUnlock() l.notifier.register(listeners) } diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index 2a9666688b..968e6d8b21 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -3422,5 +3422,40 @@ func TestLedgerRetainMinOffCatchpointInterval(t *testing.T) { } }() } +} + +type testBlockListener struct { + id int +} + +func (t *testBlockListener) OnNewBlock(bookkeeping.Block, ledgercore.StateDelta) {} + +// TestLedgerRegisterBlockListeners ensures that the block listeners survive reloadLedger +func TestLedgerRegisterBlockListeners(t *testing.T) { + partitiontest.PartitionTest(t) + + genBalances, _, _ := ledgertesting.NewTestGenesis() + var genHash crypto.Digest + crypto.RandBytes(genHash[:]) + cfg := config.GetDefaultLocal() + l := newSimpleLedgerFull(t, genBalances, protocol.ConsensusCurrentVersion, genHash, cfg) + defer l.Close() + l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{1}, &testBlockListener{2}}) + l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{3}}) + + require.Equal(t, 3, len(l.notifier.listeners)) + var ids []int + for _, bl := range l.notifier.listeners { + ids = append(ids, bl.(*testBlockListener).id) + } + require.Equal(t, []int{1, 2, 3}, ids) + + l.reloadLedger() + + ids = nil + for _, bl := range l.notifier.listeners { + ids = append(ids, bl.(*testBlockListener).id) + } + require.Equal(t, []int{1, 2, 3}, ids) } diff --git a/node/assemble_test.go b/node/assemble_test.go index 51ff7d8edc..d2bf4dd7f7 100644 --- a/node/assemble_test.go +++ b/node/assemble_test.go @@ -83,7 +83,7 @@ func BenchmarkAssembleBlock(b *testing.B) { const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(b, err) l := ledger @@ -212,7 +212,7 @@ func TestAssembleBlockTransactionPoolBehind(t *testing.T) { const inMem = true cfg := config.GetDefaultLocal() cfg.Archival = true - ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) require.NoError(t, err) l := ledger diff --git a/node/follower_node.go b/node/follower_node.go index 8483f14679..e475b25481 100644 --- a/node/follower_node.go +++ b/node/follower_node.go @@ -116,17 +116,13 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo DBFilePrefix: config.LedgerFilenamePrefix, ResolvedGenesisDirs: node.genesisDirs, } - node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg) + node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg) if err != nil { log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err) return nil, err } - blockListeners := []ledgercore.BlockListener{ - node, - } - - node.ledger.RegisterBlockListeners(blockListeners) + node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node}) if cfg.IsGossipServer() { rpcs.MakeHealthService(node.net) diff --git a/node/node.go b/node/node.go index 6c77b4fbc0..58160ff3e0 100644 --- a/node/node.go +++ b/node/node.go @@ -223,7 +223,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd DBFilePrefix: config.LedgerFilenamePrefix, ResolvedGenesisDirs: node.genesisDirs, } - node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg) + node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg) if err != nil { log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err) return nil, err @@ -246,12 +246,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.transactionPool = pools.MakeTransactionPool(node.ledger.Ledger, cfg, node.log, node) - blockListeners := []ledgercore.BlockListener{ - node.transactionPool, - node, - } - - node.ledger.RegisterBlockListeners(blockListeners) + node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node.transactionPool, node}) txHandlerOpts := data.TxHandlerOpts{ TxPool: node.transactionPool, ExecutionPool: node.lowPriorityCryptoVerificationPool, @@ -1211,6 +1206,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo return } defer node.mu.Unlock() + // start node.transactionPool.Reset() node.catchupService.Start() diff --git a/node/node_test.go b/node/node_test.go index dabb7958a5..54f2e1e6cc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -168,7 +168,7 @@ func setupFullNodes(t *testing.T, proto protocol.ConsensusVersion, verificationP cfg, err := config.LoadConfigFromDisk(rootDirectory) require.NoError(t, err) cfg.Archival = true - _, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), nil, cfg) + _, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), cfg) require.NoError(t, err) } diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index 3aab7c4abb..7b1f756f08 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -520,7 +520,7 @@ func makeLedger(t *testing.T, namePostfix string) *data.Ledger { prefix := t.Name() + namePostfix ledger, err := data.LoadLedger( log, prefix, inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, - nil, cfg, + cfg, ) require.NoError(t, err) return ledger