From 910c9ade39c0bcdff5f2badd94efbe016a428e73 Mon Sep 17 00:00:00 2001 From: George Knee Date: Tue, 19 Nov 2024 14:08:20 +0000 Subject: [PATCH] batcher: add batchSubmitter.checkExpectedProgress (#12430) * implement batchSubmitter.checkExpectedProgress * remove buffer variable * add warning logs when calling waitNodeSyncAndClearState * push method down into channel manager and add test * clarify SyncStatus documentation * improve TestChannelManager_CheckExpectedProgress make parameters "tighter" / more realistic and check an extra case --- op-batcher/batcher/channel_manager.go | 16 +++++++ op-batcher/batcher/channel_manager_test.go | 54 ++++++++++++++++++++++ op-batcher/batcher/driver.go | 30 ++++++++---- op-service/eth/sync_status.go | 2 +- 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 4208a8cd3795..81ee0fb35a51 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -549,3 +549,19 @@ func (s *channelManager) PendingDABytes() int64 { } return int64(f) } + +// CheckExpectedProgress uses the supplied syncStatus to infer +// whether the node providing the status has made the expected +// safe head progress given fully submitted channels held in +// state. +func (m *channelManager) CheckExpectedProgress(syncStatus eth.SyncStatus) error { + for _, ch := range m.channelQueue { + if ch.isFullySubmitted() && // This implies a number of l1 confirmations has passed, depending on how the txmgr was configured + !ch.isTimedOut() && + syncStatus.CurrentL1.Number > ch.maxInclusionBlock && + syncStatus.SafeL2.Number < ch.LatestL2().Number { + return errors.New("safe head did not make expected progress") + } + } + return nil +} diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 1c742207a5f0..32aae1b06dd1 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -627,3 +627,57 @@ func TestChannelManager_ChannelOutFactory(t *testing.T) { require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co) } + +func TestChannelManager_CheckExpectedProgress(t *testing.T) { + l := testlog.Logger(t, log.LevelCrit) + cfg := channelManagerTestConfig(100, derive.SingularBatchType) + cfg.InitNoneCompressor() + m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) + + channelMaxInclusionBlockNumber := uint64(3) + channelLatestSafeBlockNumber := uint64(11) + + // Prepare a (dummy) fully submitted channel + // with + // maxInclusionBlock and latest safe block number as above + A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) + require.NoError(t, err) + rng := rand.New(rand.NewSource(123)) + a0 := derivetest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID) + a0 = a0.WithSeal(&types.Header{Number: big.NewInt(int64(channelLatestSafeBlockNumber))}) + _, err = A.AddBlock(a0) + require.NoError(t, err) + A.maxInclusionBlock = channelMaxInclusionBlockNumber + A.Close() + A.channelBuilder.frames = nil + A.channelBuilder.frameCursor = 0 + require.True(t, A.isFullySubmitted()) + + m.channelQueue = append(m.channelQueue, A) + + // The current L1 number implies that + // channel A above should have been derived + // from, so we expect safe head to progress to + // the channelLatestSafeBlockNumber. + // Since the safe head moved to 11, there is no error: + ss := eth.SyncStatus{ + CurrentL1: eth.L1BlockRef{Number: channelMaxInclusionBlockNumber + 1}, + SafeL2: eth.L2BlockRef{Number: channelLatestSafeBlockNumber}, + } + err = m.CheckExpectedProgress(ss) + require.NoError(t, err) + + // If the currentL1 is as above but the + // safe head is less than channelLatestSafeBlockNumber, + // the method should return an error: + ss.SafeL2 = eth.L2BlockRef{Number: channelLatestSafeBlockNumber - 1} + err = m.CheckExpectedProgress(ss) + require.Error(t, err) + + // If the safe head is still less than channelLatestSafeBlockNumber + // but the currentL1 is _equal_ to the channelMaxInclusionBlockNumber + // there should be no error as that block is still being derived from: + ss.CurrentL1 = eth.L1BlockRef{Number: channelMaxInclusionBlockNumber} + err = m.CheckExpectedProgress(ss) + require.NoError(t, err) +} diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 7b1d6139e265..dde08ac84193 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -471,17 +471,20 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR l.state.pruneSafeBlocks(syncStatus.SafeL2) l.state.pruneChannels(syncStatus.SafeL2) + + err = l.state.CheckExpectedProgress(*syncStatus) + if err != nil { + l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err) + l.waitNodeSyncAndClearState() + continue + } + if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) { - // Wait for any in flight transactions - // to be ingested by the node before - // we start loading blocks again. - err := l.waitNodeSync() - if err != nil { - l.Log.Warn("error waiting for node sync", "err", err) - } - l.clearState(l.shutdownCtx) + l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err) + l.waitNodeSyncAndClearState() continue } + l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval) case <-ctx.Done(): l.Log.Warn("main loop returning") @@ -579,6 +582,17 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) { } } +func (l *BatchSubmitter) waitNodeSyncAndClearState() { + // Wait for any in flight transactions + // to be ingested by the node before + // we start loading blocks again. + err := l.waitNodeSync() + if err != nil { + l.Log.Warn("error waiting for node sync", "err", err) + } + l.clearState(l.shutdownCtx) +} + // waitNodeSync Check to see if there was a batcher tx sent recently that // still needs more block confirmations before being considered finalized func (l *BatchSubmitter) waitNodeSync() error { diff --git a/op-service/eth/sync_status.go b/op-service/eth/sync_status.go index f9db1f672b82..e16275920e2b 100644 --- a/op-service/eth/sync_status.go +++ b/op-service/eth/sync_status.go @@ -5,7 +5,7 @@ package eth type SyncStatus struct { // CurrentL1 is the L1 block that the derivation process is last idled at. // This may not be fully derived into L2 data yet. - // The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block. + // The safe L2 blocks were produced/included fully from the L1 chain up to _but excluding_ this L1 block. // If the node is synced, this matches the HeadL1, minus the verifier confirmation distance. CurrentL1 L1BlockRef `json:"current_l1"` // CurrentL1Finalized is a legacy sync-status attribute. This is deprecated.