Skip to content

Commit

Permalink
EIP-7549: attestation pool (#14121)
Browse files Browse the repository at this point in the history
* implementation

* test fixes

* Electra tests

* remove aggregator tests

* id comments and tests

* make Id equal to [33]byte
  • Loading branch information
rkapka authored Jun 25, 2024
1 parent 5f0d607 commit b8aad84
Show file tree
Hide file tree
Showing 20 changed files with 452 additions and 138 deletions.
3 changes: 2 additions & 1 deletion beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ go_library(
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/attestation/aggregation/attestations:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/operations/attestations/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/attestation/aggregation/attestations:go_default_library",
"//runtime/version:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
Expand All @@ -39,14 +40,15 @@ go_test(
embed = [":go_default_library"],
deps = [
"//config/fieldparams:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)
112 changes: 78 additions & 34 deletions beacon-chain/operations/attestations/kv/aggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
attaggregation "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
Expand All @@ -32,28 +34,28 @@ func (c *AttCaches) aggregateUnaggregatedAtts(ctx context.Context, unaggregatedA
_, span := trace.StartSpan(ctx, "operations.attestations.kv.aggregateUnaggregatedAtts")
defer span.End()

attsByDataRoot := make(map[[32]byte][]ethpb.Att, len(unaggregatedAtts))
attsByVerAndDataRoot := make(map[attestation.Id][]ethpb.Att, len(unaggregatedAtts))
for _, att := range unaggregatedAtts {
attDataRoot, err := att.GetData().HashTreeRoot()
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return err
return errors.Wrap(err, "could not create attestation ID")
}
attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
attsByVerAndDataRoot[id] = append(attsByVerAndDataRoot[id], att)
}

// Aggregate unaggregated attestations from the pool and save them in the pool.
// Track the unaggregated attestations that aren't able to aggregate.
leftOverUnaggregatedAtt := make(map[[32]byte]bool)
leftOverUnaggregatedAtt := make(map[attestation.Id]bool)

leftOverUnaggregatedAtt = c.aggregateParallel(attsByDataRoot, leftOverUnaggregatedAtt)
leftOverUnaggregatedAtt = c.aggregateParallel(attsByVerAndDataRoot, leftOverUnaggregatedAtt)

// Remove the unaggregated attestations from the pool that were successfully aggregated.
for _, att := range unaggregatedAtts {
h, err := hashFn(att)
id, err := attestation.NewId(att, attestation.Full)
if err != nil {
return err
return errors.Wrap(err, "could not create attestation ID")
}
if leftOverUnaggregatedAtt[h] {
if leftOverUnaggregatedAtt[id] {
continue
}
if err := c.DeleteUnaggregatedAttestation(att); err != nil {
Expand All @@ -66,7 +68,7 @@ func (c *AttCaches) aggregateUnaggregatedAtts(ctx context.Context, unaggregatedA
// aggregateParallel aggregates attestations in parallel for `atts` and saves them in the pool,
// returns the unaggregated attestations that weren't able to aggregate.
// Given `n` CPU cores, it creates a channel of size `n` and spawns `n` goroutines to aggregate attestations
func (c *AttCaches) aggregateParallel(atts map[[32]byte][]ethpb.Att, leftOver map[[32]byte]bool) map[[32]byte]bool {
func (c *AttCaches) aggregateParallel(atts map[attestation.Id][]ethpb.Att, leftOver map[attestation.Id]bool) map[attestation.Id]bool {
var leftoverLock sync.Mutex
wg := sync.WaitGroup{}

Expand All @@ -92,13 +94,13 @@ func (c *AttCaches) aggregateParallel(atts map[[32]byte][]ethpb.Att, leftOver ma
continue
}
} else {
h, err := hashFn(aggregated)
id, err := attestation.NewId(aggregated, attestation.Full)
if err != nil {
log.WithError(err).Error("could not hash attestation")
log.WithError(err).Error("Could not create attestation ID")
continue
}
leftoverLock.Lock()
leftOver[h] = true
leftOver[id] = true
leftoverLock.Unlock()
}
}
Expand Down Expand Up @@ -139,25 +141,26 @@ func (c *AttCaches) SaveAggregatedAttestation(att ethpb.Att) error {
return nil
}

r, err := hashFn(att.GetData())
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
return errors.Wrap(err, "could not create attestation ID")
}
copiedAtt := att.Copy()

c.aggregatedAttLock.Lock()
defer c.aggregatedAttLock.Unlock()
atts, ok := c.aggregatedAtt[r]
atts, ok := c.aggregatedAtt[id]
if !ok {
atts := []ethpb.Att{copiedAtt}
c.aggregatedAtt[r] = atts
c.aggregatedAtt[id] = atts
return nil
}

atts, err = attaggregation.Aggregate(append(atts, copiedAtt))
if err != nil {
return err
}
c.aggregatedAtt[r] = atts
c.aggregatedAtt[id] = atts

return nil
}
Expand Down Expand Up @@ -191,17 +194,56 @@ func (c *AttCaches) AggregatedAttestations() []ethpb.Att {

// AggregatedAttestationsBySlotIndex returns the aggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) AggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []ethpb.Att {
func (c *AttCaches) AggregatedAttestationsBySlotIndex(
ctx context.Context,
slot primitives.Slot,
committeeIndex primitives.CommitteeIndex,
) []*ethpb.Attestation {
_, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndex")
defer span.End()

atts := make([]ethpb.Att, 0)
atts := make([]*ethpb.Attestation, 0)

c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
for _, a := range c.aggregatedAtt {
if slot == a[0].GetData().Slot && committeeIndex == a[0].GetData().CommitteeIndex {
atts = append(atts, a...)
for _, as := range c.aggregatedAtt {
if as[0].Version() == version.Phase0 && slot == as[0].GetData().Slot && committeeIndex == as[0].GetData().CommitteeIndex {
for _, a := range as {
att, ok := a.(*ethpb.Attestation)
// This will never fail in practice because we asserted the version
if ok {
atts = append(atts, att)
}
}
}
}

return atts
}

// AggregatedAttestationsBySlotIndexElectra returns the aggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) AggregatedAttestationsBySlotIndexElectra(
ctx context.Context,
slot primitives.Slot,
committeeIndex primitives.CommitteeIndex,
) []*ethpb.AttestationElectra {
_, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndexElectra")
defer span.End()

atts := make([]*ethpb.AttestationElectra, 0)

c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
for _, as := range c.aggregatedAtt {
if as[0].Version() == version.Electra && slot == as[0].GetData().Slot && as[0].CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
for _, a := range as {
att, ok := a.(*ethpb.AttestationElectra)
// This will never fail in practice because we asserted the version
if ok {
atts = append(atts, att)
}
}
}
}

Expand All @@ -216,18 +258,19 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
if !helpers.IsAggregated(att) {
return errors.New("attestation is not aggregated")
}
r, err := hashFn(att.GetData())
if err != nil {
return errors.Wrap(err, "could not tree hash attestation data")
}

if err := c.insertSeenBit(att); err != nil {
return err
}

id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return errors.Wrap(err, "could not create attestation ID")
}

c.aggregatedAttLock.Lock()
defer c.aggregatedAttLock.Unlock()
attList, ok := c.aggregatedAtt[r]
attList, ok := c.aggregatedAtt[id]
if !ok {
return nil
}
Expand All @@ -241,9 +284,9 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
}
}
if len(filtered) == 0 {
delete(c.aggregatedAtt, r)
delete(c.aggregatedAtt, id)
} else {
c.aggregatedAtt[r] = filtered
c.aggregatedAtt[id] = filtered
}

return nil
Expand All @@ -254,14 +297,15 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {
if err := helpers.ValidateNilAttestation(att); err != nil {
return false, err
}
r, err := hashFn(att.GetData())

id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, errors.Wrap(err, "could not tree hash attestation")
return false, errors.Wrap(err, "could not create attestation ID")
}

c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
if atts, ok := c.aggregatedAtt[r]; ok {
if atts, ok := c.aggregatedAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
Expand All @@ -273,7 +317,7 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {

c.blockAttLock.RLock()
defer c.blockAttLock.RUnlock()
if atts, ok := c.blockAtt[r]; ok {
if atts, ok := c.blockAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
Expand Down
59 changes: 52 additions & 7 deletions beacon-chain/operations/attestations/kv/aggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

c "github.com/patrickmn/go-cache"
"github.com/pkg/errors"
fssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
}),
AggregationBits: bitfield.Bitlist{0b10111},
},
wantErrString: "could not tree hash attestation: --.BeaconBlockRoot (" + fssz.ErrBytesLength.Error() + ")",
wantErrString: "could not create attestation ID",
},
{
name: "already seen",
Expand All @@ -92,15 +93,13 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
count: 1,
},
}
r, err := hashFn(util.HydrateAttestationData(&ethpb.AttestationData{
Slot: 100,
}))
id, err := attestation.NewId(util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 100}}), attestation.Data)
require.NoError(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewAttCaches()
cache.seenAtt.Set(string(r[:]), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
assert.Equal(t, 0, len(cache.unAggregatedAtt), "Invalid start pool, atts: %d", len(cache.unAggregatedAtt))

err := cache.SaveAggregatedAttestation(tt.att)
Expand Down Expand Up @@ -230,7 +229,7 @@ func TestKV_Aggregated_DeleteAggregatedAttestation(t *testing.T) {
},
}
err := cache.DeleteAggregatedAttestation(att)
wantErr := "could not tree hash attestation data: --.BeaconBlockRoot (" + fssz.ErrBytesLength.Error() + ")"
wantErr := "could not create attestation ID"
assert.ErrorContains(t, wantErr, err)
})

Expand Down Expand Up @@ -500,3 +499,49 @@ func TestKV_Aggregated_DuplicateAggregatedAttestations(t *testing.T) {
assert.DeepSSZEqual(t, att2, returned[0], "Did not receive correct aggregated atts")
assert.Equal(t, 1, len(returned), "Did not receive correct aggregated atts")
}

func TestKV_Aggregated_AggregatedAttestationsBySlotIndex(t *testing.T) {
cache := NewAttCaches()

att1 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b1011}})
att2 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b1101}})
att3 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
atts := []*ethpb.Attestation{att1, att2, att3}

for _, att := range atts {
require.NoError(t, cache.SaveAggregatedAttestation(att))
}
ctx := context.Background()
returned := cache.AggregatedAttestationsBySlotIndex(ctx, 1, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att1}, returned)
returned = cache.AggregatedAttestationsBySlotIndex(ctx, 1, 2)
assert.DeepEqual(t, []*ethpb.Attestation{att2}, returned)
returned = cache.AggregatedAttestationsBySlotIndex(ctx, 2, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att3}, returned)
}

func TestKV_Aggregated_AggregatedAttestationsBySlotIndexElectra(t *testing.T) {
cache := NewAttCaches()

committeeBits := primitives.NewAttestationCommitteeBits()
committeeBits.SetBitAt(1, true)
att1 := util.HydrateAttestationElectra(&ethpb.AttestationElectra{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1011}, CommitteeBits: committeeBits})
committeeBits = primitives.NewAttestationCommitteeBits()
committeeBits.SetBitAt(2, true)
att2 := util.HydrateAttestationElectra(&ethpb.AttestationElectra{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}, CommitteeBits: committeeBits})
committeeBits = primitives.NewAttestationCommitteeBits()
committeeBits.SetBitAt(1, true)
att3 := util.HydrateAttestationElectra(&ethpb.AttestationElectra{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}, CommitteeBits: committeeBits})
atts := []*ethpb.AttestationElectra{att1, att2, att3}

for _, att := range atts {
require.NoError(t, cache.SaveAggregatedAttestation(att))
}
ctx := context.Background()
returned := cache.AggregatedAttestationsBySlotIndexElectra(ctx, 1, 1)
assert.DeepEqual(t, []*ethpb.AttestationElectra{att1}, returned)
returned = cache.AggregatedAttestationsBySlotIndexElectra(ctx, 1, 2)
assert.DeepEqual(t, []*ethpb.AttestationElectra{att2}, returned)
returned = cache.AggregatedAttestationsBySlotIndexElectra(ctx, 2, 1)
assert.DeepEqual(t, []*ethpb.AttestationElectra{att3}, returned)
}
Loading

0 comments on commit b8aad84

Please sign in to comment.