Skip to content

Commit

Permalink
Merge pull request #180 from koinos/bump-p2p
Browse files Browse the repository at this point in the history
Bump p2p
  • Loading branch information
sgerbino authored Nov 2, 2021
2 parents c5c673d + e4cca77 commit 9cac550
Show file tree
Hide file tree
Showing 9 changed files with 674 additions and 301 deletions.
29 changes: 12 additions & 17 deletions cmd/koinos-p2p/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@ import (
)

const (
baseDirOption = "basedir"
amqpOption = "amqp"
listenOption = "listen"
seedOption = "seed"
peerOption = "peer"
directOption = "direct"
checkpointOption = "checkpoint"
peerExchangeOption = "pex"
gossipOption = "gossip"
forceGossipOption = "force-gossip"
logLevelOption = "log-level"
instanceIDOption = "instance-id"
baseDirOption = "basedir"
amqpOption = "amqp"
listenOption = "listen"
seedOption = "seed"
peerOption = "peer"
directOption = "direct"
checkpointOption = "checkpoint"
gossipOption = "gossip"
forceGossipOption = "force-gossip"
logLevelOption = "log-level"
instanceIDOption = "instance-id"
)

const (
Expand Down Expand Up @@ -75,10 +74,9 @@ func main() {
peerAddresses := flag.StringSliceP(peerOption, "p", []string{}, "Address of a peer to which to connect (may specify multiple)")
directAddresses := flag.StringSliceP(directOption, "D", []string{}, "Address of a peer to connect using gossipsub.WithDirectPeers (may specify multiple) (should be reciprocal)")
checkpoints := flag.StringSliceP(checkpointOption, "c", []string{}, "Block checkpoint in the form height:blockid (may specify multiple times)")
peerExchange := flag.BoolP(peerExchangeOption, "x", peerExchangeDefault, "Exchange peers with other nodes")
gossip := flag.BoolP(gossipOption, "g", gossipDefault, "Enable gossip mode")
forceGossip := flag.BoolP(forceGossipOption, "G", forceGossipDefault, "Force gossip mode")
logLevel := flag.StringP(logLevelOption, "v", logLevelDefault, "The log filtering level (debug, info, warn, error)")
logLevel := flag.StringP(logLevelOption, "v", "", "The log filtering level (debug, info, warn, error)")
instanceID := flag.StringP(instanceIDOption, "i", instanceIDDefault, "The instance ID to identify this node")

flag.Parse()
Expand All @@ -93,7 +91,6 @@ func main() {
*peerAddresses = util.GetStringSliceOption(peerOption, *peerAddresses, yamlConfig.P2P, yamlConfig.Global)
*directAddresses = util.GetStringSliceOption(directOption, *directAddresses, yamlConfig.P2P, yamlConfig.Global)
*checkpoints = util.GetStringSliceOption(checkpointOption, *checkpoints, yamlConfig.P2P, yamlConfig.Global)
*peerExchange = util.GetBoolOption(peerExchangeOption, peerExchangeDefault, *peerExchange, yamlConfig.P2P, yamlConfig.Global)
*gossip = util.GetBoolOption(gossipOption, *gossip, gossipDefault, yamlConfig.P2P, yamlConfig.Global, yamlConfig.Global)
*forceGossip = util.GetBoolOption(forceGossipOption, *forceGossip, forceGossipDefault, yamlConfig.P2P, yamlConfig.Global)
*logLevel = util.GetStringOption(logLevelOption, logLevelDefault, *logLevel, yamlConfig.P2P, yamlConfig.Global)
Expand All @@ -113,8 +110,6 @@ func main() {

config := options.NewConfig()

config.NodeOptions.EnablePeerExchange = *peerExchange

config.NodeOptions.InitialPeers = *peerAddresses
config.NodeOptions.DirectPeers = *directAddresses

Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ module github.com/koinos/koinos-p2p
go 1.15

require (
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log v1.0.5
github.com/koinos/koinos-log-golang v0.0.0-20210621202301-3310a8e5866b
github.com/koinos/koinos-mq-golang v0.0.0-20210424202816-d2bd4d1894d1
github.com/koinos/koinos-proto-golang v0.0.0-20210914170258-3625b3f80c90
github.com/koinos/koinos-util-golang v0.0.0-20211019222021-3b7f67a3119d
github.com/libp2p/go-libp2p v0.14.2
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-gorpc v0.1.2
github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multihash v0.0.16
github.com/libp2p/go-libp2p v0.15.1
github.com/libp2p/go-libp2p-core v0.9.0
github.com/libp2p/go-libp2p-gorpc v0.1.3
github.com/libp2p/go-libp2p-kad-dht v0.15.0
github.com/libp2p/go-libp2p-pubsub v0.5.6
github.com/multiformats/go-multiaddr v0.4.0
github.com/multiformats/go-multihash v0.0.15
github.com/spf13/pflag v1.0.5
google.golang.org/protobuf v1.27.1
)
565 changes: 517 additions & 48 deletions go.sum

Large diffs are not rendered by default.

92 changes: 61 additions & 31 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"math/rand"
"sync/atomic"
"time"

log "github.com/koinos/koinos-log-golang"
koinosmq "github.com/koinos/koinos-mq-golang"
Expand All @@ -23,6 +24,8 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
multiaddr "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -58,18 +61,50 @@ func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.Local
return nil, err
}

node := new(KoinosP2PNode)

node.Options = config.NodeOptions
node.PeerErrorChan = make(chan p2p.PeerError)
node.DisconnectPeerChan = make(chan peer.ID)
node.GossipVoteChan = make(chan p2p.GossipVote)
node.PeerDisconnectedChan = make(chan peer.ID)

node.PeerErrorHandler = p2p.NewPeerErrorHandler(
node.DisconnectPeerChan,
node.PeerErrorChan,
config.PeerErrorHandlerOptions)

var idht *dht.IpfsDHT

options := []libp2p.Option{
libp2p.ListenAddrStrings(listenAddr),
libp2p.Identity(privateKey),
// Attempt to open ports using uPNP for NATed hosts.
libp2p.NATPortMap(),
// Let this host use the DHT to find other hosts
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
idht, err = dht.New(ctx, h)
return idht, err
}),
// Let this host use relays and advertise itself on relays if
// it finds it is behind NAT. Use libp2p.Relay(options...) to
// enable active relays and more.
libp2p.EnableAutoRelay(),
// If you want to help other peers to figure out if they are behind
// NATs, you can launch the server-side of AutoNAT too (AutoRelay
// already runs the client)
//
// This service is highly rate-limited and should not cause any
// performance issues.
libp2p.EnableNATService(),
libp2p.ConnectionGater(node.PeerErrorHandler),
}

host, err := libp2p.New(ctx, options...)
if err != nil {
return nil, err
}

node := new(KoinosP2PNode)
node.Host = host
node.localRPC = localRPC

Expand All @@ -81,28 +116,11 @@ func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.Local
log.Info("Starting P2P node without broadcast listeners")
}

node.Options = config.NodeOptions
node.PeerErrorChan = make(chan p2p.PeerError)
node.DisconnectPeerChan = make(chan peer.ID)
node.GossipVoteChan = make(chan p2p.GossipVote)
node.PeerDisconnectedChan = make(chan peer.ID)

// Create the pubsub gossip
pubsub.GossipSubD = 6
pubsub.GossipSubDlo = 5
pubsub.GossipSubDhi = 12
pubsub.GossipSubDscore = 4

if !node.Options.EnablePeerExchange {
pubsub.GossipSubPrunePeers = 0
} else {
pubsub.GossipSubPrunePeers = 16
}

pubsub.TimeCacheDuration = 60 * time.Second
ps, err := pubsub.NewGossipSub(
ctx, node.Host,
pubsub.WithPeerExchange(node.Options.EnablePeerExchange),
pubsub.WithMessageIdFn(generateMessageID),
pubsub.WithPeerExchange(true),
)
if err != nil {
return nil, err
Expand All @@ -113,15 +131,9 @@ func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.Local
node.localRPC,
ps,
node.PeerErrorChan,
node,
node.Host.ID(),
node)

node.PeerErrorHandler = p2p.NewPeerErrorHandler(
node.DisconnectPeerChan,
node.PeerErrorChan,
config.PeerErrorHandlerOptions)

node.GossipToggle = p2p.NewGossipToggle(
node.Gossip,
node.GossipVoteChan,
Expand All @@ -130,8 +142,6 @@ func NewKoinosP2PNode(ctx context.Context, listenAddr string, localRPC rpc.Local

node.ConnectionManager = p2p.NewConnectionManager(
node.Host,
node.Gossip,
node.PeerErrorHandler,
node.localRPC,
&config.PeerConnectionOptions,
node,
Expand Down Expand Up @@ -205,7 +215,7 @@ func (n *KoinosP2PNode) PeerStringToAddress(peerAddr string) (*peer.AddrInfo, er

// ConnectToPeerAddress connects to the given peer address
func (n *KoinosP2PNode) ConnectToPeerAddress(ctx context.Context, peer *peer.AddrInfo) error {
return n.ConnectionManager.ConnectToPeer(ctx, peer)
return n.Host.Connect(ctx, *peer)
}

// GetConnections returns the host's current peer connections
Expand Down Expand Up @@ -241,6 +251,26 @@ func (n *KoinosP2PNode) Close() error {
return nil
}

func (n *KoinosP2PNode) logConnectionsLoop(ctx context.Context) {
for {
select {
case <-time.After(time.Minute * 1):
log.Info("My address:")
log.Infof(" - %s", n.GetAddress())
log.Info("Connected peers:")
for i, conn := range n.GetConnections() {
log.Infof(" - %s/p2p%s", conn.RemoteMultiaddr(), conn.RemotePeer())
if i > 10 {
log.Infof(" and %v more...", len(n.GetConnections())-i)
break
}
}
case <-ctx.Done():
return
}
}
}

// Start starts background goroutines
func (n *KoinosP2PNode) Start(ctx context.Context) {
n.Host.Network().Notify(n.ConnectionManager)
Expand All @@ -254,7 +284,7 @@ func (n *KoinosP2PNode) Start(ctx context.Context) {
n.libValue.Store(*forkHeads.LastIrreversibleBlock)

// Start peer gossip
n.Gossip.StartPeerGossip(ctx)
go n.logConnectionsLoop(ctx)
n.PeerErrorHandler.Start(ctx)
n.GossipToggle.Start(ctx)
n.ConnectionManager.Start(ctx)
Expand Down Expand Up @@ -308,7 +338,7 @@ func generatePrivateKey(seed string) (crypto.PrivKey, error) {
func generateMessageID(msg *pb.Message) string {
// Use the default unique ID function for peer exchange
switch *msg.Topic {
case p2p.BlockTopicName, p2p.TransactionTopicName, p2p.PeerTopicName:
case p2p.BlockTopicName, p2p.TransactionTopicName:
// Hash the data
h := sha256.New()
h.Write(msg.Data)
Expand Down
10 changes: 3 additions & 7 deletions internal/options/node_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package options

// NodeOptions is options that affect the whole node
type NodeOptions struct {
// Set to true to enable peer exchange, where peers are given to / accepted from other nodes
EnablePeerExchange bool

// Peers to initially connect
InitialPeers []string

Expand All @@ -18,9 +15,8 @@ type NodeOptions struct {
// NewNodeOptions creates a NodeOptions object which controls how p2p works
func NewNodeOptions() *NodeOptions {
return &NodeOptions{
EnablePeerExchange: true,
InitialPeers: make([]string, 0),
DirectPeers: make([]string, 0),
ForceGossip: false,
InitialPeers: make([]string, 0),
DirectPeers: make([]string, 0),
ForceGossip: false,
}
}
Loading

0 comments on commit 9cac550

Please sign in to comment.