Skip to content

Commit

Permalink
batcher: add batchSubmitter.checkExpectedProgress (ethereum-optimism#…
Browse files Browse the repository at this point in the history
…12430)

* implement batchSubmitter.checkExpectedProgress

* remove buffer variable

* add warning logs when calling waitNodeSyncAndClearState

* push method down into channel manager and add test

* clarify SyncStatus documentation

* improve TestChannelManager_CheckExpectedProgress

make parameters "tighter" / more realistic and check an extra case
  • Loading branch information
geoknee authored Nov 19, 2024
1 parent 8e0b89c commit 910c9ad
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 9 deletions.
16 changes: 16 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,19 @@ func (s *channelManager) PendingDABytes() int64 {
}
return int64(f)
}

// CheckExpectedProgress uses the supplied syncStatus to infer
// whether the node providing the status has made the expected
// safe head progress given fully submitted channels held in
// state.
func (m *channelManager) CheckExpectedProgress(syncStatus eth.SyncStatus) error {
for _, ch := range m.channelQueue {
if ch.isFullySubmitted() && // This implies a number of l1 confirmations has passed, depending on how the txmgr was configured
!ch.isTimedOut() &&
syncStatus.CurrentL1.Number > ch.maxInclusionBlock &&
syncStatus.SafeL2.Number < ch.LatestL2().Number {
return errors.New("safe head did not make expected progress")
}
}
return nil
}
54 changes: 54 additions & 0 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,3 +627,57 @@ func TestChannelManager_ChannelOutFactory(t *testing.T) {

require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co)
}

func TestChannelManager_CheckExpectedProgress(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(100, derive.SingularBatchType)
cfg.InitNoneCompressor()
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)

channelMaxInclusionBlockNumber := uint64(3)
channelLatestSafeBlockNumber := uint64(11)

// Prepare a (dummy) fully submitted channel
// with
// maxInclusionBlock and latest safe block number as above
A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0)
require.NoError(t, err)
rng := rand.New(rand.NewSource(123))
a0 := derivetest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID)
a0 = a0.WithSeal(&types.Header{Number: big.NewInt(int64(channelLatestSafeBlockNumber))})
_, err = A.AddBlock(a0)
require.NoError(t, err)
A.maxInclusionBlock = channelMaxInclusionBlockNumber
A.Close()
A.channelBuilder.frames = nil
A.channelBuilder.frameCursor = 0
require.True(t, A.isFullySubmitted())

m.channelQueue = append(m.channelQueue, A)

// The current L1 number implies that
// channel A above should have been derived
// from, so we expect safe head to progress to
// the channelLatestSafeBlockNumber.
// Since the safe head moved to 11, there is no error:
ss := eth.SyncStatus{
CurrentL1: eth.L1BlockRef{Number: channelMaxInclusionBlockNumber + 1},
SafeL2: eth.L2BlockRef{Number: channelLatestSafeBlockNumber},
}
err = m.CheckExpectedProgress(ss)
require.NoError(t, err)

// If the currentL1 is as above but the
// safe head is less than channelLatestSafeBlockNumber,
// the method should return an error:
ss.SafeL2 = eth.L2BlockRef{Number: channelLatestSafeBlockNumber - 1}
err = m.CheckExpectedProgress(ss)
require.Error(t, err)

// If the safe head is still less than channelLatestSafeBlockNumber
// but the currentL1 is _equal_ to the channelMaxInclusionBlockNumber
// there should be no error as that block is still being derived from:
ss.CurrentL1 = eth.L1BlockRef{Number: channelMaxInclusionBlockNumber}
err = m.CheckExpectedProgress(ss)
require.NoError(t, err)
}
30 changes: 22 additions & 8 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,17 +471,20 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR

l.state.pruneSafeBlocks(syncStatus.SafeL2)
l.state.pruneChannels(syncStatus.SafeL2)

err = l.state.CheckExpectedProgress(*syncStatus)
if err != nil {
l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
continue
}

if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) {
// Wait for any in flight transactions
// to be ingested by the node before
// we start loading blocks again.
err := l.waitNodeSync()
if err != nil {
l.Log.Warn("error waiting for node sync", "err", err)
}
l.clearState(l.shutdownCtx)
l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
continue
}

l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
case <-ctx.Done():
l.Log.Warn("main loop returning")
Expand Down Expand Up @@ -579,6 +582,17 @@ func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
}
}

func (l *BatchSubmitter) waitNodeSyncAndClearState() {
// Wait for any in flight transactions
// to be ingested by the node before
// we start loading blocks again.
err := l.waitNodeSync()
if err != nil {
l.Log.Warn("error waiting for node sync", "err", err)
}
l.clearState(l.shutdownCtx)
}

// waitNodeSync Check to see if there was a batcher tx sent recently that
// still needs more block confirmations before being considered finalized
func (l *BatchSubmitter) waitNodeSync() error {
Expand Down
2 changes: 1 addition & 1 deletion op-service/eth/sync_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package eth
type SyncStatus struct {
// CurrentL1 is the L1 block that the derivation process is last idled at.
// This may not be fully derived into L2 data yet.
// The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block.
// The safe L2 blocks were produced/included fully from the L1 chain up to _but excluding_ this L1 block.
// If the node is synced, this matches the HeadL1, minus the verifier confirmation distance.
CurrentL1 L1BlockRef `json:"current_l1"`
// CurrentL1Finalized is a legacy sync-status attribute. This is deprecated.
Expand Down

0 comments on commit 910c9ad

Please sign in to comment.