From 78505320c89db5f93c109b69e3c0de171bfacad1 Mon Sep 17 00:00:00 2001 From: noot <36753753+noot@users.noreply.github.com> Date: Thu, 8 Apr 2021 18:48:11 -0400 Subject: [PATCH] feat(dot/network): implement persistent peers functionality (#1512) --- cmd/gossamer/config.go | 6 ++++ dot/config.go | 15 +++++---- dot/config/toml/config.go | 15 +++++---- dot/network/config.go | 3 ++ dot/network/connmgr.go | 65 ++++++++++++++++++++++--------------- dot/network/connmgr_test.go | 33 ++++++++++++++++++- dot/network/host.go | 51 +++++++++++++++++++---------- dot/services.go | 25 +++++++------- go.mod | 2 +- 9 files changed, 143 insertions(+), 72 deletions(-) diff --git a/cmd/gossamer/config.go b/cmd/gossamer/config.go index 9907abdaa8..0b47ff5721 100644 --- a/cmd/gossamer/config.go +++ b/cmd/gossamer/config.go @@ -562,6 +562,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot cfg.NoMDNS = tomlCfg.NoMDNS cfg.MinPeers = tomlCfg.MinPeers cfg.MaxPeers = tomlCfg.MaxPeers + cfg.PersistentPeers = tomlCfg.PersistentPeers // check --port flag and update node configuration if port := ctx.GlobalUint(PortFlag.Name); port != 0 { @@ -593,6 +594,10 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot cfg.NoMDNS = true } + if len(cfg.PersistentPeers) == 0 { + cfg.PersistentPeers = []string(nil) + } + logger.Debug( "network configuration", "port", cfg.Port, @@ -602,6 +607,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot "nomdns", cfg.NoMDNS, "minpeers", cfg.MinPeers, "maxpeers", cfg.MaxPeers, + "persistent-peers", cfg.PersistentPeers, ) } diff --git a/dot/config.go b/dot/config.go index 57d1e14634..61f05c4bdb 100644 --- a/dot/config.go +++ b/dot/config.go @@ -78,13 +78,14 @@ type AccountConfig struct { // NetworkConfig is to marshal/unmarshal toml network config vars type NetworkConfig struct { - Port uint32 - Bootnodes []string - ProtocolID string - NoBootstrap bool - NoMDNS bool - MinPeers int - MaxPeers int + Port uint32 + Bootnodes []string + ProtocolID string + NoBootstrap bool + NoMDNS bool + MinPeers int + MaxPeers int + PersistentPeers []string } // CoreConfig is to marshal/unmarshal toml core config vars diff --git a/dot/config/toml/config.go b/dot/config/toml/config.go index 6948bbcfe2..98a53611a3 100644 --- a/dot/config/toml/config.go +++ b/dot/config/toml/config.go @@ -61,13 +61,14 @@ type AccountConfig struct { // NetworkConfig is to marshal/unmarshal toml network config vars type NetworkConfig struct { - Port uint32 `toml:"port,omitempty"` - Bootnodes []string `toml:"bootnodes,omitempty"` - ProtocolID string `toml:"protocol,omitempty"` - NoBootstrap bool `toml:"nobootstrap,omitempty"` - NoMDNS bool `toml:"nomdns,omitempty"` - MinPeers int `toml:"min-peers,omitempty"` - MaxPeers int `toml:"max-peers,omitempty"` + Port uint32 `toml:"port,omitempty"` + Bootnodes []string `toml:"bootnodes,omitempty"` + ProtocolID string `toml:"protocol,omitempty"` + NoBootstrap bool `toml:"nobootstrap,omitempty"` + NoMDNS bool `toml:"nomdns,omitempty"` + MinPeers int `toml:"min-peers,omitempty"` + MaxPeers int `toml:"max-peers,omitempty"` + PersistentPeers []string `toml:"persistent-peers,omitempty"` } // CoreConfig is to marshal/unmarshal toml core config vars diff --git a/dot/network/config.go b/dot/network/config.go index d2420f8a7a..a3cf8e2a02 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -85,6 +85,9 @@ type Config struct { MinPeers int MaxPeers int + // PersistentPeers is a list of multiaddrs which the node should remain connected to + PersistentPeers []string + // privateKey the private key for the network p2p identity privateKey crypto.PrivKey diff --git a/dot/network/connmgr.go b/dot/network/connmgr.go index f118ae1c38..f10b2fc585 100644 --- a/dot/network/connmgr.go +++ b/dot/network/connmgr.go @@ -31,25 +31,29 @@ import ( // ConnManager implements connmgr.ConnManager type ConnManager struct { + sync.Mutex + host *host min, max int disconnectHandler func(peer.ID) // closeHandlerMap contains close handler corresponding to a protocol. closeHandlerMap map[protocol.ID]func(peerID peer.ID) - protectedPeerMapMu sync.RWMutex - // protectedPeerMap contains a list of peers that are protected from pruning + // protectedPeers contains a list of peers that are protected from pruning // when we reach the maximum numbers of peers. - protectedPeerMap map[peer.ID]struct{} - sync.Mutex + protectedPeers *sync.Map // map[peer.ID]struct{} + + // persistentPeers contains peers we should remain connected to. + persistentPeers *sync.Map // map[peer.ID]struct{} } func newConnManager(min, max int) *ConnManager { return &ConnManager{ - min: min, - max: max, - closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)), - protectedPeerMap: make(map[peer.ID]struct{}), + min: min, + max: max, + closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)), + protectedPeers: new(sync.Map), + persistentPeers: new(sync.Map), } } @@ -85,25 +89,15 @@ func (*ConnManager) TrimOpenConns(ctx context.Context) {} // Protect peer will add the given peer to the protectedPeerMap which will // protect the peer from pruning. func (cm *ConnManager) Protect(id peer.ID, tag string) { - cm.protectedPeerMapMu.Lock() - defer cm.protectedPeerMapMu.Unlock() - - cm.protectedPeerMap[id] = struct{}{} + cm.protectedPeers.Store(id, struct{}{}) } // Unprotect peer will remove the given peer from prune protection. // returns true if we have successfully removed the peer from the // protectedPeerMap. False otherwise. func (cm *ConnManager) Unprotect(id peer.ID, tag string) bool { - cm.protectedPeerMapMu.Lock() - defer cm.protectedPeerMapMu.Unlock() - - _, ok := cm.protectedPeerMap[id] - if ok { - delete(cm.protectedPeerMap, id) - return true - } - return false + _, wasDeleted := cm.protectedPeers.LoadAndDelete(id) + return wasDeleted } // Close peer @@ -111,10 +105,7 @@ func (*ConnManager) Close() error { return nil } // IsProtected returns whether the given peer is protected from pruning or not. func (cm *ConnManager) IsProtected(id peer.ID, tag string) (protected bool) { - cm.protectedPeerMapMu.RLock() - defer cm.protectedPeerMapMu.RUnlock() - - _, ok := cm.protectedPeerMap[id] + _, ok := cm.protectedPeers.Load(id) return ok } @@ -140,7 +131,7 @@ func (cm *ConnManager) ListenClose(n network.Network, addr ma.Multiaddr) { func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID { unprot := []peer.ID{} for _, id := range peers { - if !cm.IsProtected(id, "") { + if !cm.IsProtected(id, "") && !cm.isPersistent(id) { unprot = append(unprot, id) } } @@ -159,6 +150,7 @@ func (cm *ConnManager) Connected(n network.Network, c network.Conn) { cm.Lock() defer cm.Unlock() + // TODO: this should be updated to disconnect from (total_peers - maximum) peers, instead of just one peer if len(n.Peers()) > cm.max { unprotPeers := cm.unprotectedPeers(n.Peers()) if len(unprotPeers) == 0 { @@ -188,6 +180,22 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) { if cm.disconnectHandler != nil { cm.disconnectHandler(c.RemotePeer()) } + + if !cm.isPersistent(c.RemotePeer()) { + return + } + + addrs := cm.host.h.Peerstore().Addrs(c.RemotePeer()) + info := peer.AddrInfo{ + ID: c.RemotePeer(), + Addrs: addrs, + } + + err := cm.host.connect(info) + if err != nil { + logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err) + } + // TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers } @@ -224,3 +232,8 @@ func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) { closeCB(s.Conn().RemotePeer()) } } + +func (cm *ConnManager) isPersistent(p peer.ID) bool { + _, ok := cm.persistentPeers.Load(p) + return ok +} diff --git a/dot/network/connmgr_test.go b/dot/network/connmgr_test.go index e6112735fb..e760704013 100644 --- a/dot/network/connmgr_test.go +++ b/dot/network/connmgr_test.go @@ -19,6 +19,7 @@ package network import ( "fmt" "testing" + "time" "github.com/ChainSafe/gossamer/lib/utils" "github.com/libp2p/go-libp2p-core/peer" @@ -64,7 +65,6 @@ func TestMaxPeers(t *testing.T) { func TestProtectUnprotectPeer(t *testing.T) { cm := newConnManager(1, 4) - require.Zero(t, len(cm.protectedPeerMap)) p1 := peer.ID("a") p2 := peer.ID("b") @@ -86,3 +86,34 @@ func TestProtectUnprotectPeer(t *testing.T) { unprot = cm.unprotectedPeers([]peer.ID{p1, p2, p3, p4}) require.Equal(t, unprot, []peer.ID{p1, p2, p3, p4}) } + +func TestPersistentPeers(t *testing.T) { + configA := &Config{ + BasePath: utils.NewTestBasePath(t, "node-a"), + Port: 7000, + RandSeed: 1, + NoBootstrap: true, + NoMDNS: true, + } + nodeA := createTestService(t, configA) + + addrs := nodeA.host.multiaddrs() + configB := &Config{ + BasePath: utils.NewTestBasePath(t, "node-b"), + Port: 7001, + RandSeed: 2, + NoMDNS: true, + PersistentPeers: []string{addrs[0].String()}, + } + nodeB := createTestService(t, configB) + + // B should have connected to A during bootstrap + conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) + require.NotEqual(t, 0, len(conns)) + + // if A disconnects from B, B should reconnect + nodeA.host.h.Network().ClosePeer(nodeA.host.id()) + time.Sleep(time.Millisecond * 500) + conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) + require.NotEqual(t, 0, len(conns)) +} diff --git a/dot/network/host.go b/dot/network/host.go index 79594760e4..8b1e4d25da 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -49,13 +49,14 @@ var privateCIDRs = []string{ // host wraps libp2p host with network host configuration and services type host struct { - ctx context.Context - h libp2phost.Host - dht *dual.DHT - bootnodes []peer.AddrInfo - protocolID protocol.ID - cm *ConnManager - ds *badger.Datastore + ctx context.Context + h libp2phost.Host + dht *dual.DHT + bootnodes []peer.AddrInfo + persistentPeers []peer.AddrInfo + protocolID protocol.ID + cm *ConnManager + ds *badger.Datastore } // newHost creates a host wrapper with a new libp2p host instance @@ -78,6 +79,16 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { return nil, err } + // format persistent peers + pps, err := stringsToAddrInfos(cfg.PersistentPeers) + if err != nil { + return nil, err + } + + for _, pp := range pps { + cm.persistentPeers.Store(pp.ID, struct{}{}) + } + // format protocol id pid := protocol.ID(cfg.ProtocolID) @@ -143,16 +154,19 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { // wrap host and DHT service with routed host h = rhost.Wrap(h, dht) - return &host{ - ctx: ctx, - h: h, - dht: dht, - bootnodes: bns, - protocolID: pid, - cm: cm, - ds: ds, - }, nil + host := &host{ + ctx: ctx, + h: h, + dht: dht, + bootnodes: bns, + protocolID: pid, + cm: cm, + ds: ds, + persistentPeers: pps, + } + cm.host = host + return host, nil } // close closes host services and the libp2p host (host services first) @@ -221,14 +235,15 @@ func (h *host) addToPeerstore(p peer.AddrInfo) { // bootstrap connects the host to the configured bootnodes func (h *host) bootstrap() { failed := 0 - for _, addrInfo := range h.bootnodes { + all := append(h.bootnodes, h.persistentPeers...) + for _, addrInfo := range all { err := h.connect(addrInfo) if err != nil { logger.Debug("failed to bootstrap to peer", "error", err) failed++ } } - if failed == len(h.bootnodes) { + if failed == len(all) { logger.Error("failed to bootstrap to any bootnode") } } diff --git a/dot/services.go b/dot/services.go index a49239cb52..0f5d329dbc 100644 --- a/dot/services.go +++ b/dot/services.go @@ -263,18 +263,19 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi // network service configuation networkConfig := network.Config{ - LogLvl: cfg.Log.NetworkLvl, - BlockState: stateSrvc.Block, - BasePath: cfg.Global.BasePath, - Roles: cfg.Core.Roles, - Port: cfg.Network.Port, - Bootnodes: cfg.Network.Bootnodes, - ProtocolID: cfg.Network.ProtocolID, - NoBootstrap: cfg.Network.NoBootstrap, - NoMDNS: cfg.Network.NoMDNS, - MinPeers: cfg.Network.MinPeers, - MaxPeers: cfg.Network.MaxPeers, - PublishMetrics: cfg.Global.PublishMetrics, + LogLvl: cfg.Log.NetworkLvl, + BlockState: stateSrvc.Block, + BasePath: cfg.Global.BasePath, + Roles: cfg.Core.Roles, + Port: cfg.Network.Port, + Bootnodes: cfg.Network.Bootnodes, + ProtocolID: cfg.Network.ProtocolID, + NoBootstrap: cfg.Network.NoBootstrap, + NoMDNS: cfg.Network.NoMDNS, + MinPeers: cfg.Network.MinPeers, + MaxPeers: cfg.Network.MaxPeers, + PublishMetrics: cfg.Global.PublishMetrics, + PersistentPeers: cfg.Network.PersistentPeers, } networkSrvc, err := network.NewService(&networkConfig) diff --git a/go.mod b/go.mod index a38499f774..e9ee6e8434 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect - golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd + golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect google.golang.org/appengine v1.6.5 // indirect google.golang.org/protobuf v1.25.0 )