Skip to content

Commit

Permalink
Implement Data Columns By Range Request And Response Methods (#13972)
Browse files Browse the repository at this point in the history
* Add Data Structure for New Request Type

* Add Data Column By Range Handler

* Add Data Column Request Methods

* Add new validation for columns by range requests

* Fix Build

* Allow Prysm Node To Fetch Data Columns

* Allow Prysm Node To Fetch Data Columns And Sync

* Bug Fixes For Interop

* GoFmt

* Use different var

* Manu's Review
  • Loading branch information
nisdas committed Aug 15, 2024
1 parent 9726309 commit b417bc6
Show file tree
Hide file tree
Showing 20 changed files with 986 additions and 85 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/das/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"availability.go",
"availability_columns.go",
"cache.go",
"iface.go",
"mock.go",
Expand All @@ -20,6 +21,7 @@ go_library(
"//runtime/logging:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
Expand Down
151 changes: 151 additions & 0 deletions beacon-chain/das/availability_columns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package das

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
)

// LazilyPersistentStoreColumn is an implementation of AvailabilityStore to be used when batch syncing data columns.
// This implementation will hold any blobs passed to Persist until the IsDataAvailable is called for their
// block, at which time they will undergo full verification and be saved to the disk.
type LazilyPersistentStoreColumn struct {
store *filesystem.BlobStorage
cache *cache
verifier ColumnBatchVerifier
nodeID enode.ID
}

type ColumnBatchVerifier interface {
VerifiedRODataColumns(ctx context.Context, blk blocks.ROBlock, sc []blocks.RODataColumn) ([]blocks.VerifiedRODataColumn, error)
}

func NewLazilyPersistentStoreColumn(store *filesystem.BlobStorage, verifier ColumnBatchVerifier, id enode.ID) *LazilyPersistentStoreColumn {
return &LazilyPersistentStoreColumn{
store: store,
cache: newCache(),
verifier: verifier,
nodeID: id,
}
}

// TODO: Very Ugly, change interface to allow for columns and blobs
func (s *LazilyPersistentStoreColumn) Persist(current primitives.Slot, sc ...blocks.ROBlob) error {
return nil
}

// PersistColumns adds columns to the working column cache. columns stored in this cache will be persisted
// for at least as long as the node is running. Once IsDataAvailable succeeds, all blobs referenced
// by the given block are guaranteed to be persisted for the remainder of the retention period.
func (s *LazilyPersistentStoreColumn) PersistColumns(current primitives.Slot, sc ...blocks.RODataColumn) error {
if len(sc) == 0 {
return nil
}
if len(sc) > 1 {
first := sc[0].BlockRoot()
for i := 1; i < len(sc); i++ {
if first != sc[i].BlockRoot() {
return errMixedRoots
}
}
}
if !params.WithinDAPeriod(slots.ToEpoch(sc[0].Slot()), slots.ToEpoch(current)) {
return nil
}
key := keyFromColumn(sc[0])
entry := s.cache.ensure(key)
for i := range sc {
if err := entry.stashColumns(&sc[i]); err != nil {
return err
}
}
return nil
}

// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := fullCommitmentsToCheck(b, current)
if err != nil {
return errors.Wrapf(err, "could check data availability for block %#x", b.Root())
}
// Return early for blocks that are pre-deneb or which do not have any commitments.
if blockCommitments.count() == 0 {
return nil
}

key := keyFromBlock(b)
entry := s.cache.ensure(key)
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
}

// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
sidecars, err := entry.filterColumns(root, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
}
// Do thorough verifications of each BlobSidecar for the block.
// Same as above, we don't save BlobSidecars if there are any problems with the batch.
vscs, err := s.verifier.VerifiedRODataColumns(ctx, b, sidecars)
if err != nil {
var me verification.VerificationMultiError
ok := errors.As(err, &me)
if ok {
fails := me.Failures()
lf := make(log.Fields, len(fails))
for i := range fails {
lf[fmt.Sprintf("fail_%d", i)] = fails[i].Error()
}
log.WithFields(lf).
Debug("invalid ColumnSidecars received")
}
return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", root)
}
// Ensure that each column sidecar is written to disk.
for i := range vscs {
if err := s.store.SaveDataColumn(vscs[i]); err != nil {
return errors.Wrapf(err, "failed to save ColumnSidecar index %d for block %#x", vscs[i].ColumnIndex, root)
}
}
// All ColumnSidecars are persisted - da check succeeds.
return nil
}

func fullCommitmentsToCheck(b blocks.ROBlock, current primitives.Slot) (safeCommitmentsArray, error) {
var ar safeCommitmentsArray
if b.Version() < version.Deneb {
return ar, nil
}
// We are only required to check within MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return ar, nil
}
kc, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return ar, err
}
for i := range ar {
copy(ar[i], kc)
}
return ar, nil
}
57 changes: 57 additions & 0 deletions beacon-chain/das/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package das

import (
"bytes"
"reflect"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
Expand Down Expand Up @@ -38,6 +39,10 @@ func keyFromSidecar(sc blocks.ROBlob) cacheKey {
return cacheKey{slot: sc.Slot(), root: sc.BlockRoot()}
}

func keyFromColumn(sc blocks.RODataColumn) cacheKey {
return cacheKey{slot: sc.Slot(), root: sc.BlockRoot()}
}

// keyFromBlock is a convenience method for constructing a cacheKey from a ROBlock value.
func keyFromBlock(b blocks.ROBlock) cacheKey {
return cacheKey{slot: b.Block().Slot(), root: b.Root()}
Expand All @@ -61,6 +66,7 @@ func (c *cache) delete(key cacheKey) {
// cacheEntry holds a fixed-length cache of BlobSidecars.
type cacheEntry struct {
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
colScs [fieldparams.NumberOfColumns]*blocks.RODataColumn
diskSummary filesystem.BlobStorageSummary
}

Expand All @@ -82,6 +88,17 @@ func (e *cacheEntry) stash(sc *blocks.ROBlob) error {
return nil
}

func (e *cacheEntry) stashColumns(sc *blocks.RODataColumn) error {
if sc.ColumnIndex >= fieldparams.NumberOfColumns {
return errors.Wrapf(errIndexOutOfBounds, "index=%d", sc.ColumnIndex)
}
if e.colScs[sc.ColumnIndex] != nil {
return errors.Wrapf(ErrDuplicateSidecar, "root=%#x, index=%d, commitment=%#x", sc.BlockRoot(), sc.ColumnIndex, sc.KzgCommitments)
}
e.colScs[sc.ColumnIndex] = sc
return nil
}

// filter evicts sidecars that are not committed to by the block and returns custom
// errors if the cache is missing any of the commitments, or if the commitments in
// the cache do not match those found in the block. If err is nil, then all expected
Expand Down Expand Up @@ -117,6 +134,35 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
return scs, nil
}

func (e *cacheEntry) filterColumns(root [32]byte, kc safeCommitmentsArray) ([]blocks.RODataColumn, error) {
if e.diskSummary.AllAvailable(kc.count()) {
return nil, nil
}
scs := make([]blocks.RODataColumn, 0, kc.count())
for i := uint64(0); i < fieldparams.NumberOfColumns; i++ {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
}
if kc[i] == nil {
if e.colScs[i] != nil {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment)
}
continue
}

if e.colScs[i] == nil {
return nil, errors.Wrapf(errMissingSidecar, "root=%#x, index=%#x", root, i)
}
if !reflect.DeepEqual(kc[i], e.colScs[i].KzgCommitments) {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.colScs[i].KzgCommitments, kc[i])
}
scs = append(scs, *e.colScs[i])
}

return scs, nil
}

// safeCommitmentArray is a fixed size array of commitment byte slices. This is helpful for avoiding
// gratuitous bounds checks.
type safeCommitmentArray [fieldparams.MaxBlobsPerBlock][]byte
Expand All @@ -129,3 +175,14 @@ func (s safeCommitmentArray) count() int {
}
return fieldparams.MaxBlobsPerBlock
}

type safeCommitmentsArray [fieldparams.NumberOfColumns][][]byte

func (s safeCommitmentsArray) count() int {
for i := range s {
if s[i] == nil {
return i
}
}
return fieldparams.NumberOfColumns
}
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (bs *BlobStorage) ColumnIndices(root [32]byte) ([fieldparams.NumberOfColumn
return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
}
if u >= fieldparams.NumberOfColumns {
return mask, errIndexOutOfBounds
return mask, errors.Wrapf(errIndexOutOfBounds, "invalid index %d", u)
}
mask[u] = true
}
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ go_library(
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_btcsuite_btcd_btcec_v2//:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
Expand Down
12 changes: 12 additions & 0 deletions beacon-chain/p2p/rpc_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const BlobSidecarsByRootName = "/blob_sidecars_by_root"
// DataColumnSidecarsByRootName is the name for the DataColumnSidecarsByRoot v1 message topic.
const DataColumnSidecarsByRootName = "/data_column_sidecars_by_root"

// DataColumnSidecarsByRangeName is the name for the DataColumnSidecarsByRange v1 message topic.
const DataColumnSidecarsByRangeName = "/data_column_sidecars_by_range"

const (
// V1 RPC Topics
// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
Expand All @@ -71,6 +74,9 @@ const (
// RPCDataColumnSidecarsByRootTopicV1 is a topic for requesting data column sidecars by their block root. New in PeerDAS.
// /eth2/beacon_chain/req/data_column_sidecars_by_root/1
RPCDataColumnSidecarsByRootTopicV1 = protocolPrefix + DataColumnSidecarsByRootName + SchemaVersionV1
// RPCDataColumnSidecarsByRangeTopicV1 is a topic for requesting data column sidecars by their slot. New in PeerDAS.
// /eth2/beacon_chain/req/data_column_sidecars_by_range/1
RPCDataColumnSidecarsByRangeTopicV1 = protocolPrefix + DataColumnSidecarsByRangeName + SchemaVersionV1

// V2 RPC Topics
// RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method.
Expand Down Expand Up @@ -107,6 +113,10 @@ var RPCTopicMappings = map[string]interface{}{
RPCBlobSidecarsByRangeTopicV1: new(pb.BlobSidecarsByRangeRequest),
// BlobSidecarsByRoot v1 Message
RPCBlobSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq),
// DataColumnSidecarsByRange v1 Message
RPCDataColumnSidecarsByRangeTopicV1: new(pb.DataColumnSidecarsByRangeRequest),
// DataColumnSidecarsByRoot v1 Message
RPCDataColumnSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq),
}

// Maps all registered protocol prefixes.
Expand All @@ -125,6 +135,8 @@ var messageMapping = map[string]bool{
MetadataMessageName: true,
BlobSidecarsByRangeName: true,
BlobSidecarsByRootName: true,
DataColumnSidecarsByRootName: true,
DataColumnSidecarsByRangeName: true,
}

// Maps all the RPC messages which are to updated in altair.
Expand Down
24 changes: 24 additions & 0 deletions beacon-chain/p2p/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ import (
"path"
"time"

"github.com/btcsuite/btcd/btcec/v2"
gCrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
Expand Down Expand Up @@ -173,3 +177,23 @@ func verifyConnectivity(addr string, port uint, protocol string) {
}
}
}

func ConvertPeerIDToNodeID(pid peer.ID) (enode.ID, error) {
// Retrieve the public key object of the peer under "crypto" form.
pubkeyObjCrypto, err := pid.ExtractPublicKey()
if err != nil {
return [32]byte{}, errors.Wrap(err, "extract public key")
}
// Extract the bytes representation of the public key.
compressedPubKeyBytes, err := pubkeyObjCrypto.Raw()
if err != nil {
return [32]byte{}, errors.Wrap(err, "public key raw")
}
// Retrieve the public key object of the peer under "SECP256K1" form.
pubKeyObjSecp256k1, err := btcec.ParsePubKey(compressedPubKeyBytes)
if err != nil {
return [32]byte{}, errors.Wrap(err, "parse public key")
}
newPubkey := &ecdsa.PublicKey{Curve: gCrypto.S256(), X: pubKeyObjSecp256k1.X(), Y: pubKeyObjSecp256k1.Y()}
return enode.PubkeyToIDV4(newPubkey), nil
}
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"rpc_blob_sidecars_by_range.go",
"rpc_blob_sidecars_by_root.go",
"rpc_chunked_response.go",
"rpc_data_column_sidecars_by_range.go",
"rpc_data_column_sidecars_by_root.go",
"rpc_goodbye.go",
"rpc_metadata.go",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
Expand All @@ -32,6 +33,7 @@ go_library(
"//beacon-chain/sync/verify:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
Expand Down
Loading

0 comments on commit b417bc6

Please sign in to comment.