Skip to content

Commit

Permalink
hoist pending tx management up
Browse files Browse the repository at this point in the history
  • Loading branch information
geoknee committed Sep 23, 2024
1 parent 81d693e commit dbd86ca
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 3 deletions.
1 change: 0 additions & 1 deletion op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func (s *channel) NextTxData() txData {

id := txdata.ID().String()
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
s.pendingTransactions[id] = txdata

return txdata
}
Expand Down
11 changes: 9 additions & 2 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,12 @@ func (s *channelManager) removePendingChannel(channel *channel) {
}

// nextTxData dequeues frames from the channel and returns them encoded in a transaction.
// It also handles updating the internal state of the receiver.
func (s *channelManager) nextTxData(channel *channel) (txData, error) {
if channel == nil || !channel.HasTxData() {
s.log.Trace("no next tx data")
return txData{}, io.EOF // TODO: not enough data error instead
}
tx := channel.NextTxData()
s.txChannels[tx.ID().String()] = channel
return tx, nil
}

Expand All @@ -158,6 +156,15 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
// automatically. When switching DA type, the channelManager state will be rebuilt
// with a new ChannelConfig.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {

var data txData
var channel *channel
defer func() {
if len(data.frames) > 0 && channel != nil {
s.txChannels[data.ID().String()] = channel
}
}()

channel, data, err := s.txData(l1Head)
if err != nil {
return data, err
Expand Down
126 changes: 126 additions & 0 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package batcher

import (
"errors"
"io"
"math/big"
"math/rand"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -484,6 +486,130 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
}
}

type MockChannelManager struct {
mock.Mock
*channelManager
}

func (m *MockChannelManager) Requeue(cfg ChannelConfig) error {
args := m.Called(cfg)
return args.Error(0)
}

type FakeDynamicEthChannelConfig struct {
DynamicEthChannelConfig
chooseBlobs bool
}

func (f *FakeDynamicEthChannelConfig) ChannelConfig() ChannelConfig {
if f.chooseBlobs {
return f.blobConfig
}
return f.calldataConfig
}

func newFakeDynamicEthChannelConfig(lgr log.Logger,
reqTimeout time.Duration) *FakeDynamicEthChannelConfig {

calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
UseBlobs: true,
}
calldataCfg.InitRatioCompressor(1, derive.Brotli)
blobCfg.InitRatioCompressor(1, derive.Brotli)

return &FakeDynamicEthChannelConfig{
chooseBlobs: false,
DynamicEthChannelConfig: *NewDynamicEthChannelConfig(lgr, reqTimeout, &mockGasPricer{}, blobCfg, calldataCfg),
}
}

func TestChannelManager_TxData(t *testing.T) {

type TestCase struct {
name string
chooseBlobsWhenChannelCreated bool
chooseBlobsWhenChannelSubmitted bool
}

tt := []TestCase{
// {"blobs->blobs", true, true},
// {"calldata->calldata", false, false},
{"blobs->calldata", true, false},
// {"calldata->blobs", false, true}
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit)

cfg := newFakeDynamicEthChannelConfig(l, 1000)

cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated
n := NewChannelManager(l, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
m := MockChannelManager{channelManager: n}
require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs)
m.On("Requeue", mock.Anything).Return(nil)

// Seed channel manager with blocks
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
blockA := derivetest.RandomL2BlockWithChainId(rng, 100, defaultTestRollupConfig.L2ChainID)
m.blocks = []*types.Block{blockA}

// Call TxData a first time to trigger blocks->channels pipeline
_, err := m.TxData(eth.BlockID{})
// We allow for an EOF error here, because there is not
// enough data to send yet
if !errors.Is(err, io.EOF) {
require.NoError(t, err)
}
// The test requires us to have something in the channel queue
// at this point
require.NotEmpty(t, m.channelQueue)

// Simulate updated market conditions
// by possibly flipping the state of the
// fake channel provider
cfg.chooseBlobs = tc.chooseBlobsWhenChannelSubmitted

// Add a block and call TxData until
// we get some data to submit
for {
m.blocks = []*types.Block{blockA}
// Call TxData a sceond time to to handle requeuing any channels
// which started to fill but were not sent yet
data, err := m.TxData(eth.BlockID{})
t.Log(len(data.frames), err)
t.Log(m.channelQueue[0].channelBuilder.co.InputBytes())
if err == nil {
break
}
if !errors.Is(err, io.EOF) { // We allow for an EOF error here
t.Fatal(err)
}

}

// TODO here we want to ensure there is a channel with data inside of it
// but which has not yet been sent
require.True(t, m.channelQueue[0].IsFull())

if tc.chooseBlobsWhenChannelCreated != tc.chooseBlobsWhenChannelSubmitted {
m.AssertCalled(t, "Requeue", cfg)
} else {
m.AssertNotCalled(t, "Requeue", cfg)
}
// TODO assert defaultCfg updated correctly
})
}

}

func TestChannelManager_Requeue(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(100, derive.SingularBatchType)
Expand Down

0 comments on commit dbd86ca

Please sign in to comment.