Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: reuse existing libp2p.Host for http clients #6129

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,44 @@
})
}

// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
if err != nil {
return nil, err
type httpClientConfig struct {
host host.Host
}

type httpClientOption func(*httpClientConfig)

// WithHost sets the libp2p host for the http client.
func WithHost(h host.Host) httpClientOption {
return func(o *httpClientConfig) {
o.host = h

Check warning on line 87 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L85-L87

Added lines #L85 - L87 were not covered by tests
}
}

// MakeTestHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// This exported method is only used in tests.
func MakeTestHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
return makeHTTPClient(addrInfo, opts...)

Check warning on line 94 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L93-L94

Added lines #L93 - L94 were not covered by tests
}

// makeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
// If service is nil, a new libp2p host is created.
func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Client, error) {
var config httpClientConfig
for _, opt := range opts {
opt(&config)

Check warning on line 102 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L99-L102

Added lines #L99 - L102 were not covered by tests
}

var clientStreamHost host.Host
if config.host != nil {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
clientStreamHost = config.host
} else {
var err error
clientStreamHost, err = libp2p.New(libp2p.NoListenAddrs)
if err != nil {
return nil, err

Check warning on line 112 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L105-L112

Added lines #L105 - L112 were not covered by tests
}
logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String())

Check warning on line 114 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L114

Added line #L114 was not covered by tests
}
logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String())

client := libp2phttp.Host{StreamHost: clientStreamHost}

Expand All @@ -97,13 +128,13 @@
return &http.Client{Transport: rt}, nil
}

// MakeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, pstore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := MakeHTTPClient(addrInfo)
// makeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func makeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, p2pService *serviceImpl, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := makeHTTPClient(addrInfo, WithHost(p2pService.host))

Check warning on line 133 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L132-L133

Added lines #L132 - L133 were not covered by tests
if err != nil {
return nil, err
}
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, string(addrInfo.ID))
rltr := limitcaller.MakeRateLimitingBoundTransportWithRoundTripper(connTimeStore, queueingTimeout, cl.Transport, string(addrInfo.ID))

Check warning on line 137 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L137

Added line #L137 was not covered by tests
cl.Transport = &rltr
return cl, nil

Expand Down
10 changes: 10 additions & 0 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
"encoding/base32"
"fmt"
"net"
"net/http"
"runtime"
"strings"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/limitcaller"
pstore "github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-algorand/util/metrics"
Expand Down Expand Up @@ -69,6 +71,9 @@
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error

// GetHTTPClient returns a rate-limiting libp2p-streaming http client that can be used to make requests to the given peer
GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error)
}

// serviceImpl manages integration with libp2p and implements the Service interface
Expand Down Expand Up @@ -412,3 +417,8 @@
}
return res
}

// GetHTTPClient returns a libp2p-streaming http client that can be used to make requests to the given peer
func (s *serviceImpl) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return makeHTTPClientWithRateLimit(addrInfo, s, connTimeStore, queueingTimeout)

Check warning on line 423 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L422-L423

Added lines #L422 - L423 were not covered by tests
}
2 changes: 1 addition & 1 deletion network/p2p/testing/httpNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (p httpPeer) GetAddress() string {

// GetAddress implements HTTPPeer interface and returns the http client for a peer
func (p httpPeer) GetHTTPClient() *http.Client {
c, err := p2p.MakeHTTPClient(&p.addrInfo)
c, err := p2p.MakeTestHTTPClient(&p.addrInfo)
require.NoError(p.tb, err)
return c
}
Expand Down
6 changes: 3 additions & 3 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@
}
addr := mas[0].String()

client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("MakeHTTPClient failed: %v", err)
return wsPeerCore{}, false
Expand Down Expand Up @@ -718,7 +718,7 @@
if err != nil {
return nil, err
}
return p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
return n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)

Check warning on line 721 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L721

Added line #L721 was not covered by tests
}

// OnNetworkAdvance notifies the network library that the agreement protocol was able to make a notable progress.
Expand Down Expand Up @@ -771,7 +771,7 @@

// create a wsPeer for this stream and added it to the peers map.
addrInfo := &peer.AddrInfo{ID: p2pPeer, Addrs: []multiaddr.Multiaddr{ma}}
client, err := p2p.MakeHTTPClientWithRateLimit(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
client, err := n.service.GetHTTPClient(addrInfo, n.pstore, limitcaller.DefaultQueueingTimeout)
if err != nil {
n.log.Warnf("Cannot construct HTTP Client for %s: %v", p2pPeer, err)
client = nil
Expand Down
14 changes: 10 additions & 4 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ func (s *mockService) Publish(ctx context.Context, topic string, data []byte) er
return nil
}

func (s *mockService) GetHTTPClient(addrInfo *peer.AddrInfo, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
return nil, nil
}

func makeMockService(id peer.ID, addrs []ma.Multiaddr) *mockService {
return &mockService{
id: id,
Expand Down Expand Up @@ -757,7 +761,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.NoError(t, err)
require.NotZero(t, addrsA[0])

httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand All @@ -768,7 +772,7 @@ func TestP2PHTTPHandler(t *testing.T) {
require.Equal(t, "hello", string(body))

// check another endpoint that also access the underlying connection/stream
httpClient, err = p2p.MakeHTTPClient(&peerInfoA)
httpClient, err = p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err = httpClient.Get("/check-conn")
require.NoError(t, err)
Expand All @@ -780,10 +784,12 @@ func TestP2PHTTPHandler(t *testing.T) {

// check rate limiting client:
// zero clients allowed, rate limiting window (10s) is greater than queue deadline (1s)
netB, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
pstore, err := peerstore.MakePhonebook(0, 10*time.Second)
require.NoError(t, err)
pstore.AddPersistentPeers([]*peer.AddrInfo{&peerInfoA}, "net", phonebook.PhoneBookEntryRelayRole)
httpClient, err = p2p.MakeHTTPClientWithRateLimit(&peerInfoA, pstore, 1*time.Second)
httpClient, err = netB.service.GetHTTPClient(&peerInfoA, pstore, 1*time.Second)
require.NoError(t, err)
_, err = httpClient.Get("/test")
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
Expand Down Expand Up @@ -815,7 +821,7 @@ func TestP2PHTTPHandlerAllInterfaces(t *testing.T) {
require.NotZero(t, addrsB[0])

t.Logf("peerInfoB: %s", peerInfoA)
httpClient, err := p2p.MakeHTTPClient(&peerInfoA)
httpClient, err := p2p.MakeTestHTTPClient(&peerInfoA)
require.NoError(t, err)
resp, err := httpClient.Get("/test")
require.NoError(t, err)
Expand Down
Loading