docs: p2p package overview #6096

Merged
merged 6 commits into from
Aug 15, 2024
6 changes: 5 additions & 1 deletion catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@
}

network.SetUserAgentHeader(request.Header)
return peer.GetHTTPClient().Do(request)
httpClient := peer.GetHTTPClient()
if httpClient == nil {
return nil, fmt.Errorf("requestLedger: HTTPPeer %s has no http client", peer.GetAddress())

}
return httpClient.Do(request)
}

func (lf *ledgerFetcher) headLedger(ctx context.Context, peer network.Peer, round basics.Round) error {
Expand Down
6 changes: 5 additions & 1 deletion catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@
}
address = fetcherClient.address()
} else if httpPeer, validHTTPPeer := peer.(network.HTTPPeer); validHTTPPeer {
httpClient := httpPeer.GetHTTPClient()
if httpClient == nil {
return nil, nil, time.Duration(0), fmt.Errorf("fetchBlock: HTTPPeer %s has no http client", httpPeer.GetAddress())

}
fetcherClient := &HTTPFetcher{
peer: httpPeer,
rootURL: httpPeer.GetAddress(),
net: uf.net,
client: httpPeer.GetHTTPClient(),
client: httpClient,
log: uf.log,
config: &uf.config}
fetchedBuf, err = fetcherClient.getBlockBytes(ctx, round)
Expand Down
149 changes: 149 additions & 0 deletions network/README-P2P.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# P2P Network implementation overview

Refer to [p2p sub-package overview](./p2p/README.md) for details about p2p sub-components.

`P2PNetwork` implements the `GossipNode` interface similarly to `WsNetwork`. Both use
the same peer connection management and message broadcast functions but different
transports: libp2p-managed connections and HTTP + WebSocket, respectively.
Both `P2PNetwork` and `WsNetwork` require `config.NetAddress` to be set in order to start a server.

In addition, `HybridNetwork` is an aggregate of `P2PNetwork` and `WsNetwork` allowing a node
to interact over both networks. In the case of hybrid operation, both `config.P2PNetAddress` and
`config.NetAddress` are used.

## General design

`P2PNetwork` follows the `WsNetwork` approach to peer management and message handling:
- `msgHandler` processes network protocol messages or routes them to external handlers
(for example, transaction handler or agreement service)
- `broadcaster` implements the broadcast functionality (see below)
- a mesh thread maintains `GossipFanout` outgoing peers
- an HTTP server exposes external HTTP services (blocks, catchpoints)
- an `OnNetworkAdvance` listener reacts to round advancement

A key difference is that `P2PNetwork` uses `go-libp2p-pubsub` for TX message handling.
On start it subscribes to the `/algo/tx/0.1.0` topic and publishes TX messages as needed.
The `pubsub` library divides message handling into two stages: validation and processing. Based on
the validation result, a message is either discarded or accepted for further
broadcasting to other peers. This necessitates separate handlers for TX messages
in `TxHandler`, as we must synchronously determine whether a transaction group is valid:
- we can't ignore a message quickly and broadcast it later, because it would be rejected as an already seen message
- we can't accept a message quickly, because that would broadcast invalid or expired transactions
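
The synchronous decision described above can be sketched roughly as follows. This is a minimal illustration, not the actual `TxHandler` code: the `validationResult` type, its constants, the `txGroup` struct, and the `validate` function are all hypothetical names invented for this sketch, and real validation involves far more than a well-formedness flag and a round check.

```go
package main

import "fmt"

// validationResult models the three outcomes of a pubsub-style validator.
// All names in this sketch are hypothetical.
type validationResult int

const (
	accept validationResult = iota // process and forward to other peers
	ignore                         // drop quietly, for example when expired
	reject                         // drop as invalid
)

// txGroup is a heavily simplified, hypothetical transaction group.
type txGroup struct {
	wellFormed bool
	lastValid  uint64 // last round in which the group may be included
}

// validate decides synchronously: the result is known before the message
// can be forwarded, so invalid or expired groups are never re-broadcast.
func validate(g txGroup, currentRound uint64) validationResult {
	if !g.wellFormed {
		return reject
	}
	if g.lastValid < currentRound {
		return ignore
	}
	return accept
}

func main() {
	fresh := txGroup{wellFormed: true, lastValid: 10}
	fmt.Println(validate(fresh, 5) == accept)
}
```

Because the verdict is returned from the validator itself, there is no window in which a message is forwarded first and judged later.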

## Major Components

### HTTP Services

`P2PNetwork` uses libp2p's `http` submodule to handle HTTP traffic over a libp2p-managed connection.
It is `http.Handler`-compatible, so service handlers are registered the same way as for `WsNetwork`.
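
To illustrate why `http.Handler` compatibility matters, the sketch below registers a handler on a standard library router and exercises it in-process. The `blockHandler`, `registerServices`, and `get` names and the `/v1/block` path are assumptions made for this example and do not reflect the real service routes; the point is only that handler registration does not depend on the transport that delivers the request.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// blockHandler is a hypothetical service handler used only for this sketch.
func blockHandler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "block bytes")
}

// registerServices attaches handlers to a router. Nothing here depends on
// the transport that will eventually deliver the requests.
func registerServices(mux *http.ServeMux) {
	mux.HandleFunc("/v1/block", blockHandler)
}

// get serves one in-process GET request and returns the status and body.
func get(path string) (int, string) {
	mux := http.NewServeMux()
	registerServices(mux)
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	body, _ := io.ReadAll(rec.Result().Body)
	return rec.Code, string(body)
}

func main() {
	code, body := get("/v1/block")
	fmt.Println(code, body)
}
```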

### Phonebook, Peerstore, and peer classes

Originally, the phonebook was designed as an address registry holding permanent (`-p` CLI option
or `phonebook.json` extra configuration file) and dynamic (SRV DNS records) entries.
These entries can later be retrieved by peer role
(`PhoneBookEntryRelayRole` or `PhoneBookEntryArchivalRole`).
A new `PeerStore` (built on top of `libp2p.Peerstore`) resembles the original `Phonebook`:
it implements some of its methods exactly and provides the remaining `Phonebook` methods
with a slightly different signature (`string` vs `peer.AddrInfo` for address representation).
The main issue is that entries in `PeerStore` are identified by `PeerID`,
and each peer might have multiple addresses (versus the original WS peers with a single
`host:port` connectivity option).

Both `P2PNetwork` and `WsNetwork` have an extra level of peer classification on top of the two phonebook
classes: `PeersConnectedOut`, `PeersConnectedIn`, `PeersPhonebookRelays`, `PeersPhonebookArchivalNodes`.
This allows network clients to be more precise about the set of peers they want to work with. For example,
the ledger service wants `PeersPhonebookArchivalNodes`, and the transaction syncer wants `PeersConnectedOut`.
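
As a rough model of how a client might select peers by class, consider the sketch below. The lower-case `peerOption` constants mirror the class names above, but the `peerRegistry` type, its `getPeers` method, and the sample addresses are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// peerOption is a hypothetical stand-in for the peer classes named above.
type peerOption int

const (
	peersConnectedOut peerOption = iota
	peersConnectedIn
	peersPhonebookRelays
	peersPhonebookArchivalNodes
)

// peerRegistry keeps peer addresses grouped by class.
type peerRegistry struct {
	byClass map[peerOption][]string
}

// getPeers returns the union of the requested classes without duplicates,
// preserving the order in which addresses are first encountered.
func (r *peerRegistry) getPeers(options ...peerOption) []string {
	seen := make(map[string]bool)
	var out []string
	for _, opt := range options {
		for _, addr := range r.byClass[opt] {
			if !seen[addr] {
				seen[addr] = true
				out = append(out, addr)
			}
		}
	}
	return out
}

// sampleRegistry builds a registry with made-up addresses.
func sampleRegistry() *peerRegistry {
	return &peerRegistry{byClass: map[peerOption][]string{
		peersConnectedOut:           {"relay-1:4160", "relay-2:4160"},
		peersPhonebookRelays:        {"relay-1:4160", "relay-3:4160"},
		peersPhonebookArchivalNodes: {"archive-1:4160"},
	}}
}

func main() {
	r := sampleRegistry()
	fmt.Println(r.getPeers(peersPhonebookArchivalNodes))
	fmt.Println(r.getPeers(peersConnectedOut, peersPhonebookRelays))
}
```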


### wsPeer

Peers are created in `wsStreamHandler`, which is called for both incoming and outgoing connections
(and streams). The `incoming` flag is set to true for incoming connections.
At the very beginning of `wsStreamHandler`, a one-byte read/write takes place in order to:
- make sure the stream is operable
- provide a placeholder for a handshake where some metadata can be exchanged

Each peer gets a read channel `handler.readBuffer` where it enqueues incoming messages for routing
to the appropriate handler.
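
The one-byte exchange can be pictured with an in-memory connection pair. This is a sketch under stated assumptions: it uses `net.Pipe` instead of a libp2p stream, assumes that the dialing side writes the byte and the accepting side reads it, and the `probeStream`, `probeBothSides`, and `probeClosedStream` helpers are invented for the example.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// probeStream performs the one-byte exchange on a freshly opened stream.
// Assumption of this sketch: the dialing side writes the byte and the
// accepting (incoming) side reads it.
func probeStream(rw io.ReadWriter, incoming bool) error {
	if incoming {
		var b [1]byte
		if _, err := io.ReadFull(rw, b[:]); err != nil {
			return fmt.Errorf("stream not operable: %w", err)
		}
		return nil
	}
	if _, err := rw.Write([]byte{1}); err != nil {
		return fmt.Errorf("stream not operable: %w", err)
	}
	return nil
}

// probeBothSides runs the exchange over an in-memory connection pair.
func probeBothSides() error {
	out, in := net.Pipe()
	defer out.Close()
	defer in.Close()
	outErr := make(chan error, 1)
	go func() { outErr <- probeStream(out, false) }()
	if err := probeStream(in, true); err != nil {
		return err
	}
	return <-outErr
}

// probeClosedStream shows the failure path: the remote side is already gone.
func probeClosedStream() error {
	out, in := net.Pipe()
	defer in.Close()
	out.Close()
	return probeStream(in, true)
}

func main() {
	fmt.Println("healthy stream error:", probeBothSides())
	fmt.Println("closed stream error:", probeClosedStream())
}
```

A failed probe lets the handler bail out before a peer object is created for a stream that cannot carry traffic.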

Connected peers are maintained in a `wsPeers` map, similarly to `WsNetwork`.
The main difference between `P2PNetwork` and `WsNetwork` is the `http.Client`. Because wsPeers operate
over multiplexed streams in a libp2p-managed connection, a plain `http.Client` would not be able
to connect to a p2p HTTP server. This requires a `wsPeer` constructed in `P2PNetwork` to have a special
libp2p-streams compatible `http.Client` produced by the `MakeHTTPClientWithRateLimit` helper method.
It implements a rate-limiting approach similar to the regular HTTP clients from `WsNetwork`.
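
The idea of swapping the transport under a regular `http.Client` can be sketched with the standard library alone. Everything below is illustrative: `handlerTransport` stands in for a stream-based transport by calling an in-process handler, `spacedTransport` is a deliberately naive rate limiter, and neither reflects how `MakeHTTPClientWithRateLimit` is actually implemented. The `peer.invalid` host name is a placeholder that is never resolved.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// handlerTransport is a hypothetical RoundTripper that hands requests to an
// in-process handler instead of dialing TCP. It stands in for a transport
// that would open a stream on an existing peer connection.
type handlerTransport struct{ h http.Handler }

func (t handlerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	rec := httptest.NewRecorder()
	t.h.ServeHTTP(rec, req)
	return rec.Result(), nil
}

// spacedTransport enforces a minimum gap between consecutive requests.
type spacedTransport struct {
	inner  http.RoundTripper
	minGap time.Duration
	mu     sync.Mutex
	last   time.Time
}

func (t *spacedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	t.mu.Lock()
	if wait := t.minGap - time.Since(t.last); !t.last.IsZero() && wait > 0 {
		time.Sleep(wait)
	}
	t.last = time.Now()
	t.mu.Unlock()
	return t.inner.RoundTrip(req)
}

// fetchTwice issues two GETs through the custom client and returns the last
// body together with the elapsed time in milliseconds.
func fetchTwice(gapMillis int) (string, int64, error) {
	peerServer := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok from "+r.URL.Path)
	})
	client := &http.Client{Transport: &spacedTransport{
		inner:  handlerTransport{h: peerServer},
		minGap: time.Duration(gapMillis) * time.Millisecond,
	}}
	start := time.Now()
	var body []byte
	for i := 0; i < 2; i++ {
		resp, err := client.Get("http://peer.invalid/status")
		if err != nil {
			return "", 0, err
		}
		body, err = io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return "", 0, err
		}
	}
	return string(body), time.Since(start).Milliseconds(), nil
}

func main() {
	body, ms, err := fetchTwice(30)
	fmt.Println(body, ms, err)
}
```

Callers keep using the familiar `http.Client` API, while the decision of how bytes reach the peer, and how often, lives entirely in the `RoundTripper` chain.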

### Broadcaster

`msgBroadcaster` encapsulates the shared broadcasting logic: priority vs bulk messages (and queues),
data preparation, and peer retrieval. Broadcast requests eventually hit
`peer.writeNonBlockMsgs` -> `peer.writeLoopSendMsg` -> `conn.WriteMessage`.
The following diagram shows the broadcast data flow.

```mermaid
graph LR

p2pnet[P2PNetwork]
wsnet[WsNetwork]
B[broadcaster]

p2pnet & wsnet --> B

subgraph "wsPeer"
direction LR
writeNonBlockMsgs
Conn[conn.WriteMessage]

subgraph "writeLoop"
writeLoopSendMsg
end

writeNonBlockMsgs --> writeLoop
writeLoopSendMsg --> Conn
end

B --> writeNonBlockMsgs

Conn --> WMP2P & WMWS

subgraph "wsPeerConnP2P"
WMP2P[WriteMessage]
end

subgraph "websocket"
WMWS[WriteMessage]
end

subgraph "libp2p"
stream.Write
end

WMP2P --> libp2p
```
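
The priority vs bulk queue handling mentioned for `msgBroadcaster` can be approximated with two channels. This is a simplified, non-blocking sketch: `nextRequest` and `drainOrder` are hypothetical helpers, messages are plain strings, and the real broadcaster works with richer request objects and a long-running goroutine.

```go
package main

import "fmt"

// nextRequest picks the next broadcast request without blocking.
// The high-priority queue is always drained before the bulk queue.
func nextRequest(highPrio, bulk <-chan string) (string, bool) {
	select {
	case msg := <-highPrio:
		return msg, true
	default:
	}
	select {
	case msg := <-bulk:
		return msg, true
	default:
		return "", false
	}
}

// drainOrder enqueues the given messages and returns the order in which
// they would be handed to the peers.
func drainOrder(highMsgs, bulkMsgs []string) []string {
	highPrio := make(chan string, len(highMsgs))
	bulk := make(chan string, len(bulkMsgs))
	for _, m := range bulkMsgs {
		bulk <- m
	}
	for _, m := range highMsgs {
		highPrio <- m
	}
	var order []string
	for {
		msg, ok := nextRequest(highPrio, bulk)
		if !ok {
			return order
		}
		order = append(order, msg)
	}
}

func main() {
	fmt.Println(drainOrder([]string{"vote"}, []string{"tx1", "tx2"}))
}
```

Even though the bulk messages are enqueued first here, the high-priority message is selected ahead of them.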

### DHT and Capabilities discovery

DHT is controlled by the `EnableDHTProviders` configuration option and the capabilities
exposed by a node. These capabilities include:
- `archival`: a listening node with `Archival` config flag set
- `catchpointStoring`: a listening node configured to store catchpoints
- `gossip`: a listening node with `EnableGossipService` config flag set

When the `P2PNetwork` starts, the node begins advertising its capabilities by running
a background goroutine. By default, the DHT implementation pulls bootstrap nodes from
a peer store and attempts to connect immediately, which is not how go-algorand services operate.
To address this, a new `bootstrapper` abstraction has been added to control bootstrap peer
access using the DHT's `BootstrapFunc` mechanism. The callback function returns empty bootstrap
peers until the `P2PNetwork` starts.
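
The gating behaviour of the bootstrap callback can be sketched as follows. The `bootstrapper` type below is a hypothetical simplification that uses string addresses instead of `peer.AddrInfo` values and does not attempt to reproduce the DHT option that accepts the callback.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// bootstrapper withholds bootstrap peers until the network is started.
// Hypothetical simplification: addresses are plain strings.
type bootstrapper struct {
	started atomic.Bool
	peers   []string
}

func newBootstrapper(peers ...string) *bootstrapper {
	return &bootstrapper{peers: peers}
}

// start flips the gate; it would be called when the network starts.
func (b *bootstrapper) start() { b.started.Store(true) }

// bootstrapPeers plays the role of the callback handed to the DHT.
func (b *bootstrapper) bootstrapPeers() []string {
	if !b.started.Load() {
		return nil
	}
	// Return a copy so callers cannot mutate the internal list.
	return append([]string(nil), b.peers...)
}

func main() {
	b := newBootstrapper("boot-1", "boot-2")
	fmt.Println(len(b.bootstrapPeers()))
	b.start()
	fmt.Println(len(b.bootstrapPeers()))
}
```

Since the DHT only ever sees what the callback returns, no connection attempt can happen before the network is explicitly started.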

### Net identity based peer deduplication

`WsNetwork` net identity was slightly extended to allow ws and p2p nodes to cross-check each other
when running in hybrid mode:
- an `identityTracker` instance is shared between `WsNetwork` and `P2PNetwork`
- the identity schema supplied to `WsNetwork` uses a message signer based on the p2p node's private key
- `PublicAddress` must be set for hybrid nodes in order to operate properly

With the changes above, `identityTracker` is able to deduplicate a `WsNetwork` peer if it turns out
to be a hybrid node that is already connected via `P2PNetwork`, and the other way around.
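
A shared tracker of this kind can be modelled with a mutex-protected map keyed by the peer's public identity. This is a sketch only: the `tracker` type, its `setIdentity` and `removeIdentity` methods, and the string-typed keys are assumptions for illustration and do not mirror the real `identityTracker` interface.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical identity tracker shared by two networks.
type tracker struct {
	mu    sync.Mutex
	byKey map[string]string // public identity -> network holding the connection
}

func newTracker() *tracker {
	return &tracker{byKey: make(map[string]string)}
}

// setIdentity records a verified identity for a new connection. It returns
// false when the same identity is already connected, so the caller can drop
// the duplicate.
func (t *tracker) setIdentity(pubKey, network string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, exists := t.byKey[pubKey]; exists {
		return false
	}
	t.byKey[pubKey] = network
	return true
}

// removeIdentity forgets an identity once its connection is closed.
func (t *tracker) removeIdentity(pubKey string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.byKey, pubKey)
}

func main() {
	shared := newTracker()
	fmt.Println(shared.setIdentity("hybrid-node-key", "p2p"))
	fmt.Println(shared.setIdentity("hybrid-node-key", "ws"))
}
```

Because both networks consult the same instance, whichever connection is verified second is recognized as a duplicate regardless of the transport it arrived on.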
21 changes: 9 additions & 12 deletions network/limitcaller/rateLimitingTransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,16 @@ var ErrConnectionQueueingTimeout = errors.New("rateLimitingTransport: queueing t
// according to the entries in the phonebook.
func MakeRateLimitingTransport(phonebook ConnectionTimeStore, queueingTimeout time.Duration, dialer *Dialer, maxIdleConnsPerHost int) RateLimitingTransport {
defaultTransport := http.DefaultTransport.(*http.Transport)
return RateLimitingTransport{
phonebook: phonebook,
innerTransport: &http.Transport{
Proxy: defaultTransport.Proxy,
DialContext: dialer.innerDialContext,
MaxIdleConns: defaultTransport.MaxIdleConns,
IdleConnTimeout: defaultTransport.IdleConnTimeout,
TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
},
queueingTimeout: queueingTimeout,
innerTransport := &http.Transport{
Proxy: defaultTransport.Proxy,
DialContext: dialer.innerDialContext,
MaxIdleConns: defaultTransport.MaxIdleConns,
IdleConnTimeout: defaultTransport.IdleConnTimeout,
TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
}
return MakeRateLimitingTransportWithRoundTripper(phonebook, queueingTimeout, innerTransport, nil, maxIdleConnsPerHost)
}

// MakeRateLimitingTransportWithRoundTripper creates a rate limiting http transport that would limit the requests rate
Expand Down
88 changes: 85 additions & 3 deletions network/p2p/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Libp2p also provides an implementation of a message-based gossip protocol, Gossi

Algorand's current network protocol sends messages between peers over bidirectional
WebSocket connections. Nodes that are configured to enable message-forwarding (including
nodes currently called "relays") validate incoming messages, then selectively forward
nodes currently called "relays") validate incoming messages, then selectively forward
messages to other connected peers. This network implementation (`WebsocketNetwork`) sits
behind the `GossipNode` interface in the network package.

Expand All @@ -36,8 +36,8 @@ via peer connections managed by libp2p. The `P2PNetwork` implementation uses
and [peer IDs](https://docs.libp2p.io/concepts/fundamentals/peers/#peer-ids-in-multiaddrs)
to establish connections and identify peers.

Currently transactions (protocol tag `TX`) are distributed using the GossipSub protocol,
while all other messages are forwarded over a custom message protocol `/algorand-ws/1.0.0`
Currently transactions (protocol tag `TX`) are distributed using the GossipSub protocol (see [pubsub.go](./pubsub.go)),
while all other messages are forwarded over a custom message protocol `/algorand-ws/1.0.0` (see [streams.go](./streams.go))
that uses the same message serialization as the existing `WebsocketNetwork` implementation.
These two protocols are multiplexed over a single connection using libp2p streams.

Expand All @@ -63,3 +63,85 @@ graph LR
AW --> WS
S --> T
```

The underlying libp2p implementation is abstracted as `p2p.Service` and is initialized in two steps:
1. Creating a p2p `Host`
2. Creating a service `serviceImpl` object

`Host` is also used to create the p2p HTTP server and the DHT discovery service. It is also useful for unit testing. Note that `Host` is created with the `NoListenAddrs` option, which prevents automatic listening and networking until `Service.Start()` is called. This follows the design of other services (including the WsNetwork service).

### Connection limiting

libp2p's `ResourceManager` is used to limit the number of connections up to `cfg.P2PIncomingConnectionsLimit`.

### DHT and capabilities

Provides helper methods to construct a DHT discovery service using the `go-libp2p-kad-dht` library.
The high-level [CapabilitiesDiscovery](./capabilities.go) class supports retrieving peers
by a given capability (`PeersForCapability`) and advertising the node's own capabilities (`AdvertiseCapabilities`).

Note that by default private and non-routable addresses are filtered (see `AddrsFactory`).
libp2p's `ObservedAddrManager` can track the node's own public address and make it available
(and thus discoverable with DHT) if it was observed at least 4 times in 30 minutes (as of [email protected]).

```mermaid
graph LR

subgraph "node"
Cap[Capabilities]
end

subgraph "P2P Implementation"
P2P[P2PNetwork]
AdvCap[AdvertiseCapabilities]
end

P2P --> AdvCap
Cap -.-> P2P

subgraph "libp2p"
Adv[Advertise]
Addr[Addrs]
OAM[ObservedAddrManager]
AF[AddrFactory]
KAD["/kad/1.0.0"]
end

OAM -.-> Addr
AF -.-> Addr
AdvCap --> Adv

subgraph "libp2p-kad-dht"
Pro[Provide]
end

Addr -.-> Pro
Adv --> Pro
Pro --> KAD
```
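
The observation rule described before the diagram (at least 4 observations within 30 minutes) can be illustrated with a small sliding-window counter. This sketch is not libp2p's `ObservedAddrManager`: the `observedAddr` type and its methods are hypothetical, it counts raw observations only, and the real bookkeeping may differ.

```go
package main

import (
	"fmt"
	"time"
)

// observedAddr counts observations of one candidate public address.
// Hypothetical model; thresholds are taken from the text above.
type observedAddr struct {
	threshold int
	window    time.Duration
	seen      []time.Time
}

func newObservedAddr() *observedAddr {
	return &observedAddr{threshold: 4, window: 30 * time.Minute}
}

// observe records that a remote peer reported this address at the given time.
func (o *observedAddr) observe(at time.Time) { o.seen = append(o.seen, at) }

// advertised reports whether enough observations fall inside the window
// that ends at now.
func (o *observedAddr) advertised(now time.Time) bool {
	count := 0
	for _, at := range o.seen {
		if !at.After(now) && now.Sub(at) <= o.window {
			count++
		}
	}
	return count >= o.threshold
}

// atMinute produces deterministic timestamps for the example.
func atMinute(m int) time.Time {
	base := time.Date(2024, 8, 15, 0, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(m) * time.Minute)
}

func main() {
	o := newObservedAddr()
	for m := 0; m < 4; m++ {
		o.observe(atMinute(m))
	}
	fmt.Println(o.advertised(atMinute(5)), o.advertised(atMinute(60)))
}
```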

### HTTP over libp2p connection

[email protected] added the ability to multiplex HTTP traffic in a p2p connection.
A custom `/algorand-http/1.0.0` stream is used to expose an HTTP server and allow
network service clients (catchup, catchpoint, txsync) to register their own handlers
similarly to the legacy ws-net implementation.

### Peerstore

The in-memory peerstore implements the `libp2p.Peerstore` and go-algorand `Phonebook` interfaces.
Peer classes (relays, archival, etc.) and persistent peers (i.e. peers from the command line or phonebook.json)
are supported. A possible enhancement is to save/load the peerstore to/from disk to tolerate bootstrap node failures.

### Logging

libp2p uses the zap logger via the separate `ipfs/go-log/v2` module. The `EnableP2PLogging` helper adds
go-algorand's `logrus` as a custom zap core so that all libp2p logs go through the go-algorand logging facility.
Unfortunately, `ipfs/go-log/v2` keeps its primary logging core in a module variable, which makes it impossible
to have custom `logrus` sub-loggers in unit tests.

### Metrics

`go-libp2p` uses Prometheus as its metrics library, while `go-libp2p-kad-dht` relies on the OpenCensus library.
go-algorand has two collectors (see `util/metrics`), one for Prometheus and one for OpenCensus, covering
counters and gauges with labels. Other types (summary, histogram, distribution) are not supported at the moment.
16 changes: 8 additions & 8 deletions network/p2p/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ type CapabilitiesDiscovery struct {
wg sync.WaitGroup
}

// Advertise implements the discovery.Discovery/discovery.Advertiser interface
func (c *CapabilitiesDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
// advertise implements the discovery.Discovery/discovery.Advertiser interface
func (c *CapabilitiesDiscovery) advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
return c.disc.Advertise(ctx, ns, opts...)
}

// FindPeers implements the discovery.Discovery/discovery.Discoverer interface
func (c *CapabilitiesDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
// findPeers implements the discovery.Discovery/discovery.Discoverer interface
func (c *CapabilitiesDiscovery) findPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
return c.disc.FindPeers(ctx, ns, opts...)
}

Expand All @@ -78,8 +78,8 @@ func (c *CapabilitiesDiscovery) Host() host.Host {
return c.dht.Host()
}

// AddPeer adds a given peer.AddrInfo to the Host's Peerstore, and the DHT's routing table
func (c *CapabilitiesDiscovery) AddPeer(p peer.AddrInfo) (bool, error) {
// addPeer adds a given peer.AddrInfo to the Host's Peerstore, and the DHT's routing table
func (c *CapabilitiesDiscovery) addPeer(p peer.AddrInfo) (bool, error) {
c.Host().Peerstore().AddAddrs(p.ID, p.Addrs, libpeerstore.AddressTTL)
return c.dht.RoutingTable().TryAddPeer(p.ID, true, true)
}
Expand All @@ -93,7 +93,7 @@ func (c *CapabilitiesDiscovery) PeersForCapability(capability Capability, n int)
var peers []peer.AddrInfo
// +1 because it can include self but we exclude self from the returned list
// that might confuse the caller (and tests assertions)
peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n+1))
peersChan, err := c.findPeers(ctx, string(capability), discovery.Limit(n+1))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability
var err error
advertisementInterval := maxAdvertisementInterval
for _, capa := range capabilities {
ttl, err0 := c.Advertise(c.dht.Context(), string(capa))
ttl, err0 := c.advertise(c.dht.Context(), string(capa))
if err0 != nil {
err = err0
c.log.Errorf("failed to advertise for capability %s: %v", capa, err0)
Expand Down
2 changes: 1 addition & 1 deletion network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestCapabilities_Discovery(t *testing.T) {
for _, capD := range caps {
peersAdded := 0
for _, addr := range addrs {
added, err := capD.AddPeer(addr)
added, err := capD.addPeer(addr)
require.NoError(t, err)
require.True(t, added)
peersAdded++
Expand Down
7 changes: 3 additions & 4 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
IDSigner() *PeerIDChallengeSigner
AddrInfo() peer.AddrInfo // return addrInfo for self

DialNode(context.Context, *peer.AddrInfo) error
DialPeersUntilTargetCount(targetConnCount int)
ClosePeer(peer.ID) error

Expand Down Expand Up @@ -257,15 +256,15 @@
if len(s.host.Network().ConnsToPeer(peerInfo.ID)) > 0 {
continue
}
err := s.DialNode(context.Background(), peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout
err := s.dialNode(context.Background(), peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout

if err != nil {
s.log.Warnf("failed to connect to peer %s: %v", peerInfo.ID, err)
}
}
}

// DialNode attempts to establish a connection to the provided peer
func (s *serviceImpl) DialNode(ctx context.Context, peer *peer.AddrInfo) error {
// dialNode attempts to establish a connection to the provided peer
func (s *serviceImpl) dialNode(ctx context.Context, peer *peer.AddrInfo) error {

// don't try connecting to ourselves
if peer.ID == s.host.ID() {
return nil
Expand Down