Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: restore block listeners on reloadLedger #6041

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func buildTestLedger(t *testing.T, blk bookkeeping.Block) (ledger *data.Ledger,
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err = data.LoadLedger(
log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash,
nil, cfg,
log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, cfg,
)
if err != nil {
t.Fatal("couldn't build ledger", err)
Expand Down
4 changes: 2 additions & 2 deletions catchup/pref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
for i := 0; i < b.N; i++ {
inMem := true
prefix := b.Name() + "empty" + strconv.Itoa(i)
local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg)
require.NoError(b, err)

// Make Service
Expand Down Expand Up @@ -150,7 +150,7 @@ func benchenv(t testing.TB, numAccounts, numBlocks int) (ledger, emptyLedger *da
cfg := config.GetDefaultLocal()
cfg.Archival = true
prefix := t.Name() + "empty"
emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg)
require.NoError(t, err)

ledger, err = datatest.FabricateLedger(logging.TestingLog(t), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks))
Expand Down
2 changes: 1 addition & 1 deletion daemon/algod/api/server/v2/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func testingenvWithBalances(t testing.TB, minMoneyAtStart, maxMoneyAtStart, numA
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, nil, cfg)
ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, cfg)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion data/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func testingenv(t testing.TB, numAccounts, numTxs int, offlineAccounts bool) (*L
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, cfg)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion data/datatest/fabricateLedger.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func FabricateLedger(log logging.Logger, ledgerName string, accounts []account.P
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, nil, cfg)
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, cfg)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions data/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type roundSeed struct {
func LoadLedger[T string | ledger.DirsAndPrefix](
log logging.Logger, dir T, memory bool,
genesisProto protocol.ConsensusVersion, genesisBal bookkeeping.GenesisBalances, genesisID string, genesisHash crypto.Digest,
blockListeners []ledgercore.BlockListener, cfg config.Local,
cfg config.Local,
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
) (*Ledger, error) {
if genesisBal.Balances == nil {
genesisBal.Balances = make(map[basics.Address]basics.AccountData)
Expand Down Expand Up @@ -115,7 +115,6 @@ func LoadLedger[T string | ledger.DirsAndPrefix](
}

l.Ledger = ll
l.RegisterBlockListeners(blockListeners)
return l, nil
}

Expand Down
18 changes: 9 additions & 9 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func BenchmarkTxHandlerProcessing(b *testing.B) {
cfg.Archival = true
cfg.TxBacklogReservedCapacityPerPeer = 1
cfg.IncomingConnectionsLimit = 10
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(b, err)
defer ledger.Close()

Expand Down Expand Up @@ -1027,7 +1027,7 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) {
cfg.Archival = true
cfg.EnableTxBacklogRateLimiting = false
cfg.TxIncomingFilteringFlags = 3 // txFilterRawMsg + txFilterCanonical
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -1196,7 +1196,7 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t
cfg := config.GetDefaultLocal()
cfg.Archival = true
cfg.EnableTxBacklogRateLimiting = false
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -1641,7 +1641,7 @@ func (g *txGenerator) makeLedger(tb testing.TB, cfg config.Local, log logging.Lo
ledgerName := fmt.Sprintf("%s-in_mem-w_inv=%d", namePrefix, ivrString)
ledgerName = strings.Replace(ledgerName, "#", "-", 1)
const inMem = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(tb, err)
return ledger
}
Expand Down Expand Up @@ -2183,7 +2183,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
cfg := config.GetDefaultLocal()
cfg.Archival = true
cfg.TxPoolSize = config.MaxTxGroupSize + 1
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2419,7 +2419,7 @@ func TestTxHandlerRestartWithBacklogAndTxPool(t *testing.T) { //nolint:parallelt
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Ledger.Close()

Expand Down Expand Up @@ -2524,7 +2524,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
cfg.TxBacklogSize = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2636,7 +2636,7 @@ func TestTxHandlerAppRateLimiter(t *testing.T) {
cfg.TxBacklogAppTxRateLimiterMaxSize = 100
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2705,7 +2705,7 @@ func TestTxHandlerCapGuard(t *testing.T) {
cfg.IncomingConnectionsLimit = 1
cfg.TxBacklogSize = 3

ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down
9 changes: 9 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@
l.trackerMu.Lock()
defer l.trackerMu.Unlock()

// save block listeners to recover them later
blockListeners := make([]ledgercore.BlockListener, 0, len(l.notifier.listeners))
blockListeners = append(blockListeners, l.notifier.listeners...)

// close the trackers.
l.trackers.close()
cce marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -256,6 +260,9 @@
return err
}

// restore block listeners since l.notifier might not survive a reload
l.notifier.register(blockListeners)

// post-init actions
if trackerDBInitParams.VacuumOnStartup || l.cfg.OptimizeAccountsDatabaseOnStartup {
err = l.accts.vacuumDatabase(context.Background())
Expand Down Expand Up @@ -423,6 +430,8 @@
// RegisterBlockListeners registers listeners that will be called when a
// new block is added to the ledger.
func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()

Check warning on line 434 in ledger/ledger.go

View check run for this annotation

Codecov / codecov/patch

ledger/ledger.go#L433-L434

Added lines #L433 - L434 were not covered by tests
l.notifier.register(listeners)
}

Expand Down
35 changes: 35 additions & 0 deletions ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3422,5 +3422,40 @@ func TestLedgerRetainMinOffCatchpointInterval(t *testing.T) {
}
}()
}
}

type testBlockListener struct {
id int
}

func (t *testBlockListener) OnNewBlock(bookkeeping.Block, ledgercore.StateDelta) {}

// TestLedgerRegisterBlockListeners ensures that the block listeners survive reloadLedger
func TestLedgerRegisterBlockListeners(t *testing.T) {
partitiontest.PartitionTest(t)

genBalances, _, _ := ledgertesting.NewTestGenesis()
var genHash crypto.Digest
crypto.RandBytes(genHash[:])
cfg := config.GetDefaultLocal()
l := newSimpleLedgerFull(t, genBalances, protocol.ConsensusCurrentVersion, genHash, cfg)
defer l.Close()

l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{1}, &testBlockListener{2}})
l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{3}})

require.Equal(t, 3, len(l.notifier.listeners))
var ids []int
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
require.Equal(t, []int{1, 2, 3}, ids)

l.reloadLedger()

ids = nil
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
require.Equal(t, []int{1, 2, 3}, ids)
}
4 changes: 2 additions & 2 deletions node/assemble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func BenchmarkAssembleBlock(b *testing.B) {
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(b, err)

l := ledger
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestAssembleBlockTransactionPoolBehind(t *testing.T) {
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)

l := ledger
Expand Down
8 changes: 2 additions & 6 deletions node/follower_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,13 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo
DBFilePrefix: config.LedgerFilenamePrefix,
ResolvedGenesisDirs: node.genesisDirs,
}
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg)
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg)
if err != nil {
log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err)
return nil, err
}

blockListeners := []ledgercore.BlockListener{
node,
}

node.ledger.RegisterBlockListeners(blockListeners)
node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node})

if cfg.IsGossipServer() {
rpcs.MakeHealthService(node.net)
Expand Down
10 changes: 3 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
DBFilePrefix: config.LedgerFilenamePrefix,
ResolvedGenesisDirs: node.genesisDirs,
}
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg)
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg)
if err != nil {
log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err)
return nil, err
Expand All @@ -246,12 +246,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd

node.transactionPool = pools.MakeTransactionPool(node.ledger.Ledger, cfg, node.log, node)

blockListeners := []ledgercore.BlockListener{
node.transactionPool,
node,
}

node.ledger.RegisterBlockListeners(blockListeners)
node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node.transactionPool, node})
txHandlerOpts := data.TxHandlerOpts{
TxPool: node.transactionPool,
ExecutionPool: node.lowPriorityCryptoVerificationPool,
Expand Down Expand Up @@ -1211,6 +1206,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
return
}
defer node.mu.Unlock()

// start
node.transactionPool.Reset()
node.catchupService.Start()
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func setupFullNodes(t *testing.T, proto protocol.ConsensusVersion, verificationP
cfg, err := config.LoadConfigFromDisk(rootDirectory)
require.NoError(t, err)
cfg.Archival = true
_, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), nil, cfg)
_, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), cfg)
require.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func makeLedger(t *testing.T, namePostfix string) *data.Ledger {
prefix := t.Name() + namePostfix
ledger, err := data.LoadLedger(
log, prefix, inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash,
nil, cfg,
cfg,
)
require.NoError(t, err)
return ledger
Expand Down
Loading