Skip to content

Commit

Permalink
Add p2p network nodes test with DHT
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Dec 13, 2023
1 parent 864da40 commit 0233a2d
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 9 deletions.
14 changes: 13 additions & 1 deletion network/p2p/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func (c *CapabilitiesDiscovery) PeersForCapability(capability Capability, n int)
ctx, cancel := context.WithTimeout(context.Background(), operationTimeout)
defer cancel()
var peers []peer.AddrInfo
peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n))
// +1 because it can include self but we exclude self from the returned list
// that might confuse the caller (and tests assertions)
peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n+1))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -146,6 +148,16 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability
}()
}

// Sizer exposes the Size method
type Sizer interface {
Size() int
}

// RoutingTable exposes some knowledge about the DHT routing table
func (c *CapabilitiesDiscovery) RoutingTable() Sizer {
return c.dht.RoutingTable()

Check warning on line 158 in network/p2p/capabilities.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/capabilities.go#L157-L158

Added lines #L157 - L158 were not covered by tests
}

// MakeCapabilitiesDiscovery creates a new CapabilitiesDiscovery object which exposes peer discovery and capabilities advertisement
func MakeCapabilitiesDiscovery(ctx context.Context, cfg config.Local, h host.Host, networkID protocol.NetworkID, log logging.Logger, bootstrapFunc func() []peer.AddrInfo) (*CapabilitiesDiscovery, error) {
discDht, err := algoDht.MakeDHT(ctx, h, networkID, cfg, bootstrapFunc)
Expand Down
2 changes: 1 addition & 1 deletion network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func TestCapabilities_ExcludesSelf(t *testing.T) {
disc := setupCapDiscovery(t, 2, 2)

testPeersFound := func(disc *CapabilitiesDiscovery, n int, cap Capability) bool {
peers, err := disc.PeersForCapability(cap, n+1)
peers, err := disc.PeersForCapability(cap, n)
if err == nil && len(peers) == n {
return true
}
Expand Down
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
}

if cfg.EnableDHTProviders {
caps, err0 := p2p.MakeCapabilitiesDiscovery(net.ctx, cfg, h, networkID, net.log, bootstrapper.BootstrapFunc)
disc, err0 := p2p.MakeCapabilitiesDiscovery(net.ctx, cfg, h, networkID, net.log, bootstrapper.BootstrapFunc)
if err0 != nil {
log.Errorf("Failed to create dht node capabilities discovery: %v", err)
return nil, err

Check warning on line 203 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L202-L203

Added lines #L202 - L203 were not covered by tests
}
net.capabilitiesDiscovery = caps
net.capabilitiesDiscovery = disc
}

err = net.setup()
Expand Down
157 changes: 152 additions & 5 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package network
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -316,15 +317,26 @@ func TestP2PNetworkAddress(t *testing.T) {
require.Empty(t, retAddr)
}

func TestGetBootstrapPeers(t *testing.T) {
func TestBootstrapFunc(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "<network>.algodev.network"
cfg.DNSSecurityFlags = 0
b := bootstrapper{}
require.Nil(t, b.BootstrapFunc())

addrs := getBootstrapPeers(cfg, "test")
b.started = true
p := peer.AddrInfo{ID: "test"}
b.phonebookPeers = []*peer.AddrInfo{&p}
require.Equal(t, []peer.AddrInfo{p}, b.BootstrapFunc())

b.phonebookPeers = nil

b.cfg = config.GetDefaultLocal()
b.cfg.DNSBootstrapID = "<network>.algodev.network"
b.cfg.DNSSecurityFlags = 0
b.networkID = "test"

addrs := b.BootstrapFunc()

require.GreaterOrEqual(t, len(addrs), 1)
addr := addrs[0]
Expand Down Expand Up @@ -357,3 +369,138 @@ func TestGetBootstrapPeersInvalidAddr(t *testing.T) {

require.Equal(t, 0, len(addrs))
}

type capNodeInfo struct {
nopeNodeInfo
cap p2p.Capability
}

func (ni *capNodeInfo) Capabilities() []p2p.Capability {
return []p2p.Capability{ni.cap}
}

func waitForRouting(t *testing.T, disc *p2p.CapabilitiesDiscovery) {
refreshCtx, refCancel := context.WithTimeout(context.Background(), time.Second*5)
for {
select {
case <-refreshCtx.Done():
refCancel()
require.Fail(t, "failed to populate routing table before timeout")
default:
if disc.RoutingTable().Size() > 0 {
refCancel()
return
}
}
time.Sleep(50 * time.Millisecond)
}
}

// TestP2PNetworkDHTCapabilities runs nodes with capabilites and ensures that connected nodes
// can discover themself. The other nodes receive the first node in bootstrap list before starting.
// There is two variations of the test: only netA advertises capabilities, and all nodes advertise.
func TestP2PNetworkDHTCapabilities(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.EnableDHTProviders = true
log := logging.TestingLog(t)

cap := p2p.Archival
tests := []struct {
name string
nis []NodeInfo
numCapPeers int
}{
{"cap=all", []NodeInfo{&capNodeInfo{cap: cap}, &capNodeInfo{cap: cap}, &capNodeInfo{cap: cap}}, 2}, // each has 2 peers with capabilities
{"cap=netA", []NodeInfo{&capNodeInfo{cap: cap}, &nopeNodeInfo{}, &nopeNodeInfo{}}, 1}, // each has 1 peer with capabilities
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, test.nis[0])
require.NoError(t, err)

err = netA.Start()
require.NoError(t, err)
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])

multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}
netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1])
require.NoError(t, err)
err = netB.Start()
require.NoError(t, err)
defer netB.Stop()

netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2])
require.NoError(t, err)
err = netC.Start()
require.NoError(t, err)
defer netC.Stop()

require.Eventually(
t,
func() bool {
return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 &&
len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0
},
2*time.Second,
50*time.Millisecond,
)
t.Logf("peers connected")

discs := []*p2p.CapabilitiesDiscovery{netA.capabilitiesDiscovery, netB.capabilitiesDiscovery, netC.capabilitiesDiscovery}

var wg sync.WaitGroup
wg.Add(len(discs))
for _, disc := range discs {
if disc == nil {
wg.Done()
continue
}
go func(disc *p2p.CapabilitiesDiscovery) {
defer wg.Done()
waitForRouting(t, disc)
}(disc)
}
wg.Wait()

t.Logf("DHT is ready")

// ensure all peers are connected
for _, disc := range discs {
require.Equal(t, 2, len(disc.Host().Network().Peers()))
}

wg.Add(len(discs))
for _, disc := range discs {
go func(disc *p2p.CapabilitiesDiscovery) {
defer wg.Done()
if disc == netA.capabilitiesDiscovery {
return
}
require.Eventuallyf(t,
func() bool {
peers, err := disc.PeersForCapability(cap, test.numCapPeers)
if err == nil && len(peers) == test.numCapPeers {
return true
}
return false
},
time.Minute,
time.Second,
fmt.Sprintf("Not all expected %s cap peers were found", cap),
)
}(disc)
}
wg.Wait()
})
}
}

0 comments on commit 0233a2d

Please sign in to comment.