Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: using celestia RPC instead of Rest API #544

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '1.19'
go-version: '1.20.5'
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/
- name: golangci-lint
uses: golangci/[email protected]
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.49
version: v1.52.0

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: 1.20.5
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/

- name: Build
Expand Down
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func DefaultConfig(home, chainId string) *NodeConfig {
BlockTime: 200 * time.Millisecond,
EmptyBlocksMaxTime: 3 * time.Second,
BatchSubmitMaxTime: 30 * time.Second,
NamespaceID: "000000000000ffff",
NamespaceID: "0000000000000000ffff",
BlockBatchSize: 500,
BlockBatchMaxSizeBytes: 500000,
GossipedBlocksCacheSize: 50},
Expand Down
2 changes: 1 addition & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }}
gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }}

#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"namespace_id\":\"000000000000ffff\"}"
# da_config = "{\"base_url\": \"http://127.0.0.1:26658\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"token\":\"TOKEN\"}"
# Avail config example:
# da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}"

Expand Down
202 changes: 84 additions & 118 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,32 @@

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"time"

"github.com/avast/retry-go/v4"
"cosmossdk.io/math"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/pubsub"
rpcclient "github.com/tendermint/tendermint/rpc/client"
httprpcclient "github.com/tendermint/tendermint/rpc/client/http"

cnc "github.com/celestiaorg/go-cnc"
openrpc "github.com/rollkit/celestia-openrpc"

"github.com/rollkit/celestia-openrpc/types/blob"
"github.com/rollkit/celestia-openrpc/types/share"

"github.com/dymensionxyz/dymint/da"
celtypes "github.com/dymensionxyz/dymint/da/celestia/types"
"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/types"
pb "github.com/dymensionxyz/dymint/types/pb/dymint"
)

type CNCClientI interface {
SubmitPFB(ctx context.Context, namespaceID cnc.Namespace, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error)
NamespacedShares(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error)
NamespacedData(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error)
}

// DataAvailabilityLayerClient use celestia-node public API.
type DataAvailabilityLayerClient struct {
client CNCClientI
rpc celtypes.CelestiaRPCClient

pubsubServer *pubsub.Server
RPCClient rpcclient.Client
config Config
logger log.Logger
ctx context.Context
Expand All @@ -44,17 +40,10 @@
var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
var _ da.BatchRetriever = &DataAvailabilityLayerClient{}

// WithCNCClient sets CNC client.
func WithCNCClient(client CNCClientI) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).client = client
}
}

// WithRPCClient sets rpc client.
func WithRPCClient(rpcClient rpcclient.Client) da.Option {
func WithRPCClient(rpc celtypes.CelestiaRPCClient) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).RPCClient = rpcClient
daLayerClient.(*DataAvailabilityLayerClient).rpc = rpc
}
}

Expand Down Expand Up @@ -112,28 +101,64 @@
c.txPollingRetryDelay = defaultTxPollingRetryDelay
c.txPollingAttempts = defaultTxPollingAttempts
c.submitRetryDelay = defaultSubmitRetryDelay
c.RPCClient, err = httprpcclient.New(c.config.AppNodeURL, "")
if err != nil {
return err
}
c.client, err = cnc.NewClient(c.config.BaseURL, cnc.WithTimeout(c.config.Timeout))
if err != nil {
return err
}

c.ctx, c.cancel = context.WithCancel(context.Background())

// Apply options
for _, apply := range options {
apply(c)
}

c.ctx, c.cancel = context.WithCancel(context.Background())

return nil
}

// Start prepares DataAvailabilityLayerClient to work.
func (c *DataAvailabilityLayerClient) Start() error {
func (c *DataAvailabilityLayerClient) Start() (err error) {
c.logger.Info("starting Celestia Data Availability Layer Client")

// other client has already been set
if c.rpc != nil {
c.logger.Debug("celestia-node client already set")
return nil
}

rpc, err := openrpc.NewClient(c.ctx, c.config.BaseURL, c.config.AuthToken)
if err != nil {
return err
}

Check warning on line 128 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L125-L128

Added lines #L125 - L128 were not covered by tests

state, err := rpc.Header.SyncState(c.ctx)
if err != nil {
return err
}

Check warning on line 133 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L130-L133

Added lines #L130 - L133 were not covered by tests

if !state.Finished() {
c.logger.Info("waiting for celestia-node to finish syncing", "height", state.Height, "target", state.ToHeight)

done := make(chan error, 1)
go func() {
done <- rpc.Header.SyncWait(c.ctx)
}()

Check warning on line 141 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L135-L141

Added lines #L135 - L141 were not covered by tests

ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for {
select {
case err := <-done:
if err != nil {
return err
}
return nil
case <-ticker.C:
c.logger.Info("celestia-node still syncing", "height", state.Height, "target", state.ToHeight)

Check warning on line 154 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L143-L154

Added lines #L143 - L154 were not covered by tests
}
}
}

c.logger.Info("celestia-node is synced", "height", state.ToHeight)

c.rpc = NewOpenRPC(rpc)

Check warning on line 161 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L159-L161

Added lines #L159 - L161 were not covered by tests
return nil
}

Expand All @@ -155,7 +180,7 @@

// SubmitBatch submits a batch to the DA layer.
func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch {
blob, err := batch.MarshalBinary()
data, err := batch.MarshalBinary()
if err != nil {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Expand All @@ -164,20 +189,31 @@
},
}
}
estimatedGas := DefaultEstimateGas(uint32(len(blob)))

blockBlob, err := blob.NewBlobV0(c.config.NamespaceID.Bytes(), data)
if err != nil {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}

Check warning on line 201 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L195-L201

Added lines #L195 - L201 were not covered by tests
blobs := []*blob.Blob{blockBlob}

estimatedGas := DefaultEstimateGas(uint32(len(data)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we estimate here the len(blobs) vs len(data)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it's the same as in celestia-node

gasWanted := uint64(float64(estimatedGas) * c.config.GasAdjustment)
fees := c.calculateFees(gasWanted)
c.logger.Debug("Submitting to da blob with size", "size", len(blob), "estimatedGas", estimatedGas, "gasAdjusted", gasWanted, "fees", fees)
c.logger.Debug("Submitting to da blob with size", "size", len(blockBlob.Data), "estimatedGas", estimatedGas, "gasAdjusted", gasWanted, "fees", fees)

for {
select {
case <-c.ctx.Done():
c.logger.Debug("Context cancelled")
return da.ResultSubmitBatch{}
default:
//SubmitPFB sets an error if the txResponse has an error, so we check the txResponse for error
txResponse, err := c.client.SubmitPFB(c.ctx, c.config.NamespaceID, blob, fees, gasWanted)
if txResponse == nil {
txResponse, err := c.rpc.SubmitPayForBlob(c.ctx, math.NewInt(fees), gasWanted, blobs)
if err != nil {
c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
Expand All @@ -187,34 +223,18 @@
continue
}

if txResponse.Code != 0 {
c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "txResponse", txResponse.RawLog, "code", txResponse.Code)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New(txResponse.RawLog))
//double check txResponse is not nil - not supposed to happen
if txResponse == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we can skip this response and the next (txresponse.Code) as they are doing it currently in their new api: https://github.com/celestiaorg/celestia-node/blob/ccf9b56317ae8941f91c5d09f24fe8422bbe64bb/state/core_access.go#L256

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validating txResponse != nil is good practice to avoid possible panics.
i'll remove the code check

err := errors.New("txResponse is nil")
c.logger.Error("Failed to submit DA batch", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)

Check warning on line 230 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L228-L230

Added lines #L228 - L230 were not covered by tests
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)
continue
}

// Here we assume that if txResponse is not nil and also error is not nil it means that the transaction
// was submitted (not necessarily accepted) and we still didn't get a clear status regarding it (e.g timeout).
// hence trying to poll for it.
daHeight := uint64(txResponse.Height)
if daHeight == 0 {
c.logger.Debug("Failed to receive DA batch inclusion result. Waiting for inclusion", "txHash", txResponse.TxHash)
daHeight, err = c.waitForTXInclusion(txResponse.TxHash)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we remove the inclusion test?
the submitPayForBlock broadcasts the tx in sync mode which means it's not necessarily gonna get included in a block.
Also from their docs: https://docs.celestia.org/nodes/transaction-resubmission#transaction-resubmission

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's actually in block mode and per their docs it will fail after 75 sec, and the caller is expected to retry.
this implementation is not relevant IMO

We do plan to get the inclusion proof/commitment, we have separate issue for it.

if err != nil {
c.logger.Error("Failed to receive DA batch inclusion result. Emitting health event and trying again", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)
continue
}
}

c.logger.Info("Successfully submitted DA batch", "txHash", txResponse.TxHash, "daHeight", txResponse.Height, "gasWanted", txResponse.GasWanted, "gasUsed", txResponse.GasUsed)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil)
if err != nil {
Expand All @@ -224,37 +244,16 @@
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "tx hash: " + txResponse.TxHash,
DAHeight: daHeight,
DAHeight: uint64(txResponse.Height),
},
}
}
}
}

// CheckBatchAvailability queries DA layer to check data availability of block at given height.
func (c *DataAvailabilityLayerClient) CheckBatchAvailability(dataLayerHeight uint64) da.ResultCheckBatch {
shares, err := c.client.NamespacedShares(c.ctx, c.config.NamespaceID, dataLayerHeight)
if err != nil {
return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}

return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
DAHeight: dataLayerHeight,
},
DataAvailable: len(shares) > 0,
}
}

// RetrieveBatches gets a batch of blocks from DA layer.
func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da.ResultRetrieveBatch {
data, err := c.client.NamespacedData(c.ctx, c.config.NamespaceID, dataLayerHeight)
blobs, err := c.rpc.GetAll(c.ctx, dataLayerHeight, []share.Namespace{c.config.NamespaceID.Bytes()})
if err != nil {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Expand All @@ -265,9 +264,9 @@
}

var batches []*types.Batch
for i, msg := range data {
for i, blob := range blobs {
var batch pb.Batch
err = proto.Unmarshal(msg, &batch)
err = proto.Unmarshal(blob.Data, &batch)

Check warning on line 269 in da/celestia/celestia.go

View check run for this annotation

Codecov / codecov/patch

da/celestia/celestia.go#L269

Added line #L269 was not covered by tests
if err != nil {
c.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
continue
Expand All @@ -293,36 +292,3 @@
Batches: batches,
}
}

// FIXME(omritoptix): currently we're relaying on a node without validating it using a light client.
// should be proxied through light client once it's supported (https://github.com/dymensionxyz/dymint/issues/335).
func (c *DataAvailabilityLayerClient) waitForTXInclusion(txHash string) (uint64, error) {

hashBytes, err := hex.DecodeString(txHash)
if err != nil {
return 0, err
}

inclusionHeight := uint64(0)

err = retry.Do(func() error {
result, err := c.RPCClient.Tx(c.ctx, hashBytes, false)
if err != nil {
return err
}

if result == nil || err != nil {
c.logger.Error("couldn't get transaction from node", "err", err)
return errors.New("transaction not found")
}

inclusionHeight = uint64(result.Height)

return nil
}, retry.Attempts(uint(c.txPollingAttempts)), retry.DelayType(retry.FixedDelay), retry.Delay(c.txPollingRetryDelay))

if err != nil {
return 0, err
}
return inclusionHeight, nil
}
Loading
Loading