Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: hybrid networking #5800

Merged
merged 8 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ type NetworkFacade struct {
rand *rand.Rand
timeoutAtInitOnce sync.Once
timeoutAtInitWait sync.WaitGroup
peerToNode map[network.Peer]int
peerToNode map[*facadePeer]int
}

type facadePeer struct {
id int
net network.GossipNode
}

func (p *facadePeer) GetNetwork() network.GossipNode { return p.net }

// MakeNetworkFacade creates a facade with a given nodeID.
func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
n := &NetworkFacade{
Expand All @@ -83,12 +90,12 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
eventsQueues: make(map[string]int),
eventsQueuesCh: make(chan int, 1000),
rand: rand.New(rand.NewSource(int64(nodeID))),
peerToNode: make(map[network.Peer]int, fuzzer.nodesCount),
peerToNode: make(map[*facadePeer]int, fuzzer.nodesCount),
debugMessages: false,
}
n.timeoutAtInitWait.Add(1)
for i := 0; i < fuzzer.nodesCount; i++ {
n.peerToNode[network.Peer(new(int))] = i
n.peerToNode[&facadePeer{id: i, net: n}] = i
}
return n
}
Expand Down Expand Up @@ -179,7 +186,7 @@ func (n *NetworkFacade) WaitForEventsQueue(cleared bool) {
func (n *NetworkFacade) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, exclude network.Peer) error {
excludeNode := -1
if exclude != nil {
excludeNode = n.peerToNode[exclude]
excludeNode = n.peerToNode[exclude.(*facadePeer)]
}
return n.broadcast(tag, data, excludeNode, "NetworkFacade service-%v Broadcast %v %v\n")
}
Expand Down Expand Up @@ -341,8 +348,8 @@ func (n *NetworkFacade) ReceiveMessage(sourceNode int, tag protocol.Tag, data []
n.pushPendingReceivedMessage()
}

func (n *NetworkFacade) Disconnect(sender network.Peer) {
sourceNode := n.peerToNode[sender]
func (n *NetworkFacade) Disconnect(sender network.DisconnectablePeer) {
sourceNode := n.peerToNode[sender.(*facadePeer)]
n.fuzzer.Disconnect(n.nodeID, sourceNode)
}

Expand Down
2 changes: 1 addition & 1 deletion agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (w *whiteholeNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
func (w *whiteholeNetwork) BroadcastSimple(tag protocol.Tag, data []byte) error {
return w.Broadcast(context.Background(), tag, data, true, nil)
}
func (w *whiteholeNetwork) Disconnect(badnode network.Peer) {
func (w *whiteholeNetwork) Disconnect(badnode network.DisconnectablePeer) {
return
}
func (w *whiteholeNetwork) DisconnectPeers() {
Expand Down
11 changes: 6 additions & 5 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net"
"net/http"
"net/url"
"strings"
"testing"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -174,8 +173,8 @@ func (b *basicRPCNode) GetPeers(options ...network.PeerOption) []network.Peer {
return b.peers
}

func (b *basicRPCNode) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (b *basicRPCNode) GetGenesisID() string {
return "test genesisID"
}

type httpTestPeerSource struct {
Expand All @@ -192,8 +191,8 @@ func (s *httpTestPeerSource) RegisterHandlers(dispatch []network.TaggedMessageHa
s.dispatchHandlers = append(s.dispatchHandlers, dispatch...)
}

func (s *httpTestPeerSource) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (s *httpTestPeerSource) GetGenesisID() string {
return "test genesisID"
}

// implement network.HTTPPeer
Expand Down Expand Up @@ -239,6 +238,8 @@ func (p *testUnicastPeer) GetAddress() string {
return "test"
}

func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn }

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
Expand Down
2 changes: 1 addition & 1 deletion catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPee
return nil, err
}

parsedURL.Path = lf.net.SubstituteGenesisID(path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
parsedURL.Path = network.SubstituteGenesisID(lf.net, path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
ledgerURL := parsedURL.String()
lf.log.Debugf("ledger %s %#v peer %#v %T", method, ledgerURL, peer, peer)
request, err := http.NewRequestWithContext(ctx, method, ledgerURL, nil)
Expand Down
14 changes: 9 additions & 5 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// MockNetwork is a dummy network that doesn't do anything
type MockNetwork struct {
network.GossipNode
GenesisID string
}

// Broadcast - unused function
Expand Down Expand Up @@ -58,7 +59,7 @@ func (network *MockNetwork) RequestConnectOutgoing(replace bool, quit <-chan str
}

// Disconnect - unused function
func (network *MockNetwork) Disconnect(badpeer network.Peer) {
func (network *MockNetwork) Disconnect(badpeer network.DisconnectablePeer) {
}

// DisconnectPeers - unused function
Expand All @@ -75,7 +76,7 @@ func (network *MockNetwork) GetPeers(options ...network.PeerOption) []network.Pe
}

// GetRoundTripper -- returns the network round tripper
func (network *MockNetwork) GetRoundTripper() http.RoundTripper {
func (network *MockNetwork) GetRoundTripper(peer network.Peer) http.RoundTripper {
return http.DefaultTransport
}

Expand Down Expand Up @@ -106,7 +107,10 @@ func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (con
return nil
}

// SubstituteGenesisID - empty implementation
func (network *MockNetwork) SubstituteGenesisID(rawURL string) string {
return rawURL
// GetGenesisID - empty implementation
func (network *MockNetwork) GetGenesisID() string {
if network.GenesisID == "" {
return "mocknet"
}
return network.GenesisID
}
6 changes: 6 additions & 0 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,12 @@ type Local struct {
// EnableP2P turns on the peer to peer network
EnableP2P bool `version[31]:"false"`

// EnableP2PHybridMode turns on both websockets and P2P networking.
EnableP2PHybridMode bool `version[31]:"false"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to v32, v31 was released a few days ago


// P2PListenAddress sets the listen address used for P2P networking, if hybrid mode is set.
P2PListenAddress string `version[31]:""`

// P2PPersistPeerID will write the private key used for the node's PeerID to the P2PPrivateKeyLocation.
// This is only used when P2PEnable is true. If P2PPrivateKey is not specified, it uses the default location.
P2PPersistPeerID bool `version[29]:"false"`
Expand Down
2 changes: 2 additions & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var defaultLocal = Local{
EnableMetricReporting: false,
EnableOutgoingNetworkMessageFiltering: true,
EnableP2P: false,
EnableP2PHybridMode: false,
EnablePingHandler: true,
EnableProcessBlockStats: false,
EnableProfiler: false,
Expand Down Expand Up @@ -116,6 +117,7 @@ var defaultLocal = Local{
OptimizeAccountsDatabaseOnStartup: false,
OutgoingMessageFilterBucketCount: 3,
OutgoingMessageFilterBucketSize: 128,
P2PListenAddress: "",
P2PPersistPeerID: false,
P2PPrivateKeyLocation: "",
ParticipationKeysRefreshInterval: 60000000000,
Expand Down
3 changes: 2 additions & 1 deletion data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ var txBacklogSize = config.GetDefaultLocal().TxBacklogSize
// mock sender is used to implement OnClose, since TXHandlers expect to use Senders and ERL Clients
type mockSender struct{}

func (m mockSender) OnClose(func()) {}
func (m mockSender) OnClose(func()) {}
func (m mockSender) GetNetwork() network.GossipNode { panic("not implemented") }

// txHandlerConfig is a subset of tx handler related options from config.Local
type txHandlerConfig struct {
Expand Down
2 changes: 2 additions & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"EnableMetricReporting": false,
"EnableOutgoingNetworkMessageFiltering": true,
"EnableP2P": false,
"EnableP2PHybridMode": false,
"EnablePingHandler": true,
"EnableProcessBlockStats": false,
"EnableProfiler": false,
Expand Down Expand Up @@ -95,6 +96,7 @@
"OptimizeAccountsDatabaseOnStartup": false,
"OutgoingMessageFilterBucketCount": 3,
"OutgoingMessageFilterBucketSize": 128,
"P2PListenAddress": "",
"P2PPersistPeerID": false,
"P2PPrivateKeyLocation": "",
"ParticipationKeysRefreshInterval": 60000000000,
Expand Down
2 changes: 1 addition & 1 deletion network/connPerfMon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) {

addMsg := func(msgCount int) {
for i := 0; i < msgCount; i++ {
msg.Sender = peers[(int(msgIndex)+i)%len(peers)]
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer)
timer += int64(7 * time.Nanosecond)
msg.Received = timer
out = append(out, msg)
Expand Down
21 changes: 16 additions & 5 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"context"
"net"
"net/http"
"strings"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/protocol"
Expand All @@ -28,6 +29,11 @@
// Peer opaque interface for referring to a neighbor in the network
type Peer interface{}

// DisconnectablePeer is a Peer with a long-living connection to a network that can be disconnected
type DisconnectablePeer interface {
GetNetwork() GossipNode
}

// PeerOption allows users to specify a subset of peers to query
//
//msgp:ignore PeerOption
Expand All @@ -51,7 +57,7 @@
Address() (string, bool)
Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error
Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error
Disconnect(badnode Peer)
Disconnect(badnode DisconnectablePeer)
DisconnectPeers() // only used by testing

// RegisterHTTPHandler path accepts gorilla/mux path annotations
Expand All @@ -78,7 +84,7 @@
ClearHandlers()

// GetRoundTripper returns a Transport that would limit the number of outgoing connections.
GetRoundTripper() http.RoundTripper
GetRoundTripper(peer Peer) http.RoundTripper

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
// this is the only indication that we have that we haven't formed a clique, where all incoming messages
Expand All @@ -90,8 +96,8 @@
// request that was provided to the http handler ( or provide a fallback Context() to that )
GetHTTPRequestConnection(request *http.Request) (conn net.Conn)

// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID.
SubstituteGenesisID(rawURL string) string
// GetGenesisID returns the network-specific genesisID.
GetGenesisID() string

// called from wsPeer to report that it has closed
peerRemoteClose(peer *wsPeer, reason disconnectReason)
Expand All @@ -109,7 +115,7 @@

// IncomingMessage represents a message arriving from some peer in our p2p network
type IncomingMessage struct {
Sender Peer
Sender DisconnectablePeer
Tag Tag
Data []byte
Err error
Expand Down Expand Up @@ -207,3 +213,8 @@
}
return
}

// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID.
func SubstituteGenesisID(net GossipNode, rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", net.GetGenesisID(), -1)

Check warning on line 219 in network/gossipNode.go

View check run for this annotation

Codecov / codecov/patch

network/gossipNode.go#L218-L219

Added lines #L218 - L219 were not covered by tests
}
Loading