Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: start p2p networking and DHT ops when starting services, not when instantiating them #5867

Merged
merged 7 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (n *NetworkFacade) PushDownstreamMessage(newMsg context.CancelFunc) bool {
func (n *NetworkFacade) Address() (string, bool) { return "mock network", true }

// Start - unused function
func (n *NetworkFacade) Start() {}
func (n *NetworkFacade) Start() error { return nil }

// Stop - unused function
func (n *NetworkFacade) Stop() {}
Expand Down
4 changes: 2 additions & 2 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn
return nil
}

func (w *whiteholeNetwork) Start() {
func (w *whiteholeNetwork) Start() error {
w.quit = make(chan struct{})
go func(w *whiteholeNetwork) {
w.domain.messagesMu.Lock()
Expand Down Expand Up @@ -216,7 +216,7 @@ func (w *whiteholeNetwork) Start() {
atomic.AddUint32(&w.lastMsgRead, 1)
}
}(w)
return
return nil
}
func (w *whiteholeNetwork) getMux() *network.Multiplexer {
return w.mux
Expand Down
3 changes: 2 additions & 1 deletion components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func (network *MockNetwork) Address() (string, bool) {
}

// Start - unused function
func (network *MockNetwork) Start() {
func (network *MockNetwork) Start() error {
return nil
}

// Stop - unused function
Expand Down
11 changes: 8 additions & 3 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
type ServerNode interface {
apiServer.APINodeInterface
ListeningAddress() (string, bool)
Start()
Start() error
Stop()
}

Expand Down Expand Up @@ -272,7 +272,13 @@
func (s *Server) Start() {
s.log.Info("Trying to start an Algorand node")
fmt.Print("Initializing the Algorand node... ")
s.node.Start()
err := s.node.Start()
if err != nil {
msg := fmt.Sprintf(("Failed to start alg Algorand node: %v"), err)
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
s.log.Error(msg)
fmt.Println(msg)
os.Exit(1)

Check warning on line 280 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L275-L280

Added lines #L275 - L280 were not covered by tests
}
s.log.Info("Successfully started an Algorand node.")
fmt.Println("Success!")

Expand All @@ -291,7 +297,6 @@
}

var apiToken string
var err error
fmt.Printf("API authentication disabled: %v\n", cfg.DisableAPIAuth)
if !cfg.DisableAPIAuth {
apiToken, err = tokens.GetAndValidateAPIToken(s.RootPath, tokens.AlgodTokenFilename)
Expand Down
2 changes: 1 addition & 1 deletion network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type GossipNode interface {
GetPeers(options ...PeerOption) []Peer

// Start threads, listen on sockets.
Start()
Start() error

// Close sockets. Stop threads.
Stop()
Expand Down
13 changes: 6 additions & 7 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
// supply alternate NetAddress for P2P network
p2pcfg := cfg
p2pcfg.NetAddress = cfg.P2PListenAddress
p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID)
p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo)

Check warning on line 45 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L45

Added line #L45 was not covered by tests
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,18 +153,17 @@
}

// Start implements GossipNode
func (n *HybridP2PNetwork) Start() {
_ = n.runParallel(func(net GossipNode) error {
net.Start()
return nil
func (n *HybridP2PNetwork) Start() error {
err := n.runParallel(func(net GossipNode) error {
return net.Start()

Check warning on line 158 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L156-L158

Added lines #L156 - L158 were not covered by tests
})
return err

Check warning on line 160 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L160

Added line #L160 was not covered by tests
}

// Stop implements GossipNode
func (n *HybridP2PNetwork) Stop() {
_ = n.runParallel(func(net GossipNode) error {
net.Start()
return nil
return net.Start()

Check warning on line 166 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L166

Added line #L166 was not covered by tests
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
})
}

Expand Down
28 changes: 16 additions & 12 deletions network/p2p/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
algoDht "github.com/algorand/go-algorand/network/p2p/dht"
"github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/protocol"
)

// Capability represents functions that some nodes may provide and other nodes would want to know about
Expand Down Expand Up @@ -88,7 +88,9 @@
ctx, cancel := context.WithTimeout(context.Background(), operationTimeout)
defer cancel()
var peers []peer.AddrInfo
peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n))
// +1 because it can include self but we exclude self from the returned list
// that might confuse the caller (and tests assertions)
peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n+1))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -146,17 +148,19 @@
}()
}

// Sizer exposes the Size method
type Sizer interface {
Size() int
}

// RoutingTable exposes some knowledge about the DHT routing table
func (c *CapabilitiesDiscovery) RoutingTable() Sizer {
return c.dht.RoutingTable()

Check warning on line 158 in network/p2p/capabilities.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/capabilities.go#L157-L158

Added lines #L157 - L158 were not covered by tests
}

// MakeCapabilitiesDiscovery creates a new CapabilitiesDiscovery object which exposes peer discovery and capabilities advertisement
func MakeCapabilitiesDiscovery(ctx context.Context, cfg config.Local, datadir string, network string, log logging.Logger, bootstrapPeers []*peer.AddrInfo) (*CapabilitiesDiscovery, error) {
pstore, err := peerstore.NewPeerStore(bootstrapPeers)
if err != nil {
return nil, err
}
h, err := makeHost(cfg, datadir, pstore)
if err != nil {
return nil, err
}
discDht, err := algoDht.MakeDHT(ctx, h, network, cfg, bootstrapPeers)
func MakeCapabilitiesDiscovery(ctx context.Context, cfg config.Local, h host.Host, networkID protocol.NetworkID, log logging.Logger, bootstrapFunc func() []peer.AddrInfo) (*CapabilitiesDiscovery, error) {
discDht, err := algoDht.MakeDHT(ctx, h, networkID, cfg, bootstrapFunc)
if err != nil {
return nil, err
}
Expand Down
22 changes: 14 additions & 8 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ func TestCapabilities_Discovery(t *testing.T) {
testSize := 3
for i := 0; i < testSize; i++ {
tempdir := t.TempDir()
capD, err := MakeCapabilitiesDiscovery(context.Background(), config.GetDefaultLocal(), tempdir, "devtestnet", logging.Base(), []*peer.AddrInfo{})
ps, err := peerstore.NewPeerStore(nil)
require.NoError(t, err)
h, _, err := MakeHost(config.GetDefaultLocal(), tempdir, ps)
require.NoError(t, err)
capD, err := MakeCapabilitiesDiscovery(context.Background(), config.GetDefaultLocal(), h, "devtestnet", logging.Base(), func() []peer.AddrInfo { return nil })
require.NoError(t, err)
caps = append(caps, capD)
addrs = append(addrs, peer.AddrInfo{
Expand All @@ -72,7 +76,7 @@ func TestCapabilities_Discovery(t *testing.T) {

func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
var hosts []host.Host
var bootstrapPeers []*peer.AddrInfo
var bootstrapPeers []peer.AddrInfo
var dhts []*dht.IpfsDHT
cfg := config.GetDefaultLocal()
for i := 0; i < numHosts; i++ {
Expand All @@ -87,10 +91,10 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
libp2p.Peerstore(ps))
require.NoError(t, err)
hosts = append(hosts, h)
bootstrapPeers = append(bootstrapPeers, &peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
bootstrapPeers = append(bootstrapPeers, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}
for _, h := range hosts {
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bootstrapPeers)
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, func() []peer.AddrInfo { return bootstrapPeers })
require.NoError(t, err)
err = ht.Bootstrap(context.Background())
require.NoError(t, err)
Expand All @@ -117,7 +121,7 @@ func waitForRouting(t *testing.T, disc *CapabilitiesDiscovery) {

func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*CapabilitiesDiscovery {
var hosts []host.Host
var bootstrapPeers []*peer.AddrInfo
var bootstrapPeers []peer.AddrInfo
var capsDisc []*CapabilitiesDiscovery
cfg := config.GetDefaultLocal()
for i := 0; i < numHosts; i++ {
Expand All @@ -132,17 +136,19 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap
libp2p.Peerstore(ps))
require.NoError(t, err)
hosts = append(hosts, h)
bootstrapPeers = append(bootstrapPeers, &peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
bootstrapPeers = append(bootstrapPeers, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}
for _, h := range hosts {
bp := bootstrapPeers
if numBootstrapPeers != 0 && numBootstrapPeers != numHosts {
bp = make([]peer.AddrInfo, len(bootstrapPeers))
copy(bp, bootstrapPeers)
rand.Shuffle(len(bootstrapPeers), func(i, j int) {
bp[i], bp[j] = bp[j], bp[i]
})
bp = bp[:numBootstrapPeers]
}
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bp)
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, func() []peer.AddrInfo { return bp })
require.NoError(t, err)
disc, err := algodht.MakeDiscovery(ht)
require.NoError(t, err)
Expand Down Expand Up @@ -304,7 +310,7 @@ func TestCapabilities_ExcludesSelf(t *testing.T) {
disc := setupCapDiscovery(t, 2, 2)

testPeersFound := func(disc *CapabilitiesDiscovery, n int, cap Capability) bool {
peers, err := disc.PeersForCapability(cap, n+1)
peers, err := disc.PeersForCapability(cap, n)
if err == nil && len(peers) == n {
return true
}
Expand Down
46 changes: 5 additions & 41 deletions network/p2p/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,62 +32,26 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/routing"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/network/p2p/dnsaddr"
algoproto "github.com/algorand/go-algorand/protocol"
)

const minBackoff = time.Second * 5
const maxBackoff = time.Second * 20
const baseBackoff = float64(1.1)

// getBootstrapPeersFunc looks up a list of Multiaddrs strings from the dnsaddr records at the primary
// SRV record domain.
func getBootstrapPeersFunc(cfg config.Local, network string) func() []peer.AddrInfo {
return func() []peer.AddrInfo {
var addrs []peer.AddrInfo
bootstraps := cfg.DNSBootstrapArray(algoproto.NetworkID(network))
for _, dnsBootstrap := range bootstraps {
controller := dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecuritySRVEnforced(), "")
resolvedAddrs, err := dnsaddr.MultiaddrsFromResolver(dnsBootstrap.PrimarySRVBootstrap, controller)
if err != nil {
continue
}
for _, resolvedAddr := range resolvedAddrs {
info, err0 := peer.AddrInfoFromP2pAddr(resolvedAddr)
if err0 != nil {
continue
}
addrs = append(addrs, *info)
}
}
return addrs
}
}

func dhtProtocolPrefix(network string) protocol.ID {
return protocol.ID(fmt.Sprintf("/algorand/kad/%s", network))
func dhtProtocolPrefix(networkID algoproto.NetworkID) protocol.ID {
return protocol.ID(fmt.Sprintf("/algorand/kad/%s", networkID))
}

// MakeDHT creates the dht.IpfsDHT object
func MakeDHT(ctx context.Context, h host.Host, network string, cfg config.Local, bootstrapPeers []*peer.AddrInfo) (*dht.IpfsDHT, error) {
func MakeDHT(ctx context.Context, h host.Host, networkID algoproto.NetworkID, cfg config.Local, bootstrapFunc func() []peer.AddrInfo) (*dht.IpfsDHT, error) {
dhtCfg := []dht.Option{
// Automatically determine server or client mode
dht.Mode(dht.ModeAutoServer),
// We don't need the value store right now
dht.DisableValues(),
dht.ProtocolPrefix(dhtProtocolPrefix(network)),
}
if len(bootstrapPeers) > 0 {
var peers []peer.AddrInfo
for _, bPeer := range bootstrapPeers {
if bPeer != nil {
peers = append(peers, *bPeer)
}
}
dhtCfg = append(dhtCfg, dht.BootstrapPeers(peers...))

} else {
dhtCfg = append(dhtCfg, dht.BootstrapPeersFunc(getBootstrapPeersFunc(cfg, network)))
dht.ProtocolPrefix(dhtProtocolPrefix(networkID)),
dht.BootstrapPeersFunc(bootstrapFunc),
}
return dht.New(ctx, h, dhtCfg...)
}
Expand Down
46 changes: 2 additions & 44 deletions network/p2p/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDHTBasic(t *testing.T) {
h,
"devtestnet",
config.GetDefaultLocal(),
[]*peer.AddrInfo{{}})
func() []peer.AddrInfo { return nil })
require.NoError(t, err)
_, err = MakeDiscovery(dht)
require.NoError(t, err)
Expand All @@ -55,52 +55,10 @@ func TestDHTBasicAlgodev(t *testing.T) {
require.NoError(t, err)
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "<network>.algodev.network"
dht, err := MakeDHT(context.Background(), h, "betanet", cfg, []*peer.AddrInfo{})
dht, err := MakeDHT(context.Background(), h, "betanet", cfg, func() []peer.AddrInfo { return nil })
require.NoError(t, err)
_, err = MakeDiscovery(dht)
require.NoError(t, err)
err = dht.Bootstrap(context.Background())
require.NoError(t, err)
}

func TestGetBootstrapPeers(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "<network>.algodev.network"
cfg.DNSSecurityFlags = 0

addrs := getBootstrapPeersFunc(cfg, "test")()

require.GreaterOrEqual(t, len(addrs), 1)
addr := addrs[0]
require.Equal(t, len(addr.Addrs), 1)
require.GreaterOrEqual(t, len(addr.Addrs), 1)
}

func TestGetBootstrapPeersFailure(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSSecurityFlags = 0
cfg.DNSBootstrapID = "non-existent.algodev.network"

addrs := getBootstrapPeersFunc(cfg, "test")()

require.Equal(t, 0, len(addrs))
}

func TestGetBootstrapPeersInvalidAddr(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSSecurityFlags = 0
cfg.DNSBootstrapID = "<network>.algodev.network"

addrs := getBootstrapPeersFunc(cfg, "testInvalidAddr")()

require.Equal(t, 0, len(addrs))
}
Loading