Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: p2p traffic exchange for algorand node #5939

Merged
merged 53 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
175a7e1
p2p: Capabilities Advertisement for Archival/Catchpoints (#5702)
Eric-Warehime Nov 2, 2023
a0d2b3b
Merge remote-tracking branch 'upstream/master' into feature/p2p
cce Nov 2, 2023
4349e79
add back pregen short cmd description from bad merge
cce Nov 2, 2023
bcf71c1
p2p dht: more tests and minor fixes (#5827)
algorandskiy Nov 14, 2023
b6f8a69
network: hybrid networking (#5800)
cce Nov 14, 2023
51a1b9d
p2p: fix dht-hybrid merge (#5833)
algorandskiy Nov 15, 2023
2c7c70d
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Nov 17, 2023
5fef1a9
move new config opts to v33
algorandskiy Nov 17, 2023
c37c155
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Dec 12, 2023
1de1595
p2p: start p2p networking and DHT ops when starting services, not whe…
algorandskiy Dec 21, 2023
9a81cdd
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jan 9, 2024
30eeafa
p2p: merge master, update license year
algorandskiy Jan 9, 2024
63d4995
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jan 23, 2024
d69938a
p2p: HTTP catchup over p2p network (#5898)
algorandskiy Jan 29, 2024
153f10d
p2p: enable p2p http txsync (#5922)
algorandskiy Jan 30, 2024
a8e2254
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jan 30, 2024
ba8b2cd
p2p: fix infinite loop in dnsaddr resolution (#5926)
algorandskiy Feb 2, 2024
e8c4ae6
p2p: http catchpoints support (#5924)
algorandskiy Feb 5, 2024
200e0bb
p2p: support block request redirects for p2p addresses (#5929)
algorandskiy Feb 5, 2024
472f01b
p2p: rate limit outgoing p2p http connections (#5931)
algorandskiy Feb 7, 2024
e506ffd
network: unify wsPeerCore use for HTTP and p2p HTTP transport (#5933)
algorandskiy Feb 8, 2024
2df76fd
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Feb 13, 2024
a0a1605
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Feb 21, 2024
504c6ba
bump config version to v34
algorandskiy Feb 21, 2024
ef82b2b
p2p: introduce Gossip peer capability (#5935)
algorandskiy Feb 23, 2024
e3a378e
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Mar 22, 2024
ac58d8e
post merge fixes
algorandskiy Mar 22, 2024
b165abf
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Apr 1, 2024
46b5e99
p2p: add telemetry and DHT/libp2p metrics (#5941)
algorandskiy Apr 2, 2024
d38f35c
p2p: fix wantTXGossip modifications (#5982)
algorandskiy Apr 19, 2024
411837f
p2p: refactor gossipsub to validation and handling (#5976)
algorandskiy May 10, 2024
363982b
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jun 7, 2024
8f9ad90
post-merge fixes
algorandskiy Jun 7, 2024
052792d
p2p: test scenarios support (#5962)
algorandskiy Jun 12, 2024
820d75a
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jun 12, 2024
8e6c1af
post merge: regen config-v34
algorandskiy Jun 12, 2024
a513cbf
remove mis-merged v34 fields from config-v33
algorandskiy Jun 13, 2024
0dd0cb7
p2p: convert additional Multiplexer method to use generic implementat…
cce Jun 20, 2024
8a584dd
p2p feature PR: CR fixes (#6038)
algorandskiy Jun 21, 2024
fcfab80
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jun 21, 2024
63bf1a9
config: document EnableP2PHybridMode takes precedence over EnableP2P
algorandskiy Jun 21, 2024
005e225
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jun 25, 2024
66a1d35
Update node/node_test.go
algorandskiy Jun 25, 2024
a6248b2
p2p: more CR fixes: ERL and DHT err logging (#6040)
algorandskiy Jun 26, 2024
a807471
remove unnecessary identityChallengeSigner Verify method
cce Jun 27, 2024
c9ceb49
Update network/p2p/p2p.go
cce Jun 27, 2024
d2090c6
rename snapshoter => snapshotter
cce Jun 27, 2024
8b291b0
Merge remote-tracking branch 'upstream/master' into feature/p2p
algorandskiy Jun 28, 2024
4152b88
trim file path in p2p logger
cce Jun 28, 2024
9fe41ad
Update network/p2pNetwork.go
cce Jun 28, 2024
65c0181
rename DeadlineSettable => DeadlineSettableConn as intended in f6dd7a…
cce Jun 28, 2024
c87ceeb
remove EnableMetricReporting from p2p net
algorandskiy Jun 28, 2024
4564473
remove algorand-ws streams limiting
algorandskiy Jun 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ type NetworkFacade struct {
rand *rand.Rand
timeoutAtInitOnce sync.Once
timeoutAtInitWait sync.WaitGroup
peerToNode map[network.Peer]int
peerToNode map[*facadePeer]int
}

type facadePeer struct {
id int
net network.GossipNode
}

func (p *facadePeer) GetNetwork() network.GossipNode { return p.net }

// MakeNetworkFacade creates a facade with a given nodeID.
func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
n := &NetworkFacade{
Expand All @@ -83,12 +90,12 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
eventsQueues: make(map[string]int),
eventsQueuesCh: make(chan int, 1000),
rand: rand.New(rand.NewSource(int64(nodeID))),
peerToNode: make(map[network.Peer]int, fuzzer.nodesCount),
peerToNode: make(map[*facadePeer]int, fuzzer.nodesCount),
debugMessages: false,
}
n.timeoutAtInitWait.Add(1)
for i := 0; i < fuzzer.nodesCount; i++ {
n.peerToNode[network.Peer(new(int))] = i
n.peerToNode[&facadePeer{id: i, net: n}] = i
}
return n
}
Expand Down Expand Up @@ -179,7 +186,7 @@ func (n *NetworkFacade) WaitForEventsQueue(cleared bool) {
func (n *NetworkFacade) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, exclude network.Peer) error {
excludeNode := -1
if exclude != nil {
excludeNode = n.peerToNode[exclude]
excludeNode = n.peerToNode[exclude.(*facadePeer)]
}
return n.broadcast(tag, data, excludeNode, "NetworkFacade service-%v Broadcast %v %v\n")
}
Expand Down Expand Up @@ -240,7 +247,7 @@ func (n *NetworkFacade) PushDownstreamMessage(newMsg context.CancelFunc) bool {
func (n *NetworkFacade) Address() (string, bool) { return "mock network", true }

// Start - unused function
func (n *NetworkFacade) Start() {}
func (n *NetworkFacade) Start() error { return nil }

// Stop - unused function
func (n *NetworkFacade) Stop() {}
Expand Down Expand Up @@ -341,8 +348,8 @@ func (n *NetworkFacade) ReceiveMessage(sourceNode int, tag protocol.Tag, data []
n.pushPendingReceivedMessage()
}

func (n *NetworkFacade) Disconnect(sender network.Peer) {
sourceNode := n.peerToNode[sender]
func (n *NetworkFacade) Disconnect(sender network.DeadlineSettableConn) {
sourceNode := n.peerToNode[sender.(*facadePeer)]
n.fuzzer.Disconnect(n.nodeID, sourceNode)
}

Expand Down
9 changes: 4 additions & 5 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package gossip

import (
"context"
"net"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -136,7 +135,7 @@ func (w *whiteholeNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
func (w *whiteholeNetwork) BroadcastSimple(tag protocol.Tag, data []byte) error {
return w.Broadcast(context.Background(), tag, data, true, nil)
}
func (w *whiteholeNetwork) Disconnect(badnode network.Peer) {
func (w *whiteholeNetwork) Disconnect(badnode network.DeadlineSettableConn) {
return
}
func (w *whiteholeNetwork) DisconnectPeers() {
Expand All @@ -156,11 +155,11 @@ func (w *whiteholeNetwork) GetPeers(options ...network.PeerOption) []network.Pee
}
func (w *whiteholeNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettable) {
return nil
}

func (w *whiteholeNetwork) Start() {
func (w *whiteholeNetwork) Start() error {
w.quit = make(chan struct{})
go func(w *whiteholeNetwork) {
w.domain.messagesMu.Lock()
Expand Down Expand Up @@ -216,7 +215,7 @@ func (w *whiteholeNetwork) Start() {
atomic.AddUint32(&w.lastMsgRead, 1)
}
}(w)
return
return nil
}
func (w *whiteholeNetwork) getMux() *network.Multiplexer {
return w.mux
Expand Down
18 changes: 12 additions & 6 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net"
"net/http"
"net/url"
"strings"
"testing"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -174,8 +173,8 @@ func (b *basicRPCNode) GetPeers(options ...network.PeerOption) []network.Peer {
return b.peers
}

func (b *basicRPCNode) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (b *basicRPCNode) GetGenesisID() string {
return "test genesisID"
}

type httpTestPeerSource struct {
Expand All @@ -192,8 +191,8 @@ func (s *httpTestPeerSource) RegisterHandlers(dispatch []network.TaggedMessageHa
s.dispatchHandlers = append(s.dispatchHandlers, dispatch...)
}

func (s *httpTestPeerSource) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (s *httpTestPeerSource) GetGenesisID() string {
return "test genesisID"
}

// implement network.HTTPPeer
Expand All @@ -202,8 +201,13 @@ type testHTTPPeer string
func (p *testHTTPPeer) GetAddress() string {
return string(*p)
}

func (p *testHTTPPeer) GetHTTPClient() *http.Client {
return &http.Client{}
return &http.Client{
Transport: &network.HTTPPAddressBoundTransport{
Addr: p.GetAddress(),
InnerTransport: http.DefaultTransport},
}
}
func (p *testHTTPPeer) GetHTTPPeer() network.HTTPPeer {
return p
Expand Down Expand Up @@ -239,6 +243,8 @@ func (p *testUnicastPeer) GetAddress() string {
return "test"
}

func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn }

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
Expand Down
9 changes: 1 addition & 8 deletions catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"io"
"net/http"
"path"
"strconv"
"time"

Expand Down Expand Up @@ -74,13 +73,7 @@ func makeLedgerFetcher(net network.GossipNode, accessor ledger.CatchpointCatchup
}

func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPeer, round basics.Round, method string) (*http.Response, error) {
parsedURL, err := network.ParseHostOrURL(peer.GetAddress())
if err != nil {
return nil, err
}

parsedURL.Path = lf.net.SubstituteGenesisID(path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
ledgerURL := parsedURL.String()
ledgerURL := network.SubstituteGenesisID(lf.net, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36))
lf.log.Debugf("ledger %s %#v peer %#v %T", method, ledgerURL, peer, peer)
request, err := http.NewRequestWithContext(ctx, method, ledgerURL, nil)
if err != nil {
Expand Down
56 changes: 50 additions & 6 deletions catchup/ledgerFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package catchup

import (
"archive/tar"
"context"
"fmt"
"net"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/logging"
p2ptesting "github.com/algorand/go-algorand/network/p2p/testing"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/test/partitiontest"
)

Expand Down Expand Up @@ -125,7 +128,7 @@ func TestLedgerFetcherErrorResponseHandling(t *testing.T) {
}
}

func TestLedgerFetcherHeadLedger(t *testing.T) {
func TestLedgerFetcher(t *testing.T) {
partitiontest.PartitionTest(t)

// create a dummy server.
Expand All @@ -136,16 +139,19 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
listener, err := net.Listen("tcp", "localhost:")

var httpServerResponse = 0
var contentTypes = make([]string, 0)
require.NoError(t, err)
go s.Serve(listener)
defer s.Close()
defer listener.Close()
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
for _, contentType := range contentTypes {
w.Header().Add("Content-Type", contentType)
if req.Method == http.MethodHead {
w.WriteHeader(httpServerResponse)
} else {
w.Header().Add("Content-Type", rpcs.LedgerResponseContentType)
w.WriteHeader(httpServerResponse)
wtar := tar.NewWriter(w)
wtar.Close()
}
w.WriteHeader(httpServerResponse)
})
successPeer := testHTTPPeer(listener.Addr().String())
lf := makeLedgerFetcher(&mocks.MockNetwork{}, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal())
Expand All @@ -157,7 +163,7 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
// headLedger parseURL failure
parseFailurePeer := testHTTPPeer("foobar")
err = lf.headLedger(context.Background(), &parseFailurePeer, basics.Round(0))
require.Equal(t, fmt.Errorf("could not parse a host from url"), err)
require.ErrorContains(t, err, "could not parse a host from url")

// headLedger 404 response
httpServerResponse = http.StatusNotFound
Expand All @@ -169,8 +175,46 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
err = lf.headLedger(context.Background(), &successPeer, basics.Round(0))
require.NoError(t, err)

httpServerResponse = http.StatusOK
err = lf.downloadLedger(context.Background(), &successPeer, basics.Round(0))
require.NoError(t, err)

// headLedger 500 response
httpServerResponse = http.StatusInternalServerError
err = lf.headLedger(context.Background(), &successPeer, basics.Round(0))
require.Equal(t, fmt.Errorf("headLedger error response status code %d", http.StatusInternalServerError), err)
}

func TestLedgerFetcherP2P(t *testing.T) {
partitiontest.PartitionTest(t)

mux := http.NewServeMux()
nodeA := p2ptesting.MakeHTTPNode(t)
nodeA.RegisterHTTPHandler("/v1/ledger/0", mux)
var httpServerResponse = 0
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodHead {
w.WriteHeader(httpServerResponse)
} else {
w.Header().Add("Content-Type", rpcs.LedgerResponseContentType)
w.WriteHeader(httpServerResponse)
wtar := tar.NewWriter(w)
wtar.Close()
}
})

nodeA.Start()
defer nodeA.Stop()

successPeer := nodeA.GetHTTPPeer()
lf := makeLedgerFetcher(nodeA, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal())

// headLedger 200 response
httpServerResponse = http.StatusOK
err := lf.headLedger(context.Background(), successPeer, basics.Round(0))
require.NoError(t, err)

httpServerResponse = http.StatusOK
err = lf.downloadLedger(context.Background(), successPeer, basics.Round(0))
require.NoError(t, err)
}
7 changes: 1 addition & 6 deletions catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,8 @@ type HTTPFetcher struct {
// getBlockBytes gets a block.
// Core piece of FetcherClient interface
func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data []byte, err error) {
parsedURL, err := network.ParseHostOrURL(hf.rootURL)
if err != nil {
return nil, err
}
blockURL := rpcs.FormatBlockQuery(uint64(r), "", hf.net)

parsedURL.Path = rpcs.FormatBlockQuery(uint64(r), parsedURL.Path, hf.net)
blockURL := parsedURL.String()
hf.log.Debugf("block GET %#v peer %#v %T", blockURL, hf.peer, hf.peer)
request, err := http.NewRequest("GET", blockURL, nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/algod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/network/addr"
"github.com/algorand/go-algorand/protocol"
toolsnet "github.com/algorand/go-algorand/tools/network"
"github.com/algorand/go-algorand/util"
Expand Down Expand Up @@ -282,7 +282,7 @@

// make sure that the format of each entry is valid:
for idx, peer := range peerOverrideArray {
addr, addrErr := network.ParseHostOrURLOrMultiaddr(peer)
addr, addrErr := addr.ParseHostOrURLOrMultiaddr(peer)

Check warning on line 285 in cmd/algod/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/algod/main.go#L285

Added line #L285 was not covered by tests
if addrErr != nil {
fmt.Fprintf(os.Stderr, "Provided command line parameter '%s' is not a valid host:port pair\n", peer)
return 1
Expand Down
4 changes: 2 additions & 2 deletions cmd/goal/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/model"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/libgoal"
"github.com/algorand/go-algorand/network"
naddr "github.com/algorand/go-algorand/network/addr"
"github.com/algorand/go-algorand/nodecontrol"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/tokens"
Expand Down Expand Up @@ -751,7 +751,7 @@

// make sure that the format of each entry is valid:
for _, peer := range strings.Split(peerDial, ";") {
_, err := network.ParseHostOrURLOrMultiaddr(peer)
_, err := naddr.ParseHostOrURLOrMultiaddr(peer)

Check warning on line 754 in cmd/goal/node.go

View check run for this annotation

Codecov / codecov/patch

cmd/goal/node.go#L754

Added line #L754 was not covered by tests
if err != nil {
reportErrorf("Provided peer '%s' is not a valid peer address : %v", peer, err)
return false
Expand Down
Loading
Loading