Skip to content

Commit

Permalink
make ledger to preserve block listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Jun 26, 2024
1 parent 1d71aac commit 8d4d0db
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 9 deletions.
13 changes: 8 additions & 5 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (l *Ledger) reloadLedger() error {
l.trackerMu.Lock()
defer l.trackerMu.Unlock()

// save block listeners to recover them later
blockListeners := make([]ledgercore.BlockListener, 0, len(l.notifier.listeners))
blockListeners = append(blockListeners, l.notifier.listeners...)

// close the trackers.
l.trackers.close()

Expand Down Expand Up @@ -256,6 +260,10 @@ func (l *Ledger) reloadLedger() error {
return err
}

// restore block listeners since l.notifier might not survive a reload
l.notifier.clearListeners()
l.notifier.register(blockListeners)

// post-init actions
if trackerDBInitParams.VacuumOnStartup || l.cfg.OptimizeAccountsDatabaseOnStartup {
err = l.accts.vacuumDatabase(context.Background())
Expand Down Expand Up @@ -426,11 +434,6 @@ func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) {
l.notifier.register(listeners)
}

// ClearBlockListeners removes all listeners block listeners.
func (l *Ledger) ClearBlockListeners() {
l.notifier.clearListeners()
}

// RegisterVotersCommitListener registers a listener that will be called when a
// commit is about to cover a round.
func (l *Ledger) RegisterVotersCommitListener(listener ledgercore.VotersCommitListener) {
Expand Down
37 changes: 37 additions & 0 deletions ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3422,5 +3422,42 @@ func TestLedgerRetainMinOffCatchpointInterval(t *testing.T) {
}
}()
}
}

type testBlockListener struct {
id int
}

func (t *testBlockListener) OnNewBlock(bookkeeping.Block, ledgercore.StateDelta) {}

// TestLedgerRegisterBlockListeners ensures that the block listeners survive reloadLedger
func TestLedgerRegisterBlockListeners(t *testing.T) {
partitiontest.PartitionTest(t)

genBalances, _, _ := ledgertesting.NewTestGenesis()
var genHash crypto.Digest
crypto.RandBytes(genHash[:])
cfg := config.GetDefaultLocal()
l := newSimpleLedgerFull(t, genBalances, protocol.ConsensusCurrentVersion, genHash, cfg)
defer l.Close()

l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{1}, &testBlockListener{2}})
l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{3}})

require.Equal(t, 3, len(l.notifier.listeners))
var ids []int
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
sort.Ints(ids)
require.Equal(t, []int{1, 2, 3}, ids)

l.reloadLedger()

ids = nil
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
sort.Ints(ids)
require.Equal(t, []int{1, 2, 3}, ids)
}
2 changes: 0 additions & 2 deletions node/follower_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ func (node *AlgorandFollowerNode) Stop() {
node.catchupService.Stop()
node.blockService.Stop()
}
node.ledger.ClearBlockListeners()
node.catchupBlockAuth.Quit()
node.lowPriorityCryptoVerificationPool.Shutdown()
node.cryptoPool.Shutdown()
Expand Down Expand Up @@ -404,7 +403,6 @@ func (node *AlgorandFollowerNode) SetCatchpointCatchupMode(catchpointCatchupMode
node.net.ClearHandlers()
node.catchupService.Stop()
node.blockService.Stop()
node.ledger.ClearBlockListeners()

prevNodeCancelFunc := node.cancelCtx

Expand Down
2 changes: 0 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ func (node *AlgorandFullNode) Stop() {
node.blockService.Stop()
node.ledgerService.Stop()
}
node.ledger.ClearBlockListeners()
node.catchupBlockAuth.Quit()
node.log.Debug("crypto worker pools are stopping")
node.highPriorityCryptoVerificationPool.Shutdown()
Expand Down Expand Up @@ -1196,7 +1195,6 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
node.txPoolSyncerService.Stop()
node.blockService.Stop()
node.ledgerService.Stop()
node.ledger.ClearBlockListeners()

prevNodeCancelFunc := node.cancelCtx

Expand Down

0 comments on commit 8d4d0db

Please sign in to comment.