Skip to content

Commit

Permalink
feat(da grpc): retry instead of freeze (#1315)
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt authored Jan 13, 2025
1 parent 15bf560 commit f5dccaf
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 43 deletions.
151 changes: 108 additions & 43 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"

"cosmossdk.io/math"
uretry "github.com/dymensionxyz/dymint/utils/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand All @@ -26,6 +27,10 @@ type DataAvailabilityLayerClient struct {
client dalc.DALCServiceClient
synced chan struct{}
logger types.Logger

// FIXME: ought to refactor so ctx is given to us by the object creator (same pattern should apply to celestia, avail clients)
ctx context.Context
cancel context.CancelFunc
}

// Config contains configuration options for DataAvailabilityLayerClient.
Expand All @@ -50,6 +55,7 @@ var (
func (d *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, _ store.KV, logger types.Logger, options ...da.Option) error {
d.logger = logger
d.synced = make(chan struct{}, 1)
d.ctx, d.cancel = context.WithCancel(context.Background())
if len(config) == 0 {
d.config = DefaultConfig
return nil
Expand Down Expand Up @@ -77,7 +83,7 @@ func (d *DataAvailabilityLayerClient) Start() error {

// Stop shuts the client down: it logs the shutdown, cancels the client's
// context and closes the gRPC connection, returning the error from the close.
func (d *DataAvailabilityLayerClient) Stop() error {
	d.logger.Info("stopping GRPC DALC")

	// Cancel before closing so the retry loops that watch d.ctx see the shutdown.
	d.cancel()

	closeErr := d.conn.Close()
	return closeErr
}

Expand All @@ -93,32 +99,67 @@ func (d *DataAvailabilityLayerClient) GetClientType() da.Client {

// SubmitBatch proxies SubmitBatch request to gRPC server.
func (d *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch {
resp, err := d.client.SubmitBatch(context.TODO(), &dalc.SubmitBatchRequest{Batch: batch.ToProto()})
if err != nil {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err},
backoff := d.getBackoff()

for {
select {
case <-d.ctx.Done():
return da.ResultSubmitBatch{}
default:
resp, err := d.client.SubmitBatch(context.TODO(), &dalc.SubmitBatchRequest{Batch: batch.ToProto()})
if err != nil {
if !errorIsRetryable(err) {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Error: err,
},
}
}
}
if err != nil {
d.logger.Error("Submit blob.", "error", err)
backoff.Sleep()
continue
}
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
},
SubmitMetaData: &da.DASubmitMetaData{
Client: da.Grpc,
Height: resp.Result.DataLayerHeight,
},
}
}
}
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
},
SubmitMetaData: &da.DASubmitMetaData{
Client: da.Grpc,
Height: resp.Result.DataLayerHeight,
},
}
}

// CheckBatchAvailability proxies CheckBatchAvailability request to gRPC server.
func (d *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASubmitMetaData) da.ResultCheckBatch {
resp, err := d.client.CheckBatchAvailability(context.TODO(), &dalc.CheckBatchAvailabilityRequest{DataLayerHeight: daMetaData.Height})
if err != nil {
return da.ResultCheckBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}}
}
return da.ResultCheckBatch{
BaseResult: da.BaseResult{Code: da.StatusCode(resp.Result.Code), Message: resp.Result.Message},
backoff := d.getBackoff()

for {
select {
case <-d.ctx.Done():
d.logger.Debug("Context cancelled")
return da.ResultCheckBatch{}
default:
resp, err := d.client.CheckBatchAvailability(context.TODO(), &dalc.CheckBatchAvailabilityRequest{DataLayerHeight: daMetaData.Height})
if err != nil {
if !errorIsRetryable(err) {
return da.ResultCheckBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}}
}
d.logger.Error("Check blob availability.", "error", err)
backoff.Sleep()
continue
}
return da.ResultCheckBatch{
BaseResult: da.BaseResult{Code: da.StatusCode(resp.Result.Code), Message: resp.Result.Message},
}
}
}
}

Expand All @@ -129,29 +170,44 @@ func (d *DataAvailabilityLayerClient) GetMaxBlobSizeBytes() uint32 {

// RetrieveBatches proxies RetrieveBlocks request to gRPC server.
func (d *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
resp, err := d.client.RetrieveBatches(context.TODO(), &dalc.RetrieveBatchesRequest{DataLayerHeight: daMetaData.Height})
if err != nil {
return da.ResultRetrieveBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}}
}

batches := make([]*types.Batch, len(resp.Batches))
for i, batch := range resp.Batches {
var b types.Batch
err = b.FromProto(batch)
if err != nil {
return da.ResultRetrieveBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}}
backoff := d.getBackoff()

for {
select {
case <-d.ctx.Done():
d.logger.Debug("Context cancelled")
return da.ResultRetrieveBatch{}
default:
resp, err := d.client.RetrieveBatches(context.TODO(), &dalc.RetrieveBatchesRequest{DataLayerHeight: daMetaData.Height})
if err != nil {
if !errorIsRetryable(err) {
return da.ResultRetrieveBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}}
}
d.logger.Error("Retrieve batches", "error", err)
backoff.Sleep()
continue
}

batches := make([]*types.Batch, len(resp.Batches))
for i, batch := range resp.Batches {
var b types.Batch
err = b.FromProto(batch)
if err != nil {
return da.ResultRetrieveBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}}
}
batches[i] = &b
}
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
},
CheckMetaData: &da.DACheckMetaData{
Height: daMetaData.Height,
},
Batches: batches,
}
}
batches[i] = &b
}
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusCode(resp.Result.Code),
Message: resp.Result.Message,
},
CheckMetaData: &da.DACheckMetaData{
Height: daMetaData.Height,
},
Batches: batches,
}
}

Expand All @@ -161,3 +217,12 @@ func (d *DataAvailabilityLayerClient) GetSignerBalance() (da.Balance, error) {
Denom: "adym",
}, nil
}

// getBackoff returns a new backoff built from a retry BackoffConfig that is
// created with no arguments.
func (d *DataAvailabilityLayerClient) getBackoff() uretry.Backoff {
	cfg := uretry.NewBackoffConfig()
	return cfg.Backoff()
}

// errorIsRetryable reports whether a failed DA request should be attempted
// again. Every error is currently treated as retryable; the argument is not
// inspected.
func errorIsRetryable(_ error) bool {
	// kept it simple for now
	const retryable = true
	return retryable
}
2 changes: 2 additions & 0 deletions da/grpc/mockserv/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/dymensionxyz/dymint/store"
)

// Run the mock server with: go run github.com/dymensionxyz/dymint/da/grpc/mockserv/cmd

func main() {
conf := grpcda.DefaultConfig

Expand Down
2 changes: 2 additions & 0 deletions utils/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"
)

// TODO: rethink this package. can probably use avast/retry-go instead

const (
defaultBackoffInitialDelay = 200 * time.Millisecond
defaultBackoffMaxDelay = 30 * time.Second
Expand Down

0 comments on commit f5dccaf

Please sign in to comment.