Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retries for hub client #630

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"code.cloudfoundry.org/go-diodes"

"github.com/avast/retry-go/v4"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/utils"
Expand Down Expand Up @@ -279,23 +278,5 @@ func (m *Manager) applyBlockCallback(event pubsub.Message) {

// getLatestBatchFromSL gets the latest batch from the SL
func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultRetrieveBatch, error) {
var resultRetrieveBatch *settlement.ResultRetrieveBatch
var err error
// Get latest batch from SL
err = retry.Do(
func() error {
resultRetrieveBatch, err = m.settlementClient.RetrieveBatch()
if err != nil {
return err
}
return nil
},
retry.LastErrorOnly(true),
retry.Context(ctx),
retry.Attempts(1),
)
if err != nil {
return resultRetrieveBatch, err
}
return resultRetrieveBatch, nil
return m.settlementClient.RetrieveBatch()
}
Comment on lines 280 to 282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function could be inlined, it seems

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

approved anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. done in other PR

97 changes: 71 additions & 26 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"strconv"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/avast/retry-go/v4"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
Expand All @@ -15,7 +18,6 @@
rollapptypes "github.com/dymensionxyz/dymension/v3/x/rollapp/types"
"github.com/google/uuid"
"github.com/ignite/cli/ignite/pkg/cosmosaccount"
"github.com/pkg/errors"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sequencertypes "github.com/dymensionxyz/dymension/v3/x/sequencer/types"
Expand Down Expand Up @@ -185,7 +187,6 @@
}
go d.eventHandler()
return nil

}

// Stop stops the HubClient.
Expand Down Expand Up @@ -286,14 +287,28 @@

// GetLatestBatch returns the latest batch from the Dymension Hub.
func (d *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetrieveBatch, error) {
latestStateInfoIndexResp, err := d.rollappQueryClient.LatestStateIndex(d.ctx,
&rollapptypes.QueryGetLatestStateIndexRequest{RollappId: d.config.RollappID})
if latestStateInfoIndexResp == nil {
return nil, settlement.ErrBatchNotFound
}
var latestStateInfoIndexResp *rollapptypes.QueryGetLatestStateIndexResponse

err := d.RunWithRetry(func() error {
var err error
latestStateInfoIndexResp, err = d.rollappQueryClient.LatestStateIndex(d.ctx,
&rollapptypes.QueryGetLatestStateIndexRequest{RollappId: d.config.RollappID})

if status.Code(err) == codes.NotFound {
return retry.Unrecoverable(settlement.ErrBatchNotFound)
}

Check warning on line 299 in settlement/dymension/dymension.go

View check run for this annotation

Codecov / codecov/patch

settlement/dymension/dymension.go#L298-L299

Added lines #L298 - L299 were not covered by tests

return err
})
if err != nil {
return nil, err
}

// not supposed to happen, but just in case
if latestStateInfoIndexResp == nil {
return nil, settlement.ErrEmptyResponse
}

Check warning on line 310 in settlement/dymension/dymension.go

View check run for this annotation

Codecov / codecov/patch

settlement/dymension/dymension.go#L309-L310

Added lines #L309 - L310 were not covered by tests

latestBatch, err := d.GetBatchAtIndex(rollappID, latestStateInfoIndexResp.StateIndex.Index)
if err != nil {
return nil, err
Expand All @@ -303,26 +318,48 @@

// GetBatchAtIndex returns the batch at the given index from the Dymension Hub.
func (d *HubClient) GetBatchAtIndex(rollappID string, index uint64) (*settlement.ResultRetrieveBatch, error) {
stateInfoResp, err := d.rollappQueryClient.StateInfo(d.ctx,
&rollapptypes.QueryGetStateInfoRequest{RollappId: d.config.RollappID, Index: index})
if stateInfoResp == nil {
return nil, settlement.ErrBatchNotFound
}
var stateInfoResp *rollapptypes.QueryGetStateInfoResponse
err := d.RunWithRetry(func() error {
var err error
stateInfoResp, err = d.rollappQueryClient.StateInfo(d.ctx,
&rollapptypes.QueryGetStateInfoRequest{RollappId: d.config.RollappID, Index: index})

if status.Code(err) == codes.NotFound {
return retry.Unrecoverable(settlement.ErrBatchNotFound)
}

Check warning on line 329 in settlement/dymension/dymension.go

View check run for this annotation

Codecov / codecov/patch

settlement/dymension/dymension.go#L328-L329

Added lines #L328 - L329 were not covered by tests

return err
})
if err != nil {
return nil, err
}
// not supposed to happen, but just in case
if stateInfoResp == nil {
return nil, settlement.ErrEmptyResponse
}

Check warning on line 339 in settlement/dymension/dymension.go

View check run for this annotation

Codecov / codecov/patch

settlement/dymension/dymension.go#L338-L339

Added lines #L338 - L339 were not covered by tests

return d.convertStateInfoToResultRetrieveBatch(&stateInfoResp.StateInfo)
}

// GetSequencers returns the bonded sequencers of the given rollapp.
func (d *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error) {
var res *sequencertypes.QueryGetSequencersByRollappByStatusResponse
req := &sequencertypes.QueryGetSequencersByRollappByStatusRequest{
RollappId: d.config.RollappID,
Status: sequencertypes.Bonded,
}
res, err := d.sequencerQueryClient.SequencersByRollappByStatus(d.ctx, req)
err := d.RunWithRetry(func() error {
var err error
res, err = d.sequencerQueryClient.SequencersByRollappByStatus(d.ctx, req)
return err
})
if err != nil {
return nil, errors.Wrapf(settlement.ErrNoSequencerForRollapp, "rollappID: %s", rollappID)
return nil, err
}

Check warning on line 358 in settlement/dymension/dymension.go

View check run for this annotation

Codecov / codecov/patch

settlement/dymension/dymension.go#L357-L358

Added lines #L357 - L358 were not covered by tests

// not supposed to happen, but just in case
if res == nil {
return nil, settlement.ErrEmptyResponse

Check warning on line 362 in settlement/dymension/dymension.go

View check run for this annotation

Codecov / codecov/patch

settlement/dymension/dymension.go#L362

Added line #L362 was not covered by tests
}

sequencersList := make([]*types.Sequencer, 0, len(res.Sequencers))
Expand All @@ -347,15 +384,14 @@
}

func (d *HubClient) submitBatch(msgUpdateState *rollapptypes.MsgUpdateState) error {
err := retry.Do(func() error {
err := d.RunWithRetry(func() error {
txResp, err := d.client.BroadcastTx(d.config.DymAccountName, msgUpdateState)
if err != nil || txResp.Code != 0 {
d.logger.Error("Error sending batch to settlement layer", "error", err)
return err
}
return nil
}, retry.Context(d.ctx), retry.LastErrorOnly(true), retry.Delay(d.batchRetryDelay),
retry.MaxDelay(batchRetryMaxDelay), retry.Attempts(d.batchRetryAttempts))
})
return err
}

Expand Down Expand Up @@ -423,7 +459,6 @@
BDs: rollapptypes.BlockDescriptors{BD: blockDescriptors},
}
return settlementBatch, nil

}

func getCosmosClientOptions(config *settlement.Config) []cosmosclient.Option {
Expand Down Expand Up @@ -496,24 +531,34 @@
}
return &settlement.ResultRetrieveBatch{
BaseResult: settlement.BaseResult{Code: settlement.StatusSuccess, StateIndex: stateInfo.StateInfoIndex.Index},
Batch: batchResult}, nil
Batch: batchResult,
}, nil
}

// TODO(omritoptix): Change the retry attempts to be only for the batch polling. Also we need to have a more
// bullet proof check as theoretically the tx can stay in the mempool longer then our retry attempts.
func (d *HubClient) waitForBatchInclusion(batchStartHeight uint64) (*settlement.ResultRetrieveBatch, error) {
var resultRetriveBatch *settlement.ResultRetrieveBatch
err := retry.Do(func() error {
err := d.RunWithRetry(func() error {
latestBatch, err := d.GetLatestBatch(d.config.RollappID)
if err != nil {
return err
}
if latestBatch.Batch.StartHeight == batchStartHeight {
resultRetriveBatch = latestBatch
return nil
if latestBatch.Batch.StartHeight != batchStartHeight {
return settlement.ErrBatchNotFound

Check warning on line 548 in settlement/dymension/dymension.go

View check run for this annotation

Codecov / codecov/patch

settlement/dymension/dymension.go#L548

Added line #L548 was not covered by tests
}
return settlement.ErrBatchNotFound
}, retry.Context(d.ctx), retry.LastErrorOnly(true),
retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts), retry.MaxDelay(batchRetryMaxDelay))
resultRetriveBatch = latestBatch
return nil
})
return resultRetriveBatch, err
}

func (d *HubClient) RunWithRetry(operation func() error) error {
return retry.Do(operation,
retry.Context(d.ctx),
retry.LastErrorOnly(true),
retry.Delay(d.batchRetryDelay),
retry.Attempts(d.batchRetryAttempts),
retry.MaxDelay(batchRetryMaxDelay),
)
}
5 changes: 3 additions & 2 deletions settlement/dymension/dymension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,11 @@ func TestPostBatch(t *testing.T) {
}
rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return(
&rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{
StartHeight: batch.StartHeight, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1}},
StartHeight: batch.StartHeight, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1,
}},
nil)
} else {
rollappQueryClientMock.On("LatestStateIndex", mock.Anything, mock.Anything).Return(nil, nil)
rollappQueryClientMock.On("LatestStateIndex", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("error"))
}
}
hubClient, err := newDymensionHubClient(settlement.Config{}, pubsubServer, log.TestingLogger(), options...)
Expand Down
2 changes: 2 additions & 0 deletions settlement/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import "errors"
var (
// ErrBatchNotFound is returned when a batch is not found for the rollapp.
ErrBatchNotFound = errors.New("batch not found")
// ErrEmptyResponse is returned when the response is empty.
ErrEmptyResponse = errors.New("empty response")
// ErrNoSequencerForRollapp is returned when a sequencer is not found for the rollapp.
ErrNoSequencerForRollapp = errors.New("no sequencer registered on the hub for this rollapp")
// ErrBatchNotAccepted is returned when a batch is not accepted by the settlement layer.
Expand Down
1 change: 0 additions & 1 deletion settlement/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func newHubClient(config settlement.Config, pubsub *pubsub.Server, logger log.Lo

func initConfig(conf settlement.Config) (proposer string, err error) {
if conf.KeyringHomeDir == "" {

if conf.ProposerPubKey != "" {
proposer = conf.ProposerPubKey
} else {
Expand Down
7 changes: 4 additions & 3 deletions settlement/grpc/mockserv/mockserv.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/dymensionxyz/dymint/store"
)

var settlementKVPrefix = []byte{0}
var slStateIndexKey = []byte("slStateIndex")
var (
settlementKVPrefix = []byte{0}
slStateIndexKey = []byte("slStateIndex")
)

type server struct {
slmock.UnimplementedMockSLServer
Expand Down Expand Up @@ -63,7 +65,6 @@ func (s *server) SetBatch(ctx context.Context, in *slmock.SLSetBatchRequest) (*s
}

func GetServer(conf settlement.GrpcConfig) *grpc.Server {

srv := grpc.NewServer()

slstore := store.NewDefaultKVStore(".", "db", "settlement")
Expand Down
6 changes: 4 additions & 2 deletions settlement/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (

const kvStoreDBName = "settlement"

var settlementKVPrefix = []byte{0}
var slStateIndexKey = []byte("slStateIndex")
var (
settlementKVPrefix = []byte{0}
slStateIndexKey = []byte("slStateIndex")
)

// LayerClient is an extension of the base settlement layer client
// for usage in tests and local development.
Expand Down
Loading