feat: toggle settlement health event upon settlement batch submission status #332

Merged · 6 commits · Jun 4, 2023
43 changes: 3 additions & 40 deletions block/manager.go
@@ -76,10 +76,7 @@ type Manager struct {

syncTargetDiode diodes.Diode

batchInProcess atomic.Value
batchRetryCtx context.Context
batchRetryCancel context.CancelFunc
batchRetryMu sync.RWMutex
batchInProcess atomic.Value

shouldProduceBlocksCh chan bool

@@ -369,7 +366,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
// TODO(omritoptix): Once we have leader election, we can add a condition.
// Update batch accepted is only relevant for the aggregator
// TODO(omritoptix): Check if we are the aggregator
m.updateBatchAccepted()
m.batchInProcess.Store(false)
case <-subscription.Cancelled():
m.logger.Info("Subscription canceled")
}
@@ -383,15 +380,6 @@ func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
m.syncTargetDiode.Set(diodes.GenericDataType(&endHeight))
}

func (m *Manager) updateBatchAccepted() {
m.batchRetryMu.Lock()
if m.batchRetryCtx != nil && m.batchRetryCtx.Err() == nil {
m.batchRetryCancel()
}
m.batchRetryMu.Unlock()
m.batchInProcess.Store(false)
}

// RetriveLoop listens for new sync messages written to a ring buffer and in turn
// runs syncUntilTarget on the latest message in the ring buffer.
func (m *Manager) RetriveLoop(ctx context.Context) {
@@ -720,7 +708,7 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
// Submit batch to SL
// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
// In that case we'll want to update the syncTarget before returning.
m.submitBatchToSL(nextBatch, resultSubmitToDA)
go m.settlementClient.SubmitBatch(nextBatch, m.dalc.GetClientType(), resultSubmitToDA)
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
@@ -778,31 +766,6 @@ func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*type
return batch, nil
}

func (m *Manager) submitBatchToSL(batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) {
var resultSubmitToSL *settlement.ResultSubmitBatch
m.batchRetryMu.Lock()
m.batchRetryCtx, m.batchRetryCancel = context.WithCancel(context.Background())
m.batchRetryMu.Unlock()
defer m.batchRetryCancel()
// Submit batch to SL
err := retry.Do(func() error {
resultSubmitToSL = m.settlementClient.SubmitBatch(batch, m.dalc.GetClientType(), resultSubmitToDA)
if resultSubmitToSL.Code != settlement.StatusSuccess {
m.logger.Error("failed to submit batch to SL layer", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "error", resultSubmitToSL.Message)
err := fmt.Errorf("failed to submit batch to SL layer: %s", resultSubmitToSL.Message)
return err
}
return nil
}, retry.Context(m.batchRetryCtx), retry.LastErrorOnly(true), retry.Delay(SLBatchRetryDelay), retry.MaxDelay(maxDelay))
// Panic if we failed not due to context cancellation
m.batchRetryMu.Lock()
if err != nil && m.batchRetryCtx.Err() == nil {
m.logger.Error("Failed to submit batch to SL Layer", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "error", err)
panic(err)
}
m.batchRetryMu.Unlock()
}

func (m *Manager) submitBatchToDA(ctx context.Context, batch *types.Batch) (*da.ResultSubmitBatch, error) {
var res da.ResultSubmitBatch
err := retry.Do(func() error {
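The manager-side change above removes the blocking `submitBatchToSL` retry helper: `submitNextBatch` now hands the batch to the settlement client in a goroutine, and the only state the manager keeps is the `batchInProcess` flag, which `SyncTargetLoop` clears when a batch-accepted event arrives. A minimal sketch of that guard-and-release pattern, assuming nothing beyond what the diff shows (the `submitter` type and helper names are illustrative, not taken from the codebase):

```go
// Minimal sketch of the guard-and-release pattern kept in the manager after
// this change: submission is fired off asynchronously, and the in-process
// flag is only cleared when the accepted-batch event comes back.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type submitter struct {
	batchInProcess atomic.Value // holds a bool, as in block/manager.go
}

// maybeSubmit starts at most one submission at a time; the settlement client
// is expected to report the outcome via events rather than a return value.
func (s *submitter) maybeSubmit(submit func()) {
	if inProcess, _ := s.batchInProcess.Load().(bool); inProcess {
		return
	}
	s.batchInProcess.Store(true)
	go submit() // fire-and-forget, mirroring `go m.settlementClient.SubmitBatch(...)`
}

// onBatchAccepted mirrors `m.batchInProcess.Store(false)` in SyncTargetLoop:
// acceptance of the batch, not the submit call, releases the flag.
func (s *submitter) onBatchAccepted() {
	s.batchInProcess.Store(false)
}

func main() {
	s := &submitter{}
	s.batchInProcess.Store(false)
	s.maybeSubmit(func() { fmt.Println("submitting batch to settlement layer") })
	time.Sleep(10 * time.Millisecond)
	s.onBatchAccepted()
	fmt.Println("flag cleared after batch accepted")
}
```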
75 changes: 1 addition & 74 deletions block/manager_test.go
@@ -138,8 +138,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.NoError(t, err)
daResultSubmitBatch := manager.dalc.SubmitBatch(batch)
assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
resultSubmitBatch := manager.settlementClient.SubmitBatch(batch, manager.dalc.GetClientType(), &daResultSubmitBatch)
assert.Equal(t, resultSubmitBatch.Code, settlement.StatusSuccess)
manager.settlementClient.SubmitBatch(batch, manager.dalc.GetClientType(), &daResultSubmitBatch)
nextBatchStartHeight = batch.EndHeight + 1
// Wait until daHeight is updated
time.Sleep(time.Millisecond * 500)
@@ -174,31 +173,6 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
}
}

func TestPublishWhenSettlementLayerDisconnected(t *testing.T) {
SLBatchRetryDelay = 1 * time.Second

isSettlementError := atomic.Value{}
isSettlementError.Store(true)
manager, err := getManager(getManagerConfig(), &testutil.SettlementLayerClientSubmitBatchError{IsError: isSettlementError}, nil, 1, 1, 0, nil, nil)
retry.DefaultAttempts = 2
require.NoError(t, err)
require.NotNil(t, manager)

nextBatchStartHeight := atomic.LoadUint64(&manager.syncTarget) + 1
batch, err := testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(defaultBatchSize-1), manager.proposerKey)
assert.NoError(t, err)
daResultSubmitBatch := manager.dalc.SubmitBatch(batch)
assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
resultSubmitBatch := manager.settlementClient.SubmitBatch(batch, manager.dalc.GetClientType(), &daResultSubmitBatch)
assert.Equal(t, resultSubmitBatch.Code, settlement.StatusError)

defer func() {
err := recover().(error)
assert.ErrorContains(t, err, connectionRefusedErrorMessage)
}()
manager.submitBatchToSL(&types.Batch{StartHeight: 1, EndHeight: 1}, nil)
}

func TestPublishWhenDALayerDisconnected(t *testing.T) {
DABatchRetryDelay = 1 * time.Second
manager, err := getManager(getManagerConfig(), nil, &testutil.DALayerClientSubmitBatchError{}, 1, 1, 0, nil, nil)
@@ -446,53 +420,6 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
}
}

// Test batch retry halts upon new batch acceptance
// 1. Produce blocks with settlement layer batch submission error
// 2. Emit an event that a new batch was accepted
// 3. Validate new batches was submitted
func TestBatchRetryWhileBatchAccepted(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
app := testutil.GetAppMock()
// Create proxy app
clientCreator := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(clientCreator)
err := proxyApp.Start()
require.NoError(err)
// Init manager
managerConfig := getManagerConfig()
managerConfig.BlockBatchSize = 1
isSettlementError := atomic.Value{}
isSettlementError.Store(true)
settlementLayerClient := &testutil.SettlementLayerClientSubmitBatchError{IsError: isSettlementError}
manager, err := getManager(managerConfig, settlementLayerClient, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
// Produce blocks with settlement layer batch submission error
blockLoopContext, blockLoopCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer blockLoopCancel()
defer cancel()
go manager.ProduceBlockLoop(blockLoopContext)
go manager.SyncTargetLoop(ctx)
time.Sleep(200 * time.Millisecond)
assert.Equal(uint64(0), atomic.LoadUint64(&settlementLayerClient.BatchCounter))
// Cancel block production to not interfere with the isBatchInProcess flag
blockLoopCancel()
time.Sleep(100 * time.Millisecond)
// Emit an event that a new batch was accepted and wait for it to be processed
eventData := &settlement.EventDataNewSettlementBatchAccepted{EndHeight: 1, StateIndex: 1}
manager.pubsub.PublishWithEvents(ctx, eventData, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
time.Sleep(200 * time.Millisecond)
// Change settlement layer to accept batches and validate new batches was submitted
settlementLayerClient.IsError.Store(false)
blockLoopContext, blockLoopCancel = context.WithCancel(context.Background())
defer blockLoopCancel()
go manager.ProduceBlockLoop(blockLoopContext)
time.Sleep(1 * time.Second)
assert.Greater(atomic.LoadUint64(&settlementLayerClient.BatchCounter), uint64(0))

}

func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
22 changes: 2 additions & 20 deletions mocks/settlement/hubclient.go

(Generated mock file; diff not rendered.)

1 change: 1 addition & 0 deletions node/integration_test.go
@@ -42,6 +42,7 @@ func TestAggregatorMode(t *testing.T) {
anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)

blockManagerConfig := config.BlockManagerConfig{
BlockBatchSize: 1,
BlockTime: 1 * time.Second,
NamespaceID: "0102030405060708",
BatchSyncInterval: time.Second * 5,
4 changes: 2 additions & 2 deletions node/node_test.go
@@ -64,7 +64,7 @@ func TestMempoolDirectly(t *testing.T) {
P2P: config.P2PConfig{},
RPC: config.RPCConfig{},
Aggregator: false,
BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond},
BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond, BlockBatchSize: 2},
DALayer: "mock",
DAConfig: "",
SettlementLayer: "mock",
@@ -179,7 +179,7 @@ func TestHealthStatusEventHandler(t *testing.T) {
assert.Equal(c.expectedError, healthStatusEvent.Error)
done <- true
break
case <-time.After(100 * time.Millisecond):
case <-time.After(500 * time.Millisecond):
if c.expectHealthStatusEventEmitted {
t.Error("expected health status event but didn't get one")
}
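Beyond the config literal, the only functional change in node/node_test.go is the longer wait in TestHealthStatusEventHandler (100ms to 500ms): with submission now asynchronous, the health status event arrives later. The wait itself is the usual subscribe-then-select pattern; a small sketch of it with generic channel and payload types, not the dymint event definitions:

```go
// Sketch of the subscribe-then-wait pattern the health status test relies on:
// block until an event arrives or the timeout elapses.
package main

import (
	"fmt"
	"time"
)

func waitForEvent(events <-chan interface{}, timeout time.Duration) (interface{}, error) {
	select {
	case ev := <-events:
		return ev, nil
	case <-time.After(timeout):
		return nil, fmt.Errorf("no event within %s", timeout)
	}
}

func main() {
	ch := make(chan interface{}, 1)
	// Simulate an asynchronously emitted health status event.
	go func() {
		time.Sleep(200 * time.Millisecond)
		ch <- "settlement health status event"
	}()
	ev, err := waitForEvent(ch, 500*time.Millisecond) // 100ms would have raced; 500ms gives headroom
	fmt.Println(ev, err)
}
```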
2 changes: 1 addition & 1 deletion node/testutils.go
@@ -25,7 +25,7 @@ func CreateNode(isAggregator bool, blockManagerConfig *config.BlockManagerConfig
nodeConfig := config.DefaultNodeConfig

if blockManagerConfig == nil {
blockManagerConfig = &config.BlockManagerConfig{BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond}
blockManagerConfig = &config.BlockManagerConfig{BlockBatchSize: 1, BatchSyncInterval: time.Second * 5, BlockTime: 100 * time.Millisecond}
}
nodeConfig.BlockManagerConfig = *blockManagerConfig
nodeConfig.Aggregator = isAggregator
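A common thread through this file and the rpc test files below is that every BlockManagerConfig literal now sets BlockBatchSize explicitly (usually 1), presumably so batch production proceeds deterministically in the tests. A sketch of the implied configuration shape, using a local stand-in struct since only these three fields appear in the diff:

```go
// Stand-in for the three BlockManagerConfig fields these tests set; the real
// config.BlockManagerConfig in dymint has more fields than shown here.
package main

import (
	"fmt"
	"time"
)

type blockManagerConfig struct {
	BlockBatchSize    uint64
	BlockTime         time.Duration
	BatchSyncInterval time.Duration
}

func main() {
	cfg := blockManagerConfig{
		BlockBatchSize:    1, // now set explicitly in every test configuration in this PR
		BlockTime:         100 * time.Millisecond,
		BatchSyncInterval: 5 * time.Second,
	}
	fmt.Printf("%+v\n", cfg)
}
```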
9 changes: 6 additions & 3 deletions rpc/client/client_test.go
@@ -99,7 +99,7 @@ func TestGenesisChunked(t *testing.T) {
P2P: config.P2PConfig{},
RPC: config.RPCConfig{},
Aggregator: false,
BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond},
BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond, BlockBatchSize: 1},
DALayer: "mock",
DAConfig: "",
SettlementLayer: "mock",
@@ -429,6 +429,7 @@ func TestTx(t *testing.T) {
SettlementLayer: "mock",
Aggregator: true,
BlockManagerConfig: config.BlockManagerConfig{
BlockBatchSize: 1,
BlockTime: 200 * time.Millisecond,
BatchSyncInterval: time.Second,
},
@@ -693,7 +694,7 @@ func TestValidatorSetHandling(t *testing.T) {
DALayer: "mock",
SettlementLayer: "mock",
Aggregator: true,
BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond, BatchSyncInterval: time.Second},
BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond, BatchSyncInterval: time.Second, BlockBatchSize: 1},
SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)},
}

@@ -804,7 +805,7 @@ func getRPC(t *testing.T) (*mocks.Application, *Client) {
P2P: config.P2PConfig{},
RPC: config.RPCConfig{},
Aggregator: false,
BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond},
BlockManagerConfig: config.BlockManagerConfig{BatchSyncInterval: time.Second, BlockTime: 100 * time.Millisecond, BlockBatchSize: 1},
DALayer: "mock",
DAConfig: "",
SettlementLayer: "mock",
@@ -886,6 +887,7 @@ func TestMempool2Nodes(t *testing.T) {
SettlementLayer: "mock",
SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKey1Bytes)},
BlockManagerConfig: config.BlockManagerConfig{
BlockBatchSize: 1,
BlockTime: 100 * time.Millisecond,
BatchSyncInterval: time.Second,
},
@@ -901,6 +903,7 @@
SettlementLayer: "mock",
SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKey2Bytes)},
BlockManagerConfig: config.BlockManagerConfig{
BlockBatchSize: 1,
BlockTime: 100 * time.Millisecond,
BatchSyncInterval: time.Second,
},
2 changes: 1 addition & 1 deletion rpc/json/service_test.go
@@ -294,7 +294,7 @@ func getRPC(t *testing.T) (*mocks.Application, *client.Client) {
signingKey, proposerPubKey, _ := crypto.GenerateEd25519Key(rand.Reader)
proposerPubKeyBytes, err := proposerPubKey.Raw()
require.NoError(err)
config := config.NodeConfig{Aggregator: true, DALayer: "mock", SettlementLayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second, BatchSyncInterval: time.Second}, SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)}}
config := config.NodeConfig{Aggregator: true, DALayer: "mock", SettlementLayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second, BatchSyncInterval: time.Second, BlockBatchSize: 1}, SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)}}
node, err := node.NewNode(context.Background(), config, key, signingKey, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger())
require.NoError(err)
require.NotNil(node)
33 changes: 8 additions & 25 deletions settlement/base.go
@@ -2,7 +2,7 @@ package settlement

import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/dymensionxyz/dymint/da"
@@ -93,27 +93,11 @@ func (b *BaseLayerClient) Stop() error {
return nil
}

// SubmitBatch submits the batch to the settlement layer. This should create a transaction which (potentially)
// triggers a state transition in the settlement layer.
func (b *BaseLayerClient) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) *ResultSubmitBatch {
// SubmitBatch tries submitting the batch in an async way to the settlement layer. Events are emitted on success or failure.
func (b *BaseLayerClient) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
b.logger.Debug("Submitting batch to settlement layer", "start height", batch.StartHeight, "end height", batch.EndHeight)
err := b.validateBatch(batch)
if err != nil {
return &ResultSubmitBatch{
BaseResult: BaseResult{Code: StatusError, Message: err.Error()},
}
}
txResp, err := b.client.PostBatch(batch, daClient, daResult)
if err != nil || txResp.GetCode() != 0 {
b.logger.Error("Error sending batch to settlement layer", "error", err)
return &ResultSubmitBatch{
BaseResult: BaseResult{Code: StatusError, Message: err.Error()},
}
}
b.logger.Info("Successfully submitted batch to settlement layer", "tx hash", txResp.GetTxHash())
return &ResultSubmitBatch{
BaseResult: BaseResult{Code: StatusSuccess},
}
b.validateBatch(batch)
b.client.PostBatch(batch, daClient, daResult)
}

// RetrieveBatch Gets the batch which contains the given slHeight. Empty slHeight returns the latest batch.
@@ -159,14 +143,13 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) {
return sequencers, nil
}

func (b *BaseLayerClient) validateBatch(batch *types.Batch) error {
func (b *BaseLayerClient) validateBatch(batch *types.Batch) {
if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 {
return errors.New("batch start height must be last height + 1")
panic(fmt.Sprintf("batch start height must be last height. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)+1))
}
if batch.EndHeight < batch.StartHeight {
return errors.New("batch end height must be greater or equal to start height")
panic(fmt.Sprintf("batch end height must be greater or equal to start height. StartHeight %d, EndHeight %d", batch.StartHeight, batch.EndHeight))
}
return nil
}

func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) {
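The hub-client side that actually emits the health event is not part of this excerpt; only the regenerated mock in mocks/settlement/hubclient.go is listed, and its diff is not rendered. Purely as an illustration of the shape implied by the new SubmitBatch comment ("Events are emitted on success or failure"), here is a hypothetical sketch of an asynchronous PostBatch-style loop that retries and publishes a health event per attempt. Every name in it (healthEvent, hubClient, postBatchAsync) is invented for the sketch and not taken from dymint:

```go
// Hypothetical sketch only — the real hub client is not part of this diff.
// Assumed shape: the batch is posted in the background, each failed attempt
// publishes an "unhealthy" settlement event, and success publishes a healthy one.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// healthEvent is an invented stand-in for a settlement health event payload.
type healthEvent struct {
	Healthy bool
	Err     error
}

type hubClient struct {
	publish func(healthEvent) // stand-in for publishing on the node's pubsub
	send    func() error      // stand-in for broadcasting the batch transaction
}

// postBatchAsync retries until the batch is accepted or the context is done,
// emitting a health event after every attempt.
func (c *hubClient) postBatchAsync(ctx context.Context) {
	go func() {
		for {
			err := c.send()
			if err == nil {
				c.publish(healthEvent{Healthy: true})
				return
			}
			c.publish(healthEvent{Healthy: false, Err: err})
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second): // retry delay between attempts
			}
		}
	}()
}

func main() {
	attempts := 0
	c := &hubClient{
		publish: func(e healthEvent) { fmt.Printf("health event: %+v\n", e) },
		send: func() error {
			attempts++
			if attempts < 3 {
				return errors.New("connection refused")
			}
			return nil
		},
	}
	c.postBatchAsync(context.Background())
	time.Sleep(3 * time.Second) // let the background retries run in this demo
}
```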