From 2d92c530ad78bbb1aeb12937f77098bc1180c210 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 5 Dec 2024 17:20:29 -0500 Subject: [PATCH] Refactor subnets subscriptions. --- CHANGELOG.md | 1 + beacon-chain/sync/subscriber.go | 343 ++++++++++----------------- beacon-chain/sync/subscriber_test.go | 12 +- 3 files changed, 127 insertions(+), 229 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ef632908876..78944ffc4985 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve - Increase Max Payload Size in Gossip. - Revert "Proposer checks gas limit before accepting builder's bid". - Updated quic-go to v0.48.2 . +- Refactor subnets subscriptions. ### Deprecated diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index baca434644cc..b3c11716414c 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -53,6 +53,26 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag return pubsub.ValidationAccept, nil } +func sliceFromCount(count uint64) []uint64 { + result := make([]uint64, 0, count) + + for item := range count { + result = append(result, item) + } + + return result +} + +func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 { + // Get the current epoch. + currentEpoch := slots.ToEpoch(currentSlot) + + // Retrieve the subnets we want to subscribe to. + subs := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch) + + return slice.SetUint64(subs) +} + // Register PubSub subscribers func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.subscribe( @@ -87,18 +107,23 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { ) if flags.Get().SubscribeToAllSubnets { s.subscribeStaticWithSubnets( + "Attestation", p2p.AttestationSubnetTopicFormat, s.validateCommitteeIndexBeaconAttestation, /* validator */ s.committeeIndexBeaconAttestationSubscriber, /* message handler */ digest, - params.BeaconConfig().AttestationSubnetCount, + sliceFromCount(params.BeaconConfig().AttestationSubnetCount), ) } else { s.subscribeDynamicWithSubnets( + "Attestation", + p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})], p2p.AttestationSubnetTopicFormat, s.validateCommitteeIndexBeaconAttestation, /* validator */ s.committeeIndexBeaconAttestationSubscriber, /* message handler */ digest, + s.persistentAndAggregatorSubnetIndices, + s.attesterSubnetIndices, ) } // Altair Fork Version @@ -109,19 +134,26 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.syncContributionAndProofSubscriber, digest, ) + if flags.Get().SubscribeToAllSubnets { - s.subscribeStaticWithSyncSubnets( + s.subscribeStaticWithSubnets( + "Sync committee", p2p.SyncCommitteeSubnetTopicFormat, s.validateSyncCommitteeMessage, /* validator */ s.syncCommitteeMessageSubscriber, /* message handler */ digest, + sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount), ) } else { - s.subscribeDynamicWithSyncSubnets( + s.subscribeDynamicWithSubnets( + "Sync committee", + p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})], p2p.SyncCommitteeSubnetTopicFormat, s.validateSyncCommitteeMessage, /* validator */ s.syncCommitteeMessageSubscriber, /* message handler */ digest, + s.activeSyncSubnetIndices, + func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, ) } } @@ -139,11 +171,12 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { // New Gossip Topic in Deneb if epoch >= params.BeaconConfig().DenebForkEpoch { s.subscribeStaticWithSubnets( + "Blob", p2p.BlobSubnetTopicFormat, s.validateBlob, /* validator */ s.blobSubscriber, /* message handler */ digest, - params.BeaconConfig().BlobsidecarSubnetCount, + sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount), ) } } @@ -324,24 +357,43 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p } } -// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is +// subscribeStaticWithSubnets with the given topic and index. A given validator and subscription handler is // used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. -func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) +func (s *Service) subscribeStaticWithSubnets( + description string, + topic string, + validator wrappedVal, + handle subHandler, + digest [4]byte, + subnetIndices []uint64, +) { + // Retrieve the genesis validators root. + genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() + + // Retrieve the fork data + _, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) if err != nil { // Impossible condition as it would mean digest does not exist. panic(err) } - base := p2p.GossipTopicMappings(topic, e) + + // Retrieve the base protobuf message. + base := p2p.GossipTopicMappings(topic, epoch) if base == nil { // Impossible condition as it would mean topic does not exist. panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) } - for i := uint64(0); i < subnetCount; i++ { - s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle) + + // Subscribe to the subnets. + for _, subnetIndex := range subnetIndices { + fullTopic := s.addDigestAndIndexToTopic(topic, digest, subnetIndex) + s.subscribeWithBase(fullTopic, validator, handle) } + + // Retrieve the genesis time. genesis := s.cfg.clock.GenesisTime() + + // Create a ticker ticking every slot. ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) go func() { @@ -350,100 +402,48 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, case <-s.ctx.Done(): ticker.Done() return + case <-ticker.C(): + // Do not subscribe if not synced. if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { continue } - valid, err := isDigestValid(digest, genesis, genRoot) + + valid, err := isDigestValid(digest, genesis, genesisValidatorsRoot) if err != nil { log.Error(err) continue } + if !valid { - log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) // Unsubscribes from all our current subnets. - for i := uint64(0); i < subnetCount; i++ { - fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix() + log.WithField("digest", digest).Infof("%s subnets with are no longer valid, unsubscribing from all of them.", description) + for _, subnetIndex := range subnetIndices { + fullTopic := fmt.Sprintf(topic, digest, subnetIndex) + s.cfg.p2p.Encoding().ProtocolSuffix() s.unSubscribeFromTopic(fullTopic) } - ticker.Done() - return - } - // Check every slot that there are enough peers - for i := uint64(0); i < subnetCount; i++ { - if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) { - _, err := s.cfg.p2p.FindPeersWithSubnet( - s.ctx, - s.addDigestAndIndexToTopic(topic, digest, i), - i, - flags.Get().MinimumPeersPerSubnet, - ) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - return - } - } - } - } - } - }() -} - -// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible -// string for the topic name and the list of subnets for subscribed topics that should be -// maintained. -func (s *Service) subscribeDynamicWithSubnets( - topicFormat string, - validate wrappedVal, - handle subHandler, - digest [4]byte, -) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) - if err != nil { - // Impossible condition as it would mean digest does not exist. - panic(err) - } - base := p2p.GossipTopicMappings(topicFormat, e) - if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) - } - subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot) - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) - go func() { - for { - select { - case <-s.ctx.Done(): - ticker.Done() - return - case currentSlot := <-ticker.C(): - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - continue - } - valid, err := isDigestValid(digest, genesis, genRoot) - if err != nil { - log.Error(err) - continue - } - if !valid { - log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) - // Unsubscribes from all our current subnets. - s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) ticker.Done() return } - wantedSubs := s.retrievePersistentSubs(currentSlot) - s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - for _, idx := range wantedSubs { - s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle) - } - // find desired subs for attesters - attesterSubs := s.attesterSubnetIndices(currentSlot) - for _, idx := range attesterSubs { - s.lookupAttesterSubnets(digest, idx) + for _, subnetIndex := range subnetIndices { + // Check that there are enough peers. + fullTopic := s.addDigestAndIndexToTopic(topic, digest, subnetIndex) + if s.enoughPeersAreConnected(fullTopic) { + continue + } + + // If not enough peers, search for more. + if _, err := s.cfg.p2p.FindPeersWithSubnet( + s.ctx, + s.addDigestAndIndexToTopic(topic, digest, subnetIndex), + subnetIndex, + flags.Get().MinimumPeersPerSubnet, + ); err != nil { + log.WithError(err).Debug("Could not search for peers") + return + } } } } @@ -477,96 +477,11 @@ func (s *Service) reValidateSubscriptions( } } -// subscribe missing subnets for our aggregators. -func (s *Service) subscribeAggregatorSubnet( - subscriptions map[uint64]*pubsub.Subscription, - idx uint64, - digest [4]byte, - validate wrappedVal, - handle subHandler, -) { - // do not subscribe if we have no peers in the same - // subnet - topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] - subnetTopic := fmt.Sprintf(topic, digest, idx) - // check if subscription exists and if not subscribe the relevant subnet. - if _, exists := subscriptions[idx]; !exists { - subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) - } - if !s.enoughPeersAreConnected(subnetTopic) { - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - } - } -} - -// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is -// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. -func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) - if err != nil { - panic(err) - } - base := p2p.GossipTopicMappings(topic, e) - if base == nil { - panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) - } - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { - s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle) - } - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) - - go func() { - for { - select { - case <-s.ctx.Done(): - ticker.Done() - return - case <-ticker.C(): - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - continue - } - valid, err := isDigestValid(digest, genesis, genRoot) - if err != nil { - log.Error(err) - continue - } - if !valid { - log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) - // Unsubscribes from all our current subnets. - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { - fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix() - s.unSubscribeFromTopic(fullTopic) - } - ticker.Done() - return - } - // Check every slot that there are enough peers - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { - if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) { - _, err := s.cfg.p2p.FindPeersWithSubnet( - s.ctx, - s.addDigestAndIndexToTopic(topic, digest, i), - i, - flags.Get().MinimumPeersPerSubnet, - ) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - return - } - } - } - } - } - }() -} - -// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed. +// subscribeToSubnets subscribes to needed subnets, unsubscribe from unneeded ones and search for more peers if needed. // Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. -func (s *Service) subscribeToSyncSubnets( +func (s *Service) subscribeToSubnets( + description string, + topic string, topicFormat string, digest [4]byte, genesisValidatorsRoot [fieldparams.RootLength]byte, @@ -575,10 +490,9 @@ func (s *Service) subscribeToSyncSubnets( currentSlot primitives.Slot, validate wrappedVal, handle subHandler, + getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, + getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64, ) bool { - // Get sync subnets topic. - topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] - // Do not subscribe if not synced. if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { return true @@ -593,22 +507,25 @@ func (s *Service) subscribeToSyncSubnets( // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. if !valid { - log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.") + log.WithField("digest", fmt.Sprintf("%#x", digest)).Warningf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", description) s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) return false } - // Get the current epoch. - currentEpoch := slots.ToEpoch(currentSlot) - // Retrieve the subnets we want to subscribe to. - wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch) + subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot) + + // Retrieve the subnets we want to find peers for. + subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot) + + // Combine the subnets to subscribe and the subnets to find peers for. + subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...)) // Remove subscriptions that are no longer wanted. - s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest) + s.reValidateSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest) // Subscribe to wanted subnets. - for _, subnetIndex := range wantedSubnetsIndex { + for _, subnetIndex := range subnetsToSubscribeIndex { subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) // Check if subscription exists. @@ -622,7 +539,7 @@ func (s *Service) subscribeToSyncSubnets( } // Find new peers for wanted subnets if needed. - for _, subnetIndex := range wantedSubnetsIndex { + for _, subnetIndex := range subnetsToFindPeersIndex { subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) // Check if we have enough peers in the subnet. Skip if we do. @@ -640,18 +557,19 @@ func (s *Service) subscribeToSyncSubnets( return true } -// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets. -func (s *Service) subscribeDynamicWithSyncSubnets( +// subscribeDynamicWithSubnets subscribes to a dynamically changing list of subnets. +func (s *Service) subscribeDynamicWithSubnets( + description string, + topic string, topicFormat string, validate wrappedVal, handle subHandler, digest [4]byte, + getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64, + getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64, ) { - // Retrieve the number of committee subnets we need to subscribe to. - syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount - // Initialize the subscriptions map. - subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount) + subscriptions := make(map[uint64]*pubsub.Subscription) // Retrieve the genesis validators root. genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() @@ -679,13 +597,12 @@ func (s *Service) subscribeDynamicWithSyncSubnets( currentSlot := s.cfg.clock.CurrentSlot() go func() { - // Subscribe to the sync subnets. - s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) + s.subscribeToSubnets(description, topic, topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) for { select { case currentSlot := <-ticker.C(): - isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) + isDigestValid := s.subscribeToSubnets(description, topic, topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly) // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. if !isDigestValid { @@ -701,19 +618,6 @@ func (s *Service) subscribeDynamicWithSyncSubnets( }() } -// lookup peers for attester specific subnets. -func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { - topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] - subnetTopic := fmt.Sprintf(topic, digest, idx) - if !s.enoughPeersAreConnected(subnetTopic) { - // perform a search for peers with the desired committee index. - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - } - } -} - func (s *Service) unSubscribeFromTopic(topic string) { log.WithField("topic", topic).Debug("Unsubscribing from topic") if err := s.cfg.p2p.PubSub().UnregisterTopicValidator(topic); err != nil { @@ -740,19 +644,12 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool { return peersWithSubnetCount >= threshold } -func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 { - // Persistent subscriptions from validators - persistentSubs := s.persistentSubnetIndices() - // Update desired topic indices for aggregator - wantedSubs := s.aggregatorSubnetIndices(currSlot) - - // Combine subscriptions to get all requested subscriptions - return slice.SetUint64(append(persistentSubs, wantedSubs...)) -} +func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { + persistentSubnetIndices := s.persistentSubnetIndices() + aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot) -func (*Service) retrieveActiveSyncSubnets(currEpoch primitives.Epoch) []uint64 { - subs := cache.SyncSubnetIDs.GetAllSubnets(currEpoch) - return slice.SetUint64(subs) + // Combine subscriptions to get all requested subscriptions. + return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...)) } // filters out required peers for the node to function, not @@ -768,7 +665,7 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { return pids } currSlot := s.cfg.clock.CurrentSlot() - wantedSubs := s.retrievePersistentSubs(currSlot) + wantedSubs := s.persistentAndAggregatorSubnetIndices(currSlot) wantedSubs = slice.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...)) topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index d0bae7fe4aa9..9f7efcbfe761 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -332,10 +332,10 @@ func TestStaticSubnets(t *testing.T) { defaultTopic := "/eth2/%x/beacon_attestation_%d" d, err := r.currentForkDigest() assert.NoError(t, err) - r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { + r.subscribeStaticWithSubnets("Attestation", defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { // no-op return nil - }, d, params.BeaconConfig().AttestationSubnetCount) + }, d, sliceFromCount(params.BeaconConfig().AttestationSubnetCount)) topics := r.cfg.p2p.PubSub().GetTopics() if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount { t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics)) @@ -565,7 +565,7 @@ func TestSubscribeWithSyncSubnets_StaticOK(t *testing.T) { defer cache.SyncSubnetIDs.EmptyAllCaches() digest, err := r.currentForkDigest() assert.NoError(t, err) - r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeStaticWithSubnets("Sync committee", p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)) assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics())) cancel() } @@ -600,7 +600,7 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) { cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second) digest, err := r.currentForkDigest() assert.NoError(t, err) - r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeDynamicWithSubnets("Sync committee", p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})], p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{} @@ -645,7 +645,7 @@ func TestSubscribeWithSyncSubnets_StaticSwitchFork(t *testing.T) { genRoot := r.cfg.clock.GenesisValidatorsRoot() digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) assert.NoError(t, err) - r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeStaticWithSubnets("Sync committee", p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)) assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics())) // Expect that all old topics will be unsubscribed. @@ -689,7 +689,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) { digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) assert.NoError(t, err) - r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) + r.subscribeDynamicWithSubnets("Sync committee", p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})], p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} }) time.Sleep(2 * time.Second) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) topicMap := map[string]bool{}