feat: toggle settlement health event upon settlement batch submission status (#332)
omritoptix authored Jun 4, 2023
1 parent bbd9f01 commit 7c65250
Showing 17 changed files with 315 additions and 282 deletions.
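At a high level, this commit replaces the block manager's blocking, retry-based settlement submission with a fire-and-forget goroutine; the settlement client now signals success or failure through events, which the node surfaces as health status. A minimal sketch of that shape, assuming illustrative names (`healthEvent`, `submit`) rather than dymint's exact types:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// healthEvent is an illustrative stand-in for the settlement health event
// this commit toggles; dymint's real event payload differs.
type healthEvent struct {
	healthy bool
	err     error
}

type manager struct {
	batchInProcess atomic.Value // true while a batch submission is in flight
	events         chan healthEvent
}

// submitNextBatch mirrors the new fire-and-forget call
// `go m.settlementClient.SubmitBatch(...)`: it returns immediately and the
// outcome arrives later as an event.
func (m *manager) submitNextBatch(submit func() error) {
	m.batchInProcess.Store(true)
	go func() {
		if err := submit(); err != nil {
			m.events <- healthEvent{healthy: false, err: err}
			return
		}
		m.events <- healthEvent{healthy: true}
	}()
}

func main() {
	m := &manager{events: make(chan healthEvent, 2)}

	m.submitNextBatch(func() error { return nil })
	m.submitNextBatch(func() error { return errors.New("settlement layer unreachable") })

	// Drain both outcome events, as the node's event loop would; the diff's
	// SyncTargetLoop does the equivalent m.batchInProcess.Store(false).
	for i := 0; i < 2; i++ {
		ev := <-m.events
		m.batchInProcess.Store(false)
		fmt.Println("settlement healthy:", ev.healthy, "error:", ev.err)
	}
}
```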
43 changes: 3 additions & 40 deletions block/manager.go
@@ -76,10 +76,7 @@ type Manager struct {
 
 	syncTargetDiode diodes.Diode
 
-	batchInProcess   atomic.Value
-	batchRetryCtx    context.Context
-	batchRetryCancel context.CancelFunc
-	batchRetryMu     sync.RWMutex
+	batchInProcess atomic.Value
 
 	shouldProduceBlocksCh chan bool
 
@@ -369,7 +366,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
 			// TODO(omritoptix): Once we have leader election, we can add a condition.
 			// Update batch accepted is only relevant for the aggregator
 			// TODO(omritoptix): Check if we are the aggregator
-			m.updateBatchAccepted()
+			m.batchInProcess.Store(false)
 		case <-subscription.Cancelled():
 			m.logger.Info("Subscription canceled")
 		}
@@ -383,15 +380,6 @@ func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
 	m.syncTargetDiode.Set(diodes.GenericDataType(&endHeight))
 }
 
-func (m *Manager) updateBatchAccepted() {
-	m.batchRetryMu.Lock()
-	if m.batchRetryCtx != nil && m.batchRetryCtx.Err() == nil {
-		m.batchRetryCancel()
-	}
-	m.batchRetryMu.Unlock()
-	m.batchInProcess.Store(false)
-}
-
 // RetriveLoop listens for new sync messages written to a ring buffer and in turn
 // runs syncUntilTarget on the latest message in the ring buffer.
 func (m *Manager) RetriveLoop(ctx context.Context) {
@@ -720,7 +708,7 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
 	// Submit batch to SL
 	// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
 	// In that case we'll want to update the syncTarget before returning.
-	m.submitBatchToSL(nextBatch, resultSubmitToDA)
+	go m.settlementClient.SubmitBatch(nextBatch, m.dalc.GetClientType(), resultSubmitToDA)
 }
 
 func (m *Manager) updateStateIndex(stateIndex uint64) error {
@@ -778,31 +766,6 @@ func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*type
 	return batch, nil
 }
 
-func (m *Manager) submitBatchToSL(batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) {
-	var resultSubmitToSL *settlement.ResultSubmitBatch
-	m.batchRetryMu.Lock()
-	m.batchRetryCtx, m.batchRetryCancel = context.WithCancel(context.Background())
-	m.batchRetryMu.Unlock()
-	defer m.batchRetryCancel()
-	// Submit batch to SL
-	err := retry.Do(func() error {
-		resultSubmitToSL = m.settlementClient.SubmitBatch(batch, m.dalc.GetClientType(), resultSubmitToDA)
-		if resultSubmitToSL.Code != settlement.StatusSuccess {
-			m.logger.Error("failed to submit batch to SL layer", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "error", resultSubmitToSL.Message)
-			err := fmt.Errorf("failed to submit batch to SL layer: %s", resultSubmitToSL.Message)
-			return err
-		}
-		return nil
-	}, retry.Context(m.batchRetryCtx), retry.LastErrorOnly(true), retry.Delay(SLBatchRetryDelay), retry.MaxDelay(maxDelay))
-	// Panic if we failed not due to context cancellation
-	m.batchRetryMu.Lock()
-	if err != nil && m.batchRetryCtx.Err() == nil {
-		m.logger.Error("Failed to submit batch to SL Layer", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "error", err)
-		panic(err)
-	}
-	m.batchRetryMu.Unlock()
-}
-
 func (m *Manager) submitBatchToDA(ctx context.Context, batch *types.Batch) (*da.ResultSubmitBatch, error) {
 	var res da.ResultSubmitBatch
 	err := retry.Do(func() error {
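For contrast with the new one-line `go m.settlementClient.SubmitBatch(...)`, the deleted `submitBatchToSL` tied a retry loop to a cancellable context that the batch-accepted event had to cancel. A self-contained sketch of that removed idiom, using github.com/avast/retry-go as the diff does (the `trySubmit` stub and the delays are placeholders):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go"
)

func main() {
	// Removed pattern: keep retrying the settlement submission until a
	// batch-accepted event cancels the context.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Simulate the accepted event arriving and halting the retry loop.
	go func() {
		time.Sleep(300 * time.Millisecond)
		cancel()
	}()

	trySubmit := func() error { return errors.New("settlement layer unavailable") }

	err := retry.Do(
		trySubmit,
		retry.Context(ctx),        // stop once the context is canceled
		retry.LastErrorOnly(true), // surface only the final error
		retry.Delay(100*time.Millisecond),
	)
	if err != nil && ctx.Err() == nil {
		// The old code panicked when submission failed for any reason
		// other than cancellation.
		panic(err)
	}
	fmt.Println("retry loop ended:", err)
}
```

The commit removes this machinery entirely: with async submission there is nothing to cancel, and the accepted event only clears `batchInProcess`.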
75 changes: 1 addition & 74 deletions block/manager_test.go
@@ -138,8 +138,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 	assert.NoError(t, err)
 	daResultSubmitBatch := manager.dalc.SubmitBatch(batch)
 	assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
-	resultSubmitBatch := manager.settlementClient.SubmitBatch(batch, manager.dalc.GetClientType(), &daResultSubmitBatch)
-	assert.Equal(t, resultSubmitBatch.Code, settlement.StatusSuccess)
+	manager.settlementClient.SubmitBatch(batch, manager.dalc.GetClientType(), &daResultSubmitBatch)
 	nextBatchStartHeight = batch.EndHeight + 1
 	// Wait until daHeight is updated
 	time.Sleep(time.Millisecond * 500)
@@ -174,31 +173,6 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 	}
 }
 
-func TestPublishWhenSettlementLayerDisconnected(t *testing.T) {
-	SLBatchRetryDelay = 1 * time.Second
-
-	isSettlementError := atomic.Value{}
-	isSettlementError.Store(true)
-	manager, err := getManager(getManagerConfig(), &testutil.SettlementLayerClientSubmitBatchError{IsError: isSettlementError}, nil, 1, 1, 0, nil, nil)
-	retry.DefaultAttempts = 2
-	require.NoError(t, err)
-	require.NotNil(t, manager)
-
-	nextBatchStartHeight := atomic.LoadUint64(&manager.syncTarget) + 1
-	batch, err := testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(defaultBatchSize-1), manager.proposerKey)
-	assert.NoError(t, err)
-	daResultSubmitBatch := manager.dalc.SubmitBatch(batch)
-	assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
-	resultSubmitBatch := manager.settlementClient.SubmitBatch(batch, manager.dalc.GetClientType(), &daResultSubmitBatch)
-	assert.Equal(t, resultSubmitBatch.Code, settlement.StatusError)
-
-	defer func() {
-		err := recover().(error)
-		assert.ErrorContains(t, err, connectionRefusedErrorMessage)
-	}()
-	manager.submitBatchToSL(&types.Batch{StartHeight: 1, EndHeight: 1}, nil)
-}
-
 func TestPublishWhenDALayerDisconnected(t *testing.T) {
 	DABatchRetryDelay = 1 * time.Second
 	manager, err := getManager(getManagerConfig(), nil, &testutil.DALayerClientSubmitBatchError{}, 1, 1, 0, nil, nil)
@@ -446,53 +420,6 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
 	}
 }
 
-// Test batch retry halts upon new batch acceptance
-// 1. Produce blocks with settlement layer batch submission error
-// 2. Emit an event that a new batch was accepted
-// 3. Validate new batches was submitted
-func TestBatchRetryWhileBatchAccepted(t *testing.T) {
-	assert := assert.New(t)
-	require := require.New(t)
-	app := testutil.GetAppMock()
-	// Create proxy app
-	clientCreator := proxy.NewLocalClientCreator(app)
-	proxyApp := proxy.NewAppConns(clientCreator)
-	err := proxyApp.Start()
-	require.NoError(err)
-	// Init manager
-	managerConfig := getManagerConfig()
-	managerConfig.BlockBatchSize = 1
-	isSettlementError := atomic.Value{}
-	isSettlementError.Store(true)
-	settlementLayerClient := &testutil.SettlementLayerClientSubmitBatchError{IsError: isSettlementError}
-	manager, err := getManager(managerConfig, settlementLayerClient, nil, 1, 1, 0, proxyApp, nil)
-	require.NoError(err)
-	// Produce blocks with settlement layer batch submission error
-	blockLoopContext, blockLoopCancel := context.WithCancel(context.Background())
-	ctx, cancel := context.WithCancel(context.Background())
-	defer blockLoopCancel()
-	defer cancel()
-	go manager.ProduceBlockLoop(blockLoopContext)
-	go manager.SyncTargetLoop(ctx)
-	time.Sleep(200 * time.Millisecond)
-	assert.Equal(uint64(0), atomic.LoadUint64(&settlementLayerClient.BatchCounter))
-	// Cancel block production to not interfere with the isBatchInProcess flag
-	blockLoopCancel()
-	time.Sleep(100 * time.Millisecond)
-	// Emit an event that a new batch was accepted and wait for it to be processed
-	eventData := &settlement.EventDataNewSettlementBatchAccepted{EndHeight: 1, StateIndex: 1}
-	manager.pubsub.PublishWithEvents(ctx, eventData, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
-	time.Sleep(200 * time.Millisecond)
-	// Change settlement layer to accept batches and validate new batches was submitted
-	settlementLayerClient.IsError.Store(false)
-	blockLoopContext, blockLoopCancel = context.WithCancel(context.Background())
-	defer blockLoopCancel()
-	go manager.ProduceBlockLoop(blockLoopContext)
-	time.Sleep(1 * time.Second)
-	assert.Greater(atomic.LoadUint64(&settlementLayerClient.BatchCounter), uint64(0))
-
-}
-
 func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
 	assert := assert.New(t)
 	require := require.New(t)
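The two deleted tests exercised the retry loop directly. Under the new design the behavior worth testing shrinks to: a batch-accepted event clears the in-flight flag. A dependency-free sketch of such a test (the real suite goes through dymint's pubsub and settlement event types):

```go
package main

import (
	"sync/atomic"
	"testing"
)

// toyManager stands in for block.Manager: one in-flight flag plus a channel
// that plays the role of the batch-accepted subscription.
type toyManager struct {
	batchInProcess atomic.Value
	accepted       chan struct{}
}

func (m *toyManager) syncTargetLoop(done chan<- struct{}) {
	for range m.accepted {
		m.batchInProcess.Store(false) // mirrors the diff's event handler
	}
	close(done)
}

func TestAcceptedEventClearsFlag(t *testing.T) {
	m := &toyManager{accepted: make(chan struct{}, 1)}
	m.batchInProcess.Store(true)

	done := make(chan struct{})
	go m.syncTargetLoop(done)

	m.accepted <- struct{}{} // simulate EventNewSettlementBatchAccepted
	close(m.accepted)
	<-done

	if m.batchInProcess.Load().(bool) {
		t.Fatal("expected batchInProcess to be cleared after acceptance")
	}
}
```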
22 changes: 2 additions & 20 deletions mocks/settlement/hubclient.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions node/integration_test.go
@@ -42,6 +42,7 @@ func TestAggregatorMode(t *testing.T) {
 	anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
 
 	blockManagerConfig := config.BlockManagerConfig{
+		BlockBatchSize:    1,
 		BlockTime:         1 * time.Second,
 		NamespaceID:       "0102030405060708",
 		BatchSyncInterval: time.Second * 5,
4 changes: 2 additions & 2 deletions node/node_test.go
@@ -64,7 +64,7 @@ func TestMempoolDirectly(t *testing.T) {
 		P2P:        config.P2PConfig{},
 		RPC:        config.RPCConfig{},
 		Aggregator: false,
-		BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond},
+		BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond, BlockBatchSize: 2},
 		DALayer:         "mock",
 		DAConfig:        "",
 		SettlementLayer: "mock",
@@ -179,7 +179,7 @@ func TestHealthStatusEventHandler(t *testing.T) {
 				assert.Equal(c.expectedError, healthStatusEvent.Error)
 				done <- true
 				break
-			case <-time.After(100 * time.Millisecond):
+			case <-time.After(500 * time.Millisecond):
 				if c.expectHealthStatusEventEmitted {
 					t.Error("expected health status event but didn't get one")
 				}
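The jump from 100 ms to 500 ms gives the now-asynchronous submission path time to publish its health event before the test gives up. The wait-with-timeout idiom the test relies on looks roughly like this (the event struct here is illustrative, not dymint's exact type):

```go
package main

import (
	"testing"
	"time"
)

// healthStatusEvent is an illustrative payload; the real test consumes
// dymint's health status event from a pubsub subscription.
type healthStatusEvent struct {
	Healthy bool
	Error   error
}

func waitForHealthEvent(t *testing.T, events <-chan healthStatusEvent) {
	t.Helper()
	select {
	case ev := <-events:
		if !ev.Healthy {
			t.Errorf("node reported unhealthy: %v", ev.Error)
		}
	case <-time.After(500 * time.Millisecond):
		// Async submission means the event can lag, hence the wider window.
		t.Error("expected health status event but didn't get one")
	}
}

func TestWaitForHealthEvent(t *testing.T) {
	events := make(chan healthStatusEvent, 1)
	events <- healthStatusEvent{Healthy: true}
	waitForHealthEvent(t, events)
}
```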
2 changes: 1 addition & 1 deletion node/testutils.go
@@ -25,7 +25,7 @@ func CreateNode(isAggregator bool, blockManagerConfig *config.BlockManagerConfig
 	nodeConfig := config.DefaultNodeConfig
 
 	if blockManagerConfig == nil {
-		blockManagerConfig = &config.BlockManagerConfig{BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond}
+		blockManagerConfig = &config.BlockManagerConfig{BlockBatchSize: 1, BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond}
 	}
 	nodeConfig.BlockManagerConfig = *blockManagerConfig
 	nodeConfig.Aggregator = isAggregator
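A pattern running through the remaining files: every test config gains an explicit BlockBatchSize, which suggests the field is now required (a zero batch size would never fill a batch). A standalone mirror of the construction the tests use; the struct here is a local stand-in for dymint's config.BlockManagerConfig:

```go
package main

import (
	"fmt"
	"time"
)

// BlockManagerConfig is a local stand-in; the real struct lives in
// github.com/dymensionxyz/dymint/config and has more fields.
type BlockManagerConfig struct {
	BlockBatchSize    uint64
	BlockTime         time.Duration
	BatchSyncInterval time.Duration
}

func main() {
	cfg := BlockManagerConfig{
		BlockBatchSize:    1, // now set explicitly in every touched test
		BlockTime:         100 * time.Millisecond,
		BatchSyncInterval: time.Second * 5,
	}
	fmt.Printf("%+v\n", cfg)
}
```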
9 changes: 6 additions & 3 deletions rpc/client/client_test.go
@@ -99,7 +99,7 @@ func TestGenesisChunked(t *testing.T) {
 		P2P:        config.P2PConfig{},
 		RPC:        config.RPCConfig{},
 		Aggregator: false,
-		BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond},
+		BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond, BlockBatchSize: 1},
 		DALayer:         "mock",
 		DAConfig:        "",
 		SettlementLayer: "mock",
@@ -429,6 +429,7 @@ func TestTx(t *testing.T) {
 		SettlementLayer: "mock",
 		Aggregator:      true,
 		BlockManagerConfig: config.BlockManagerConfig{
+			BlockBatchSize:    1,
 			BlockTime:         200 * time.Millisecond,
 			BatchSyncInterval: time.Second,
 		},
@@ -693,7 +694,7 @@ func TestValidatorSetHandling(t *testing.T) {
 		DALayer:         "mock",
 		SettlementLayer: "mock",
 		Aggregator:      true,
-		BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond, BatchSyncInterval: time.Second},
+		BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond, BatchSyncInterval: time.Second, BlockBatchSize: 1},
 		SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)},
 	}
 
@@ -804,7 +805,7 @@ func getRPC(t *testing.T) (*mocks.Application, *Client) {
 		P2P:        config.P2PConfig{},
 		RPC:        config.RPCConfig{},
 		Aggregator: false,
-		BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond},
+		BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond, BlockBatchSize: 1},
 		DALayer:         "mock",
 		DAConfig:        "",
 		SettlementLayer: "mock",
@@ -886,6 +887,7 @@ func TestMempool2Nodes(t *testing.T) {
 		SettlementLayer:  "mock",
 		SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKey1Bytes)},
 		BlockManagerConfig: config.BlockManagerConfig{
+			BlockBatchSize:    1,
 			BlockTime:         100 * time.Millisecond,
 			BatchSyncInterval: time.Second,
 		},
@@ -901,6 +903,7 @@
 		SettlementLayer:  "mock",
 		SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKey2Bytes)},
 		BlockManagerConfig: config.BlockManagerConfig{
+			BlockBatchSize:    1,
 			BlockTime:         100 * time.Millisecond,
 			BatchSyncInterval: time.Second,
 		},
2 changes: 1 addition & 1 deletion rpc/json/service_test.go
@@ -294,7 +294,7 @@ func getRPC(t *testing.T) (*mocks.Application, *client.Client) {
 	signingKey, proposerPubKey, _ := crypto.GenerateEd25519Key(rand.Reader)
 	proposerPubKeyBytes, err := proposerPubKey.Raw()
 	require.NoError(err)
-	config := config.NodeConfig{Aggregator: true, DALayer: "mock", SettlementLayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second, BatchSyncInterval: time.Second}, SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)}}
+	config := config.NodeConfig{Aggregator: true, DALayer: "mock", SettlementLayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second, BatchSyncInterval: time.Second, BlockBatchSize: 1}, SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)}}
 	node, err := node.NewNode(context.Background(), config, key, signingKey, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger())
 	require.NoError(err)
 	require.NotNil(node)
27 changes: 7 additions & 20 deletions settlement/base.go
@@ -2,7 +2,7 @@ package settlement
 
 import (
 	"context"
-	"errors"
+	"fmt"
 	"sync/atomic"
 
 	"github.com/dymensionxyz/dymint/da"
@@ -93,27 +93,14 @@ func (b *BaseLayerClient) Stop() error {
 	return nil
 }
 
-// SubmitBatch submits the batch to the settlement layer. This should create a transaction which (potentially)
-// triggers a state transition in the settlement layer.
-func (b *BaseLayerClient) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) *ResultSubmitBatch {
+// SubmitBatch tries submitting the batch in an async broadcast mode to the settlement layer. Events are emitted on success or failure.
+func (b *BaseLayerClient) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
 	b.logger.Debug("Submitting batch to settlement layer", "start height", batch.StartHeight, "end height", batch.EndHeight)
 	err := b.validateBatch(batch)
 	if err != nil {
-		return &ResultSubmitBatch{
-			BaseResult: BaseResult{Code: StatusError, Message: err.Error()},
-		}
-	}
-	txResp, err := b.client.PostBatch(batch, daClient, daResult)
-	if err != nil || txResp.GetCode() != 0 {
-		b.logger.Error("Error sending batch to settlement layer", "error", err)
-		return &ResultSubmitBatch{
-			BaseResult: BaseResult{Code: StatusError, Message: err.Error()},
-		}
-	}
-	b.logger.Info("Successfully submitted batch to settlement layer", "tx hash", txResp.GetTxHash())
-	return &ResultSubmitBatch{
-		BaseResult: BaseResult{Code: StatusSuccess},
+		panic(err)
 	}
+	b.client.PostBatch(batch, daClient, daResult)
 }
 
 // RetrieveBatch Gets the batch which contains the given slHeight. Empty slHeight returns the latest batch.
@@ -161,10 +148,10 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) {
 
 func (b *BaseLayerClient) validateBatch(batch *types.Batch) error {
 	if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 {
-		return errors.New("batch start height must be last height + 1")
+		return fmt.Errorf("batch start height must be last height + 1. StartHeight %d, latestHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight))
 	}
 	if batch.EndHeight < batch.StartHeight {
-		return errors.New("batch end height must be greater or equal to start height")
+		return fmt.Errorf("batch end height must be greater or equal to start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight)
 	}
 	return nil
 }
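Under the new contract, SubmitBatch returns nothing: PostBatch broadcasts asynchronously and the outcome surfaces as events, while validateBatch still guards contiguity — if the settlement layer's latest height is 10, the next batch must start at 11. The same checks in isolation, with Batch reduced to the two height fields the function inspects:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Batch is reduced to the two fields validateBatch inspects.
type Batch struct{ StartHeight, EndHeight uint64 }

// validateBatch reproduces the contiguity checks from settlement/base.go.
func validateBatch(latestHeight *uint64, b Batch) error {
	if b.StartHeight != atomic.LoadUint64(latestHeight)+1 {
		return fmt.Errorf("batch start height must be last height + 1. StartHeight %d, latestHeight %d",
			b.StartHeight, atomic.LoadUint64(latestHeight))
	}
	if b.EndHeight < b.StartHeight {
		return fmt.Errorf("batch end height must be greater or equal to start height. EndHeight %d, StartHeight %d",
			b.EndHeight, b.StartHeight)
	}
	return nil
}

func main() {
	latest := uint64(10)
	fmt.Println(validateBatch(&latest, Batch{StartHeight: 11, EndHeight: 12})) // <nil>: contiguous
	fmt.Println(validateBatch(&latest, Batch{StartHeight: 13, EndHeight: 14})) // error: gap after height 10
}
```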