diff --git a/agreement/fuzzer/networkFacade_test.go b/agreement/fuzzer/networkFacade_test.go index 35eba4a273..77f58de3bf 100644 --- a/agreement/fuzzer/networkFacade_test.go +++ b/agreement/fuzzer/networkFacade_test.go @@ -247,7 +247,7 @@ func (n *NetworkFacade) PushDownstreamMessage(newMsg context.CancelFunc) bool { func (n *NetworkFacade) Address() (string, bool) { return "mock network", true } // Start - unused function -func (n *NetworkFacade) Start() {} +func (n *NetworkFacade) Start() error { return nil } // Stop - unused function func (n *NetworkFacade) Stop() {} diff --git a/agreement/gossip/network_test.go b/agreement/gossip/network_test.go index 3607afb168..fc6d4b032b 100644 --- a/agreement/gossip/network_test.go +++ b/agreement/gossip/network_test.go @@ -160,7 +160,7 @@ func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn return nil } -func (w *whiteholeNetwork) Start() { +func (w *whiteholeNetwork) Start() error { w.quit = make(chan struct{}) go func(w *whiteholeNetwork) { w.domain.messagesMu.Lock() @@ -216,7 +216,7 @@ func (w *whiteholeNetwork) Start() { atomic.AddUint32(&w.lastMsgRead, 1) } }(w) - return + return nil } func (w *whiteholeNetwork) getMux() *network.Multiplexer { return w.mux diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index e1ceb63142..39b39054fe 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -47,7 +47,8 @@ func (network *MockNetwork) Address() (string, bool) { } // Start - unused function -func (network *MockNetwork) Start() { +func (network *MockNetwork) Start() error { + return nil } // Stop - unused function diff --git a/daemon/algod/server.go b/daemon/algod/server.go index 1b40e98bfb..0dc5fcafab 100644 --- a/daemon/algod/server.go +++ b/daemon/algod/server.go @@ -56,7 +56,7 @@ const maxHeaderBytes = 4096 type ServerNode interface { apiServer.APINodeInterface ListeningAddress() (string, bool) - Start() + Start() error Stop() } @@ 
-272,7 +272,13 @@ func makeListener(addr string) (net.Listener, error) { func (s *Server) Start() { s.log.Info("Trying to start an Algorand node") fmt.Print("Initializing the Algorand node... ") - s.node.Start() + err := s.node.Start() + if err != nil { + msg := fmt.Sprintf("Failed to start alg Algorand node: %v", err) + s.log.Error(msg) + fmt.Println(msg) + os.Exit(1) + } s.log.Info("Successfully started an Algorand node.") fmt.Println("Success!") @@ -291,7 +297,6 @@ func (s *Server) Start() { } var apiToken string - var err error fmt.Printf("API authentication disabled: %v\n", cfg.DisableAPIAuth) if !cfg.DisableAPIAuth { apiToken, err = tokens.GetAndValidateAPIToken(s.RootPath, tokens.AlgodTokenFilename) diff --git a/network/gossipNode.go b/network/gossipNode.go index 8d1b877449..56f285cb06 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -72,7 +72,7 @@ type GossipNode interface { GetPeers(options ...PeerOption) []Peer // Start threads, listen on sockets. - Start() + Start() error // Close sockets. Stop threads. 
Stop() diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index c09c5d4a1a..b43082eabb 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -42,7 +42,7 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p // supply alternate NetAddress for P2P network p2pcfg := cfg p2pcfg.NetAddress = cfg.P2PListenAddress - p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID) + p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo) if err != nil { return nil, err } @@ -153,17 +153,17 @@ func (n *HybridP2PNetwork) GetPeers(options ...PeerOption) []Peer { } // Start implements GossipNode -func (n *HybridP2PNetwork) Start() { - _ = n.runParallel(func(net GossipNode) error { - net.Start() - return nil +func (n *HybridP2PNetwork) Start() error { + err := n.runParallel(func(net GossipNode) error { + return net.Start() }) + return err } // Stop implements GossipNode func (n *HybridP2PNetwork) Stop() { _ = n.runParallel(func(net GossipNode) error { - net.Start() + net.Stop() return nil }) } diff --git a/network/p2p/capabilities.go b/network/p2p/capabilities.go index 149d643da1..502383b4ee 100644 --- a/network/p2p/capabilities.go +++ b/network/p2p/capabilities.go @@ -30,7 +30,7 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/logging" algoDht "github.com/algorand/go-algorand/network/p2p/dht" - "github.com/algorand/go-algorand/network/p2p/peerstore" + "github.com/algorand/go-algorand/protocol" ) // Capability represents functions that some nodes may provide and other nodes would want to know about @@ -88,7 +88,9 @@ func (c *CapabilitiesDiscovery) PeersForCapability(capability Capability, n int) ctx, cancel := context.WithTimeout(context.Background(), operationTimeout) defer cancel() var peers []peer.AddrInfo - peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n)) + // +1 because it can 
include self but we exclude self from the returned list + // that might confuse the caller (and tests assertions) + peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n+1)) if err != nil { return nil, err } @@ -146,17 +148,19 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability }() } +// Sizer exposes the Size method +type Sizer interface { + Size() int +} + +// RoutingTable exposes some knowledge about the DHT routing table +func (c *CapabilitiesDiscovery) RoutingTable() Sizer { + return c.dht.RoutingTable() +} + // MakeCapabilitiesDiscovery creates a new CapabilitiesDiscovery object which exposes peer discovery and capabilities advertisement -func MakeCapabilitiesDiscovery(ctx context.Context, cfg config.Local, datadir string, network string, log logging.Logger, bootstrapPeers []*peer.AddrInfo) (*CapabilitiesDiscovery, error) { - pstore, err := peerstore.NewPeerStore(bootstrapPeers) - if err != nil { - return nil, err - } - h, err := makeHost(cfg, datadir, pstore) - if err != nil { - return nil, err - } - discDht, err := algoDht.MakeDHT(ctx, h, network, cfg, bootstrapPeers) +func MakeCapabilitiesDiscovery(ctx context.Context, cfg config.Local, h host.Host, networkID protocol.NetworkID, log logging.Logger, bootstrapFunc func() []peer.AddrInfo) (*CapabilitiesDiscovery, error) { + discDht, err := algoDht.MakeDHT(ctx, h, networkID, cfg, bootstrapFunc) if err != nil { return nil, err } diff --git a/network/p2p/capabilities_test.go b/network/p2p/capabilities_test.go index 82fee84d03..ca3de978b7 100644 --- a/network/p2p/capabilities_test.go +++ b/network/p2p/capabilities_test.go @@ -47,7 +47,11 @@ func TestCapabilities_Discovery(t *testing.T) { testSize := 3 for i := 0; i < testSize; i++ { tempdir := t.TempDir() - capD, err := MakeCapabilitiesDiscovery(context.Background(), config.GetDefaultLocal(), tempdir, "devtestnet", logging.Base(), []*peer.AddrInfo{}) + ps, err := peerstore.NewPeerStore(nil) + require.NoError(t, 
err) + h, _, err := MakeHost(config.GetDefaultLocal(), tempdir, ps) + require.NoError(t, err) + capD, err := MakeCapabilitiesDiscovery(context.Background(), config.GetDefaultLocal(), h, "devtestnet", logging.Base(), func() []peer.AddrInfo { return nil }) require.NoError(t, err) caps = append(caps, capD) addrs = append(addrs, peer.AddrInfo{ @@ -72,7 +76,7 @@ func TestCapabilities_Discovery(t *testing.T) { func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT { var hosts []host.Host - var bootstrapPeers []*peer.AddrInfo + var bootstrapPeers []peer.AddrInfo var dhts []*dht.IpfsDHT cfg := config.GetDefaultLocal() for i := 0; i < numHosts; i++ { @@ -87,10 +91,10 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT { libp2p.Peerstore(ps)) require.NoError(t, err) hosts = append(hosts, h) - bootstrapPeers = append(bootstrapPeers, &peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + bootstrapPeers = append(bootstrapPeers, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) } for _, h := range hosts { - ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bootstrapPeers) + ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, func() []peer.AddrInfo { return bootstrapPeers }) require.NoError(t, err) err = ht.Bootstrap(context.Background()) require.NoError(t, err) @@ -117,7 +121,7 @@ func waitForRouting(t *testing.T, disc *CapabilitiesDiscovery) { func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*CapabilitiesDiscovery { var hosts []host.Host - var bootstrapPeers []*peer.AddrInfo + var bootstrapPeers []peer.AddrInfo var capsDisc []*CapabilitiesDiscovery cfg := config.GetDefaultLocal() for i := 0; i < numHosts; i++ { @@ -132,17 +136,19 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap libp2p.Peerstore(ps)) require.NoError(t, err) hosts = append(hosts, h) - bootstrapPeers = append(bootstrapPeers, &peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + bootstrapPeers = 
append(bootstrapPeers, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) } for _, h := range hosts { bp := bootstrapPeers if numBootstrapPeers != 0 && numBootstrapPeers != numHosts { + bp = make([]peer.AddrInfo, len(bootstrapPeers)) + copy(bp, bootstrapPeers) rand.Shuffle(len(bootstrapPeers), func(i, j int) { bp[i], bp[j] = bp[j], bp[i] }) bp = bp[:numBootstrapPeers] } - ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bp) + ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, func() []peer.AddrInfo { return bp }) require.NoError(t, err) disc, err := algodht.MakeDiscovery(ht) require.NoError(t, err) @@ -304,7 +310,7 @@ func TestCapabilities_ExcludesSelf(t *testing.T) { disc := setupCapDiscovery(t, 2, 2) testPeersFound := func(disc *CapabilitiesDiscovery, n int, cap Capability) bool { - peers, err := disc.PeersForCapability(cap, n+1) + peers, err := disc.PeersForCapability(cap, n) if err == nil && len(peers) == n { return true } diff --git a/network/p2p/dht/dht.go b/network/p2p/dht/dht.go index 9266605af6..4b55e7e4f7 100644 --- a/network/p2p/dht/dht.go +++ b/network/p2p/dht/dht.go @@ -32,7 +32,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/algorand/go-algorand/config" - "github.com/algorand/go-algorand/network/p2p/dnsaddr" algoproto "github.com/algorand/go-algorand/protocol" ) @@ -40,54 +39,19 @@ const minBackoff = time.Second * 5 const maxBackoff = time.Second * 20 const baseBackoff = float64(1.1) -// getBootstrapPeersFunc looks up a list of Multiaddrs strings from the dnsaddr records at the primary -// SRV record domain. 
-func getBootstrapPeersFunc(cfg config.Local, network string) func() []peer.AddrInfo { - return func() []peer.AddrInfo { - var addrs []peer.AddrInfo - bootstraps := cfg.DNSBootstrapArray(algoproto.NetworkID(network)) - for _, dnsBootstrap := range bootstraps { - controller := dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecuritySRVEnforced(), "") - resolvedAddrs, err := dnsaddr.MultiaddrsFromResolver(dnsBootstrap.PrimarySRVBootstrap, controller) - if err != nil { - continue - } - for _, resolvedAddr := range resolvedAddrs { - info, err0 := peer.AddrInfoFromP2pAddr(resolvedAddr) - if err0 != nil { - continue - } - addrs = append(addrs, *info) - } - } - return addrs - } -} - -func dhtProtocolPrefix(network string) protocol.ID { - return protocol.ID(fmt.Sprintf("/algorand/kad/%s", network)) +func dhtProtocolPrefix(networkID algoproto.NetworkID) protocol.ID { + return protocol.ID(fmt.Sprintf("/algorand/kad/%s", networkID)) } // MakeDHT creates the dht.IpfsDHT object -func MakeDHT(ctx context.Context, h host.Host, network string, cfg config.Local, bootstrapPeers []*peer.AddrInfo) (*dht.IpfsDHT, error) { +func MakeDHT(ctx context.Context, h host.Host, networkID algoproto.NetworkID, cfg config.Local, bootstrapFunc func() []peer.AddrInfo) (*dht.IpfsDHT, error) { dhtCfg := []dht.Option{ // Automatically determine server or client mode dht.Mode(dht.ModeAutoServer), // We don't need the value store right now dht.DisableValues(), - dht.ProtocolPrefix(dhtProtocolPrefix(network)), - } - if len(bootstrapPeers) > 0 { - var peers []peer.AddrInfo - for _, bPeer := range bootstrapPeers { - if bPeer != nil { - peers = append(peers, *bPeer) - } - } - dhtCfg = append(dhtCfg, dht.BootstrapPeers(peers...)) - - } else { - dhtCfg = append(dhtCfg, dht.BootstrapPeersFunc(getBootstrapPeersFunc(cfg, network))) + dht.ProtocolPrefix(dhtProtocolPrefix(networkID)), + dht.BootstrapPeersFunc(bootstrapFunc), } return dht.New(ctx, h, dhtCfg...) 
} diff --git a/network/p2p/dht/dht_test.go b/network/p2p/dht/dht_test.go index 3d4eabb3bc..93b8b379e5 100644 --- a/network/p2p/dht/dht_test.go +++ b/network/p2p/dht/dht_test.go @@ -39,7 +39,7 @@ func TestDHTBasic(t *testing.T) { h, "devtestnet", config.GetDefaultLocal(), - []*peer.AddrInfo{{}}) + func() []peer.AddrInfo { return nil }) require.NoError(t, err) _, err = MakeDiscovery(dht) require.NoError(t, err) @@ -55,52 +55,10 @@ func TestDHTBasicAlgodev(t *testing.T) { require.NoError(t, err) cfg := config.GetDefaultLocal() cfg.DNSBootstrapID = ".algodev.network" - dht, err := MakeDHT(context.Background(), h, "betanet", cfg, []*peer.AddrInfo{}) + dht, err := MakeDHT(context.Background(), h, "betanet", cfg, func() []peer.AddrInfo { return nil }) require.NoError(t, err) _, err = MakeDiscovery(dht) require.NoError(t, err) err = dht.Bootstrap(context.Background()) require.NoError(t, err) } - -func TestGetBootstrapPeers(t *testing.T) { - t.Parallel() - partitiontest.PartitionTest(t) - - cfg := config.GetDefaultLocal() - cfg.DNSBootstrapID = ".algodev.network" - cfg.DNSSecurityFlags = 0 - - addrs := getBootstrapPeersFunc(cfg, "test")() - - require.GreaterOrEqual(t, len(addrs), 1) - addr := addrs[0] - require.Equal(t, len(addr.Addrs), 1) - require.GreaterOrEqual(t, len(addr.Addrs), 1) -} - -func TestGetBootstrapPeersFailure(t *testing.T) { - t.Parallel() - partitiontest.PartitionTest(t) - - cfg := config.GetDefaultLocal() - cfg.DNSSecurityFlags = 0 - cfg.DNSBootstrapID = "non-existent.algodev.network" - - addrs := getBootstrapPeersFunc(cfg, "test")() - - require.Equal(t, 0, len(addrs)) -} - -func TestGetBootstrapPeersInvalidAddr(t *testing.T) { - t.Parallel() - partitiontest.PartitionTest(t) - - cfg := config.GetDefaultLocal() - cfg.DNSSecurityFlags = 0 - cfg.DNSBootstrapID = ".algodev.network" - - addrs := getBootstrapPeersFunc(cfg, "testInvalidAddr")() - - require.Equal(t, 0, len(addrs)) -} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 
52f23c0802..801a007e51 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -26,6 +26,7 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-deadlock" + "github.com/multiformats/go-multiaddr" "github.com/libp2p/go-libp2p" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -41,6 +42,7 @@ import ( // Service defines the interface used by the network integrating with underlying p2p implementation type Service interface { + Start() error Close() error ID() peer.ID // return peer.ID for self IDSigner() *PeerIDChallengeSigner @@ -58,12 +60,13 @@ type Service interface { // serviceImpl manages integration with libp2p and implements the Service interface type serviceImpl struct { - log logging.Logger - host host.Host - streams *streamManager - pubsub *pubsub.PubSub - pubsubCtx context.Context - privKey crypto.PrivKey + log logging.Logger + listenAddr string + host host.Host + streams *streamManager + pubsub *pubsub.PubSub + pubsubCtx context.Context + privKey crypto.PrivKey topics map[string]*pubsub.Topic topicsMu deadlock.RWMutex @@ -74,11 +77,13 @@ const AlgorandWsProtocol = "/algorand-ws/1.0.0" const dialTimeout = 30 * time.Second -func makeHost(cfg config.Local, datadir string, pstore peerstore.Peerstore) (host.Host, error) { +// MakeHost creates a libp2p host but does not start listening. +// Use host.Network().Listen() on the returned address to start listening. 
+func MakeHost(cfg config.Local, datadir string, pstore peerstore.Peerstore) (host.Host, string, error) { // load stored peer ID, or make ephemeral peer ID privKey, err := GetPrivKey(cfg, datadir) if err != nil { - return nil, err + return nil, "", err } // muxer supports tweaking fields from yamux.Config @@ -96,24 +101,26 @@ func makeHost(cfg config.Local, datadir string, pstore peerstore.Peerstore) (hos listenAddr = "/ip4/0.0.0.0/tcp/0" } - return libp2p.New( + // the libp2p.NoListenAddrs builtin disables relays but this one does not + var noListenAddrs = func(cfg *libp2p.Config) error { + cfg.ListenAddrs = []multiaddr.Multiaddr{} + return nil + } + + host, err := libp2p.New( libp2p.Identity(privKey), libp2p.UserAgent(ua), libp2p.Transport(tcp.NewTCPTransport), libp2p.Muxer("/yamux/1.0.0", &ymx), libp2p.Peerstore(pstore), - libp2p.ListenAddrStrings(listenAddr), + noListenAddrs, libp2p.Security(noise.ID, noise.New), ) + return host, listenAddr, err } // MakeService creates a P2P service instance -func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, datadir string, pstore peerstore.Peerstore, wsStreamHandler StreamHandler) (*serviceImpl, error) { - h, err := makeHost(cfg, datadir, pstore) - if err != nil { - return nil, err - } - log.Infof("P2P service started: peer ID %s addrs %s", h.ID(), h.Addrs()) +func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) { sm := makeStreamManager(ctx, log, h, wsStreamHandler) h.Network().Notify(sm) @@ -125,16 +132,28 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, data } return &serviceImpl{ - log: log, - host: h, - streams: sm, - pubsub: ps, - pubsubCtx: ctx, - privKey: pstore.PrivKey(h.ID()), - topics: make(map[string]*pubsub.Topic), + log: log, + listenAddr: listenAddr, + host: h, + streams: sm, + pubsub: ps, + pubsubCtx: ctx, + privKey: 
h.Peerstore().PrivKey(h.ID()), + topics: make(map[string]*pubsub.Topic), }, nil } +// Start begins listening on the service's configured listen address +func (s *serviceImpl) Start() error { + listenAddr, err := multiaddr.NewMultiaddr(s.listenAddr) + if err != nil { + s.log.Errorf("failed to create multiaddress: %s", err) + return err + } + + return s.host.Network().Listen(listenAddr) +} + // Close shuts down the P2P service func (s *serviceImpl) Close() error { return s.host.Close() diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index b5122516ff..4cd293c3f2 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -28,6 +28,7 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/network/p2p" + "github.com/algorand/go-algorand/network/p2p/dnsaddr" "github.com/algorand/go-algorand/network/p2p/peerstore" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-deadlock" @@ -63,6 +64,68 @@ type P2PNetwork struct { wsPeersLock deadlock.RWMutex wsPeersChangeCounter atomic.Int32 wsPeersConnectivityCheckTicker *time.Ticker + + capabilitiesDiscovery *p2p.CapabilitiesDiscovery + + bootstrapper bootstrapper + nodeInfo NodeInfo +} + +type bootstrapper struct { + cfg config.Local + networkID protocol.NetworkID + phonebookPeers []*peer.AddrInfo + started bool +} + +func (b *bootstrapper) start() { + b.started = true +} + +func (b *bootstrapper) stop() { + b.started = false +} + +func (b *bootstrapper) BootstrapFunc() []peer.AddrInfo { + // not started yet, do not give it any peers + if !b.started { + return nil + } + + // have a list of peers, use them + if len(b.phonebookPeers) > 0 { + var addrs []peer.AddrInfo + for _, bPeer := range b.phonebookPeers { + if bPeer != nil { + addrs = append(addrs, *bPeer) + } + } + return addrs + } + + return getBootstrapPeers(b.cfg, b.networkID) +} + +// getBootstrapPeers looks up a list of Multiaddrs strings from the dnsaddr records at the primary +// SRV record domain.
+func getBootstrapPeers(cfg config.Local, network protocol.NetworkID) []peer.AddrInfo { + var addrs []peer.AddrInfo + bootstraps := cfg.DNSBootstrapArray(network) + for _, dnsBootstrap := range bootstraps { + controller := dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecuritySRVEnforced(), "") + resolvedAddrs, err := dnsaddr.MultiaddrsFromResolver(dnsBootstrap.PrimarySRVBootstrap, controller) + if err != nil { + continue + } + for _, resolvedAddr := range resolvedAddrs { + info, err0 := peer.AddrInfoFromP2pAddr(resolvedAddr) + if err0 != nil { + continue + } + addrs = append(addrs, *info) + } + } + return addrs } type p2pPeerStats struct { @@ -77,7 +140,7 @@ type gossipSubPeer struct { func (p gossipSubPeer) GetNetwork() GossipNode { return p.net } // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service -func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID) (*P2PNetwork, error) { +func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo) (*P2PNetwork, error) { const readBufferLen = 2048 // create Peerstore and add phonebook addresses @@ -99,6 +162,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo wsPeers: make(map[peer.ID]*wsPeer), wsPeersToIDs: make(map[*wsPeer]peer.ID), peerStats: make(map[peer.ID]*p2pPeerStats), + nodeInfo: node, } net.ctx, net.ctxCancel = context.WithCancel(context.Background()) net.handler = msgHandler{ @@ -115,11 +179,32 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo broadcastQueueBulk: make(chan broadcastRequest, 100), } - net.service, err = p2p.MakeService(net.ctx, log, cfg, datadir, pstore, net.wsStreamHandler) + h, la, err := p2p.MakeHost(cfg, datadir, pstore) + if err != nil { + return nil, err + } + log.Infof("P2P host created: peer ID %s 
addrs %s", h.ID(), h.Addrs()) + + net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler, addrInfo) if err != nil { return nil, err } + bootstrapper := &bootstrapper{ + cfg: cfg, + networkID: networkID, + phonebookPeers: addrInfo, + } + + if cfg.EnableDHTProviders { + disc, err0 := p2p.MakeCapabilitiesDiscovery(net.ctx, cfg, h, networkID, net.log, bootstrapper.BootstrapFunc) + if err0 != nil { + log.Errorf("Failed to create dht node capabilities discovery: %v", err) + return nil, err + } + net.capabilitiesDiscovery = disc + } + err = net.setup() if err != nil { return nil, err @@ -146,8 +231,13 @@ func (n *P2PNetwork) PeerIDSigner() identityChallengeSigner { } // Start threads, listen on sockets. -func (n *P2PNetwork) Start() { +func (n *P2PNetwork) Start() error { n.wg.Add(1) + n.bootstrapper.start() + err := n.service.Start() + if err != nil { + return err + } go n.txTopicHandleLoop() if n.wsPeersConnectivityCheckTicker != nil { @@ -166,10 +256,20 @@ func (n *P2PNetwork) Start() { n.wg.Add(1) go n.meshThread() + + if n.capabilitiesDiscovery != nil { + n.capabilitiesDiscovery.AdvertiseCapabilities(n.nodeInfo.Capabilities()...) + } + + return nil } // Stop closes sockets and stop threads. 
func (n *P2PNetwork) Stop() { + if n.capabilitiesDiscovery != nil { + n.capabilitiesDiscovery.Close() + } + n.handler.ClearHandlers([]Tag{}) if n.wsPeersConnectivityCheckTicker != nil { n.wsPeersConnectivityCheckTicker.Stop() @@ -178,6 +278,7 @@ func (n *P2PNetwork) Stop() { n.innerStop() n.ctxCancel() n.service.Close() + n.bootstrapper.stop() n.wg.Wait() } diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 75fc66b0ad..5ddc9af20e 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -19,6 +19,7 @@ package network import ( "context" "fmt" + "sync" "sync/atomic" "testing" "time" @@ -42,24 +43,24 @@ func TestP2PSubmitTX(t *testing.T) { cfg := config.GetDefaultLocal() log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) - peerInfoA := netA.service.AddrInfo() + netA.Start() + defer netA.Stop() + peerInfoA := netA.service.AddrInfo() addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA) require.NoError(t, err) require.NotZero(t, addrsA[0]) - netA.Start() - defer netA.Stop() multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) netB.Start() defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet) + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) netC.Start() @@ -116,27 +117,30 @@ func TestP2PSubmitWS(t *testing.T) { cfg := config.GetDefaultLocal() log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet) + netA, err := 
NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + + err = netA.Start() require.NoError(t, err) + defer netA.Stop() peerInfoA := netA.service.AddrInfo() addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA) require.NoError(t, err) require.NotZero(t, addrsA[0]) - netA.Start() - defer netA.Stop() multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet) + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + err = netB.Start() require.NoError(t, err) - netB.Start() defer netB.Stop() - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet) - + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + require.NoError(t, err) + err = netC.Start() require.NoError(t, err) - netC.Start() defer netC.Stop() require.Eventually( @@ -189,6 +193,10 @@ type mockService struct { peers map[peer.ID]peer.AddrInfo } +func (s *mockService) Start() error { + return nil +} + func (s *mockService) Close() error { return nil } @@ -254,7 +262,7 @@ func TestP2PNetworkAddress(t *testing.T) { cfg := config.GetDefaultLocal() log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet) + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) defer netA.Stop() require.NoError(t, err) addrInfo := netA.service.AddrInfo() @@ -308,3 +316,191 @@ func TestP2PNetworkAddress(t *testing.T) { require.False(t, ok) require.Empty(t, retAddr) } + +func TestBootstrapFunc(t *testing.T) { + t.Parallel() + partitiontest.PartitionTest(t) + + b := bootstrapper{} + require.Nil(t, b.BootstrapFunc()) + + b.started = true + p := peer.AddrInfo{ID: "test"} + b.phonebookPeers = []*peer.AddrInfo{&p} + 
require.Equal(t, []peer.AddrInfo{p}, b.BootstrapFunc()) + + b.phonebookPeers = nil + + b.cfg = config.GetDefaultLocal() + b.cfg.DNSBootstrapID = ".algodev.network" + b.cfg.DNSSecurityFlags = 0 + b.networkID = "test" + + addrs := b.BootstrapFunc() + + require.GreaterOrEqual(t, len(addrs), 1) + addr := addrs[0] + require.Equal(t, len(addr.Addrs), 1) + require.GreaterOrEqual(t, len(addr.Addrs), 1) +} + +func TestGetBootstrapPeersFailure(t *testing.T) { + t.Parallel() + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.DNSSecurityFlags = 0 + cfg.DNSBootstrapID = "non-existent.algodev.network" + + addrs := getBootstrapPeers(cfg, "test") + + require.Equal(t, 0, len(addrs)) +} + +func TestGetBootstrapPeersInvalidAddr(t *testing.T) { + t.Parallel() + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.DNSSecurityFlags = 0 + cfg.DNSBootstrapID = ".algodev.network" + + addrs := getBootstrapPeers(cfg, "testInvalidAddr") + + require.Equal(t, 0, len(addrs)) +} + +type capNodeInfo struct { + nopeNodeInfo + cap p2p.Capability +} + +func (ni *capNodeInfo) Capabilities() []p2p.Capability { + return []p2p.Capability{ni.cap} +} + +func waitForRouting(t *testing.T, disc *p2p.CapabilitiesDiscovery) { + refreshCtx, refCancel := context.WithTimeout(context.Background(), time.Second*5) + for { + select { + case <-refreshCtx.Done(): + refCancel() + require.Fail(t, "failed to populate routing table before timeout") + default: + if disc.RoutingTable().Size() > 0 { + refCancel() + return + } + } + time.Sleep(50 * time.Millisecond) + } +} + +// TestP2PNetworkDHTCapabilities runs nodes with capabilities and ensures that connected nodes +// can discover each other. The other nodes receive the first node in their bootstrap list before starting. +// There are two variations of the test: only netA advertises capabilities, and all nodes advertise.
+func TestP2PNetworkDHTCapabilities(t *testing.T) { + partitiontest.PartitionTest(t) + + cfg := config.GetDefaultLocal() + cfg.EnableDHTProviders = true + log := logging.TestingLog(t) + + cap := p2p.Archival + tests := []struct { + name string + nis []NodeInfo + numCapPeers int + }{ + {"cap=all", []NodeInfo{&capNodeInfo{cap: cap}, &capNodeInfo{cap: cap}, &capNodeInfo{cap: cap}}, 2}, // each has 2 peers with capabilities + {"cap=netA", []NodeInfo{&capNodeInfo{cap: cap}, &nopeNodeInfo{}, &nopeNodeInfo{}}, 1}, // each has 1 peer with capabilities + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, test.nis[0]) + require.NoError(t, err) + + err = netA.Start() + require.NoError(t, err) + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peerstore.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[1]) + require.NoError(t, err) + err = netB.Start() + require.NoError(t, err) + defer netB.Stop() + + netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, test.nis[2]) + require.NoError(t, err) + err = netC.Start() + require.NoError(t, err) + defer netC.Stop() + + require.Eventually( + t, + func() bool { + return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && + len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && + len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0 + }, + 2*time.Second, + 50*time.Millisecond, + ) + t.Logf("peers connected") + + discs := []*p2p.CapabilitiesDiscovery{netA.capabilitiesDiscovery, netB.capabilitiesDiscovery, netC.capabilitiesDiscovery} + + var wg sync.WaitGroup + wg.Add(len(discs)) + for _, disc := range discs { + if disc == nil { + 
wg.Done() + continue + } + go func(disc *p2p.CapabilitiesDiscovery) { + defer wg.Done() + waitForRouting(t, disc) + }(disc) + } + wg.Wait() + + t.Logf("DHT is ready") + + // ensure all peers are connected + for _, disc := range discs { + require.Equal(t, 2, len(disc.Host().Network().Peers())) + } + + wg.Add(len(discs)) + for _, disc := range discs { + go func(disc *p2p.CapabilitiesDiscovery) { + defer wg.Done() + if disc == netA.capabilitiesDiscovery { + return + } + require.Eventuallyf(t, + func() bool { + peers, err := disc.PeersForCapability(cap, test.numCapPeers) + if err == nil && len(peers) == test.numCapPeers { + return true + } + return false + }, + time.Minute, + time.Second, + fmt.Sprintf("Not all expected %s cap peers were found", cap), + ) + }(disc) + } + wg.Wait() + }) + } +} diff --git a/network/wsNetwork.go b/network/wsNetwork.go index a7fab2d174..b5858cb22c 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -156,6 +156,8 @@ const GossipNetworkPath = "/v1/{genesisID}/gossip" type NodeInfo interface { // IsParticipating returns true if this node has stake and may vote on blocks or propose blocks. IsParticipating() bool + // Capabilities returns a list of capabilities this node has. 
+ Capabilities() []p2p.Capability } type nopeNodeInfo struct { @@ -165,6 +167,10 @@ func (nnni *nopeNodeInfo) IsParticipating() bool { return false } +func (nnni *nopeNodeInfo) Capabilities() []p2p.Capability { + return nil +} + // WebsocketNetwork implements GossipNode type WebsocketNetwork struct { listener net.Listener @@ -680,7 +686,7 @@ func (wn *WebsocketNetwork) setup() { } // Start makes network connections and threads -func (wn *WebsocketNetwork) Start() { +func (wn *WebsocketNetwork) Start() error { wn.messagesOfInterestMu.Lock() defer wn.messagesOfInterestMu.Unlock() wn.messagesOfInterestEncoded = true @@ -692,7 +698,7 @@ func (wn *WebsocketNetwork) Start() { listener, err := net.Listen("tcp", wn.config.NetAddress) if err != nil { wn.log.Errorf("network could not listen %v: %s", wn.config.NetAddress, err) - return + return err } // wrap the original listener with a limited connection listener listener = limitlistener.RejectingLimitListener( @@ -768,6 +774,8 @@ func (wn *WebsocketNetwork) Start() { go wn.postMessagesOfInterestThread() wn.log.Infof("serving genesisID=%s on %#v with RandomID=%s", wn.GenesisID, wn.PublicAddress(), wn.RandomID) + + return nil } func (wn *WebsocketNetwork) httpdThread() { diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 5a07c38de7..368d098961 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -40,6 +40,7 @@ import ( "time" "github.com/algorand/go-algorand/internal/rapidgen" + "github.com/algorand/go-algorand/network/p2p" "pgregory.net/rapid" "github.com/stretchr/testify/assert" @@ -3289,6 +3290,9 @@ type participatingNodeInfo struct { func (nnni *participatingNodeInfo) IsParticipating() bool { return true } +func (nnni *participatingNodeInfo) Capabilities() []p2p.Capability { + return nil +} func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) { // Tests that A->B follows MOI diff --git a/node/follower_node.go b/node/follower_node.go index b60f185992..7c8ab482ea 100644 --- 
a/node/follower_node.go +++ b/node/follower_node.go @@ -162,7 +162,7 @@ func (node *AlgorandFollowerNode) Config() config.Local { } // Start the node: connect to peers while obtaining a lock. Doesn't wait for initial sync. -func (node *AlgorandFollowerNode) Start() { +func (node *AlgorandFollowerNode) Start() error { node.mu.Lock() defer node.mu.Unlock() @@ -172,22 +172,30 @@ func (node *AlgorandFollowerNode) Start() { // The start network is being called only after the various services start up. // We want to do so in order to let the services register their callbacks with the // network package before any connections are being made. - startNetwork := func() { + startNetwork := func() error { if !node.config.DisableNetworking { // start accepting connections - node.net.Start() + err := node.net.Start() + if err != nil { + return err + } node.config.NetAddress, _ = node.net.Address() } + return nil } + var err error if node.catchpointCatchupService != nil { - startNetwork() - _ = node.catchpointCatchupService.Start(node.ctx) + err = startNetwork() + if err == nil { + err = node.catchpointCatchupService.Start(node.ctx) + } } else { node.catchupService.Start() node.blockService.Start() - startNetwork() + err = startNetwork() } + return err } // ListeningAddress retrieves the node's current listening address, if any. 
diff --git a/node/node.go b/node/node.go index 165712148a..07e634c407 100644 --- a/node/node.go +++ b/node/node.go @@ -28,8 +28,6 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/algorand/go-deadlock" "github.com/algorand/go-algorand/agreement" @@ -156,8 +154,6 @@ type AlgorandFullNode struct { tracer messagetracer.MessageTracer stateProofWorker *stateproof.Worker - - capabilitiesDiscovery *p2p.CapabilitiesDiscovery } // TxnWithStatus represents information about a single transaction, @@ -200,15 +196,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd return nil, err } - if cfg.EnableDHTProviders { - caps, err0 := p2p.MakeCapabilitiesDiscovery(node.ctx, node.config, node.genesisDirs.RootGenesisDir, string(genesis.Network), node.log, []*peer.AddrInfo{}) - if err0 != nil { - log.Errorf("Failed to create dht node capabilities discovery: %v", err) - return nil, err - } - node.capabilitiesDiscovery = caps - } - // tie network, block fetcher, and agreement services together var p2pNode network.GossipNode if cfg.EnableP2PHybridMode { @@ -219,7 +206,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd } } else if cfg.EnableP2P { // TODO: pass more appropriate genesisDir (hot/cold). Presently this is just used to store a peerID key. - p2pNode, err = network.NewP2PNetwork(node.log, node.config, node.genesisDirs.RootGenesisDir, phonebookAddresses, genesis.ID(), genesis.Network) + p2pNode, err = network.NewP2PNetwork(node.log, node.config, node.genesisDirs.RootGenesisDir, phonebookAddresses, genesis.ID(), genesis.Network, node) if err != nil { log.Errorf("could not create p2p node: %v", err) return nil, err @@ -360,7 +347,7 @@ func (node *AlgorandFullNode) Config() config.Local { } // Start the node: connect to peers and run the agreement service while obtaining a lock. Doesn't wait for initial sync. 
-func (node *AlgorandFullNode) Start() { +func (node *AlgorandFullNode) Start() error { node.mu.Lock() defer node.mu.Unlock() @@ -370,12 +357,16 @@ func (node *AlgorandFullNode) Start() { // The start network is being called only after the various services start up. // We want to do so in order to let the services register their callbacks with the // network package before any connections are being made. - startNetwork := func() { + startNetwork := func() error { if !node.config.DisableNetworking { // start accepting connections - node.net.Start() + err := node.net.Start() + if err != nil { + return err + } node.config.NetAddress, _ = node.net.Address() } + return nil } if node.catchpointCatchupService != nil { @@ -389,17 +380,18 @@ func (node *AlgorandFullNode) Start() { node.ledgerService.Start() node.txHandler.Start() node.stateProofWorker.Start() - startNetwork() + err := startNetwork() + if err != nil { + return err + } node.startMonitoringRoutines() } - if node.capabilitiesDiscovery != nil { - node.capabilitiesDiscovery.AdvertiseCapabilities(node.capabilities()...) - } - + return nil } -func (node *AlgorandFullNode) capabilities() []p2p.Capability { +// Capabilities returns the node's capabilities for advertising to other nodes. 
+func (node *AlgorandFullNode) Capabilities() []p2p.Capability { var caps []p2p.Capability if node.IsArchival() { caps = append(caps, p2p.Archival) @@ -465,9 +457,6 @@ func (node *AlgorandFullNode) Stop() { node.lowPriorityCryptoVerificationPool.Shutdown() node.cryptoPool.Shutdown() node.cancelCtx() - if node.capabilitiesDiscovery != nil { - node.capabilitiesDiscovery.Close() - } } // note: unlike the other two functions, this accepts a whole filename diff --git a/tools/debug/algodump/main.go b/tools/debug/algodump/main.go index 429cec9f9d..4d8863b354 100644 --- a/tools/debug/algodump/main.go +++ b/tools/debug/algodump/main.go @@ -179,7 +179,11 @@ func main() { *genesisID, protocol.NetworkID(*networkID)) setDumpHandlers(n) - n.Start() + err := n.Start() + if err != nil { + log.Errorf("Failed to start network: %v", err) + return + } for { time.Sleep(time.Second) diff --git a/tools/debug/transplanter/main.go b/tools/debug/transplanter/main.go index 1ee2a9c84e..e892d1fcb7 100644 --- a/tools/debug/transplanter/main.go +++ b/tools/debug/transplanter/main.go @@ -393,7 +393,11 @@ func main() { os.Exit(1) } - followerNode.Start() + err = followerNode.Start() + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot start follower node: %v", err) + os.Exit(1) + } for followerNode.Ledger().Latest() < basics.Round(*roundStart) { fmt.Printf("At round %d, waiting for %d\n", followerNode.Ledger().Latest(), *roundStart)