Skip to content

Commit

Permalink
p2p: get rid of interface{} from PeerStore methods (#6101)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Aug 15, 2024
1 parent 7fc243c commit 3b9f3e3
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 59 deletions.
4 changes: 2 additions & 2 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
tmpdir := t.TempDir()
pk, err := GetPrivKey(cfg, tmpdir)
require.NoError(t, err)
ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "")
ps, err := peerstore.NewPeerStore(nil, "")
require.NoError(t, err)
h, err := libp2p.New(
libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"),
Expand Down Expand Up @@ -134,7 +134,7 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap
tmpdir := t.TempDir()
pk, err := GetPrivKey(cfg, tmpdir)
require.NoError(t, err)
ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "")
ps, err := peerstore.NewPeerStore(nil, "")
require.NoError(t, err)
h, err := libp2p.New(
libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"),
Expand Down
7 changes: 3 additions & 4 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error)
}

// MakeService creates a P2P service instance
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) {
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler) (*serviceImpl, error) {

sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService)
h.Network().Notify(sm)
Expand Down Expand Up @@ -238,16 +238,15 @@ func (s *serviceImpl) IDSigner() *PeerIDChallengeSigner {
// DialPeersUntilTargetCount attempts to establish connections to the provided phonebook addresses
func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) {
ps := s.host.Peerstore().(*pstore.PeerStore)
peerIDs := ps.GetAddresses(targetConnCount, phonebook.PhoneBookEntryRelayRole)
addrInfos := ps.GetAddresses(targetConnCount, phonebook.PhoneBookEntryRelayRole)
conns := s.host.Network().Conns()
var numOutgoingConns int
for _, conn := range conns {
if conn.Stat().Direction == network.DirOutbound {
numOutgoingConns++
}
}
for _, peerInfo := range peerIDs {
peerInfo := peerInfo.(*peer.AddrInfo)
for _, peerInfo := range addrInfos {
// if we are at our target count stop trying to connect
if numOutgoingConns >= targetConnCount {
return
Expand Down
40 changes: 16 additions & 24 deletions network/p2p/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"math/rand"
"time"

"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-deadlock"
"github.com/libp2p/go-libp2p/core/peer"
libp2p "github.com/libp2p/go-libp2p/core/peerstore"
mempstore "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"golang.org/x/exp/slices"

"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-deadlock"
)

// when using GetAddresses with getAllAddresses, all the addresses will be retrieved, regardless
Expand Down Expand Up @@ -76,14 +77,8 @@ func NewPeerStore(addrInfo []*peer.AddrInfo, network string) (*PeerStore, error)
return nil, fmt.Errorf("cannot initialize a peerstore: %w", err)
}

// initialize peerstore with addresses
peers := make([]interface{}, len(addrInfo))
for i := 0; i < len(addrInfo); i++ {
peers[i] = addrInfo[i]
}

pstore := &PeerStore{peerStoreCAB: ps}
pstore.AddPersistentPeers(peers, network, phonebook.PhoneBookEntryRelayRole)
pstore.AddPersistentPeers(addrInfo, network, phonebook.PhoneBookEntryRelayRole)
return pstore, nil
}

Expand All @@ -102,7 +97,7 @@ func MakePhonebook(connectionsRateLimitingCount uint,
}

// GetAddresses returns up to N addresses, but may return fewer
func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []interface{} {
func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo {
return shuffleSelect(ps.filterRetryTime(time.Now(), role), n)
}

Expand Down Expand Up @@ -210,7 +205,7 @@ func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime t
}

// ReplacePeerList replaces the peer list for the given networkName and role.
func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName string, role phonebook.PhoneBookEntryRoles) {
func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) {
// prepare a map of items we'd like to remove.
removeItems := make(map[peer.ID]bool, 0)
peerIDs := ps.Peers()
Expand All @@ -226,8 +221,7 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName st
}

}
for _, addr := range addressesThey {
info := addr.(*peer.AddrInfo)
for _, info := range addressesThey {
data, _ := ps.Get(info.ID, addressDataKey)
if data != nil {
// we already have this.
Expand Down Expand Up @@ -255,17 +249,15 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName st

// AddPersistentPeers stores addresses of peers which are persistent.
// i.e. they won't be replaced by ReplacePeerList calls
func (ps *PeerStore) AddPersistentPeers(dnsAddresses []interface{}, networkName string, role phonebook.PhoneBookEntryRoles) {
for _, addr := range dnsAddresses {
info := addr.(*peer.AddrInfo)
func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) {
for _, info := range addrInfo {
data, _ := ps.Get(info.ID, addressDataKey)
if data != nil {
// we already have this.
// Make sure the persistence field is set to true
ad := data.(addressData)
ad.persistent = true
_ = ps.Put(info.ID, addressDataKey, data)

_ = ps.Put(info.ID, addressDataKey, ad)
} else {
// we don't have this item. add it.
ps.AddAddrs(info.ID, info.Addrs, libp2p.PermanentAddrTTL)
Expand Down Expand Up @@ -328,8 +320,8 @@ func (ps *PeerStore) popNElements(n int, peerID peer.ID) {
_ = ps.Put(peerID, addressDataKey, ad)
}

func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []interface{} {
o := make([]interface{}, 0, len(ps.Peers()))
func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo {
o := make([]*peer.AddrInfo, 0, len(ps.Peers()))
for _, peerID := range ps.Peers() {
data, _ := ps.Get(peerID, addressDataKey)
if data != nil {
Expand All @@ -344,11 +336,11 @@ func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryR
return o
}

func shuffleSelect(set []interface{}, n int) []interface{} {
func shuffleSelect(set []*peer.AddrInfo, n int) []*peer.AddrInfo {
if n >= len(set) || n == getAllAddresses {
// return shuffled copy of everything
out := slices.Clone(set)
shuffleStrings(out)
shuffleAddrInfos(out)
return out
}
// Pick random indexes from the set
Expand All @@ -361,13 +353,13 @@ func shuffleSelect(set []interface{}, n int) []interface{} {
}
}
}
out := make([]interface{}, n)
out := make([]*peer.AddrInfo, n)
for i, index := range indexSample {
out[i] = set[index]
}
return out
}

func shuffleStrings(set []interface{}) {
func shuffleAddrInfos(set []*peer.AddrInfo) {
rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] })
}
35 changes: 14 additions & 21 deletions network/p2p/peerstore/peerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ func TestPeerstore(t *testing.T) {

func testPhonebookAll(t *testing.T, set []*peer.AddrInfo, ph *PeerStore) {
actual := ph.GetAddresses(len(set), PhoneBookEntryRelayRole)
for _, got := range actual {
info := got.(*peer.AddrInfo)
for _, info := range actual {
ok := false
for _, known := range set {
if info.ID == known.ID {
Expand All @@ -101,13 +100,12 @@ func testPhonebookAll(t *testing.T, set []*peer.AddrInfo, ph *PeerStore) {
}
}
if !ok {
t.Errorf("get returned junk %#v", got)
t.Errorf("get returned junk %#v", info)
}
}
for _, known := range set {
ok := false
for _, got := range actual {
info := got.(*peer.AddrInfo)
for _, info := range actual {
if info.ID == known.ID {
ok = true
break
Expand All @@ -128,8 +126,7 @@ func testPhonebookUniform(t *testing.T, set []*peer.AddrInfo, ph *PeerStore, get
}
for i := 0; i < uniformityTestLength; i++ {
actual := ph.GetAddresses(getsize, PhoneBookEntryRelayRole)
for _, xa := range actual {
info := xa.(*peer.AddrInfo)
for _, info := range actual {
if _, ok := counts[info.ID.String()]; ok {
counts[info.ID.String()]++
}
Expand Down Expand Up @@ -226,11 +223,11 @@ func TestMultiPhonebook(t *testing.T) {
require.NoError(t, err)
infoSet = append(infoSet, info)
}
pha := make([]interface{}, 0)
pha := make([]*peer.AddrInfo, 0)
for _, e := range infoSet[:5] {
pha = append(pha, e)
}
phb := make([]interface{}, 0)
phb := make([]*peer.AddrInfo, 0)
for _, e := range infoSet[5:] {
phb = append(phb, e)
}
Expand All @@ -252,7 +249,7 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) {

info, err := peerInfoFromDomainPort("a:4041")
require.NoError(t, err)
persistentPeers := []interface{}{info}
persistentPeers := []*peer.AddrInfo{info}
set := []string{"b:4042", "c:4043", "d:4044", "e:4045", "f:4046", "g:4047", "h:4048", "i:4049", "j:4010"}
infoSet := make([]*peer.AddrInfo, 0)
for _, addr := range set {
Expand All @@ -261,11 +258,11 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) {
infoSet = append(infoSet, info)
}

pha := make([]interface{}, 0)
pha := make([]*peer.AddrInfo, 0)
for _, e := range infoSet[:5] {
pha = append(pha, e)
}
phb := make([]interface{}, 0)
phb := make([]*peer.AddrInfo, 0)
for _, e := range infoSet[5:] {
phb = append(phb, e)
}
Expand All @@ -279,10 +276,8 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) {
testPhonebookAll(t, append(infoSet, info), ph)
allAddresses := ph.GetAddresses(len(set)+len(persistentPeers), PhoneBookEntryRelayRole)
for _, pp := range persistentPeers {
pp := pp.(*peer.AddrInfo)
found := false
for _, addr := range allAddresses {
addr := addr.(*peer.AddrInfo)
if addr.ID == pp.ID {
found = true
break
Expand All @@ -303,11 +298,11 @@ func TestMultiPhonebookDuplicateFiltering(t *testing.T) {
infoSet = append(infoSet, info)
}

pha := make([]interface{}, 0)
pha := make([]*peer.AddrInfo, 0)
for _, e := range infoSet[:7] {
pha = append(pha, e)
}
phb := make([]interface{}, 0)
phb := make([]*peer.AddrInfo, 0)
for _, e := range infoSet[3:] {
phb = append(phb, e)
}
Expand Down Expand Up @@ -343,7 +338,7 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {

// Test the addresses are populated in the phonebook and a
// time can be added to one of them
entries.ReplacePeerList([]interface{}{info1, info2}, "default", PhoneBookEntryRelayRole)
entries.ReplacePeerList([]*peer.AddrInfo{info1, info2}, "default", PhoneBookEntryRelayRole)
addrInPhonebook, waitTime, provisionalTime := entries.GetConnectionWaitTime(string(info1.ID))
require.Equal(t, true, addrInPhonebook)
require.Equal(t, time.Duration(0), waitTime)
Expand Down Expand Up @@ -458,14 +453,14 @@ func TestPhonebookRoles(t *testing.T) {
relaysSet := []string{"relay1:4040", "relay2:4041", "relay3:4042"}
archiverSet := []string{"archiver1:1111", "archiver2:1112", "archiver3:1113"}

infoRelaySet := make([]interface{}, 0)
infoRelaySet := make([]*peer.AddrInfo, 0)
for _, addr := range relaysSet {
info, err := peerInfoFromDomainPort(addr)
require.NoError(t, err)
infoRelaySet = append(infoRelaySet, info)
}

infoArchiverSet := make([]interface{}, 0)
infoArchiverSet := make([]*peer.AddrInfo, 0)
for _, addr := range archiverSet {
info, err := peerInfoFromDomainPort(addr)
require.NoError(t, err)
Expand All @@ -485,12 +480,10 @@ func TestPhonebookRoles(t *testing.T) {
entries := ph.GetAddresses(l, role)
if role == PhoneBookEntryRelayRole {
for _, entry := range entries {
entry := entry.(*peer.AddrInfo)
require.Contains(t, string(entry.ID), "relay")
}
} else if role == PhoneBookEntryArchiverRole {
for _, entry := range entries {
entry := entry.(*peer.AddrInfo)
require.Contains(t, string(entry.ID), "archiver")
}
}
Expand Down
17 changes: 11 additions & 6 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,21 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
}
log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs())

net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler, addrInfo)
net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler)
if err != nil {
return nil, err
}

peerIDs := pstore.Peers()
addrInfos := make([]*peer.AddrInfo, 0, len(peerIDs))
for _, peerID := range peerIDs {
addrInfo := pstore.PeerInfo(peerID)
addrInfos = append(addrInfos, &addrInfo)
}
bootstrapper := &bootstrapper{
cfg: cfg,
networkID: networkID,
phonebookPeers: addrInfo,
phonebookPeers: addrInfos,
resolveController: dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecurityTXTEnforced(), ""),
log: net.log,
}
Expand Down Expand Up @@ -426,7 +432,7 @@ func (n *P2PNetwork) meshThreadInner() int {
}

peers := mergeP2PAddrInfoResolvedAddresses(dnsPeers, dhtPeers)
replace := make([]interface{}, 0, len(peers))
replace := make([]*peer.AddrInfo, 0, len(peers))
for i := range peers {
replace = append(replace, &peers[i])
}
Expand Down Expand Up @@ -631,9 +637,8 @@ func (n *P2PNetwork) GetPeers(options ...PeerOption) []Peer {
n.wsPeersLock.RUnlock()
case PeersPhonebookRelays:
const maxNodes = 100
peerIDs := n.pstore.GetAddresses(maxNodes, phonebook.PhoneBookEntryRelayRole)
for _, peerInfo := range peerIDs {
peerInfo := peerInfo.(*peer.AddrInfo)
addrInfos := n.pstore.GetAddresses(maxNodes, phonebook.PhoneBookEntryRelayRole)
for _, peerInfo := range addrInfos {
if peerCore, ok := addrInfoToWsPeerCore(n, peerInfo); ok {
peers = append(peers, &peerCore)
}
Expand Down
2 changes: 1 addition & 1 deletion network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ func TestP2PHTTPHandler(t *testing.T) {
// zero clients allowed, rate limiting window (10s) is greater than queue deadline (1s)
pstore, err := peerstore.MakePhonebook(0, 10*time.Second)
require.NoError(t, err)
pstore.AddPersistentPeers([]interface{}{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole)
pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole)
httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, pstore, 1*time.Second, 1)
require.NoError(t, err)
_, err = httpClient.Get("/test")
Expand Down
2 changes: 1 addition & 1 deletion network/phonebook/phonebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (e *phonebookImpl) AddPersistentPeers(dnsAddresses []string, networkName st
// we already have this.
// Make sure the persistence field is set to true
pbData.persistent = true

e.data[addr] = pbData
} else {
// we don't have this item. add it.
e.data[addr] = makePhonebookEntryData(networkName, role, true)
Expand Down

0 comments on commit 3b9f3e3

Please sign in to comment.