Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add protocol and address filtering to delegated routing api #678

Merged
merged 10 commits into from
Oct 1, 2024
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ The following emojis are used to highlight certain changes:

* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
- `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671)
* `routing/http`:
* added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) [#678](https://github.com/ipfs/boxo/pull/678)
* added support for address and protocol filtering to the delegated routing client ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#678](https://github.com/ipfs/boxo/pull/678). To add filtering to the client, use the [`WithFilterAddrs`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterAddrs) and [`WithFilterProtocols`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterProtocols) options when creating the client.Client-side filtering for servers that don't support filtering is enabled by default. To disable it, use the [`disableLocalFiltering`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#disableLocalFiltering) option when creating the client.

### Changed

Expand Down
54 changes: 52 additions & 2 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
"io"
"mime"
"net/http"
"sort"
"strings"
"time"

"github.com/benbjohnson/clock"
ipns "github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/boxo/routing/http/filters"
"github.com/ipfs/boxo/routing/http/internal/drjson"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
Expand Down Expand Up @@ -52,6 +54,11 @@
// for testing, e.g., testing the server with a mangled signature.
//lint:ignore SA1019 // ignore staticcheck
afterSignCallback func(req *types.WriteBitswapRecord)

// disableLocalFiltering is used to disable local filtering of the results
disableLocalFiltering bool
protocolFilter []string
addrFilter []string
}

// defaultUserAgent is used as a fallback to inform HTTP server which library
Expand Down Expand Up @@ -83,6 +90,37 @@
}
}

// WithDisabledLocalFiltering disables local filtering of the results.
// This should be used for delegated routing servers that already implement filtering
func WithDisabledLocalFiltering() Option {
2color marked this conversation as resolved.
Show resolved Hide resolved
return func(c *Client) error {
c.disableLocalFiltering = true
return nil
}

Check warning on line 99 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L95-L99

Added lines #L95 - L99 were not covered by tests
}

// WithProtocolFilter adds a protocol filter to the client.
// The protocol filter is added to the request URL.
// The protocols are ordered alphabetically for cache key (url) consistency
func WithProtocolFilter(protocolFilter []string) Option {
return func(c *Client) error {
sort.Strings(protocolFilter)
c.protocolFilter = protocolFilter
return nil
}
}

// WithAddrFilter adds an address filter to the client.
// The address filter is added to the request URL.
// The addresses are ordered alphabetically for cache key (url) consistency
func WithAddrFilter(addrFilter []string) Option {
return func(c *Client) error {
sort.Strings(addrFilter)
c.addrFilter = addrFilter
return nil
}

Check warning on line 121 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L116-L121

Added lines #L116 - L121 were not covered by tests
}

// WithHTTPClient sets a custom HTTP Client to be used with [Client].
func WithHTTPClient(h httpClient) Option {
return func(c *Client) error {
Expand Down Expand Up @@ -184,7 +222,9 @@
// TODO test measurements
m := newMeasurement("FindProviders")

url := c.baseURL + "/routing/v1/providers/" + key.String()
url := fmt.Sprintf("%s/routing/v1/providers/%s", c.baseURL, key.String())
2color marked this conversation as resolved.
Show resolved Hide resolved
url = filters.AddFiltersToURL(url, c.protocolFilter, c.addrFilter)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -251,6 +291,10 @@
return nil, errors.New("unknown content type")
}

if !c.disableLocalFiltering {
it = filters.ApplyFiltersToIter(it, c.addrFilter, c.protocolFilter)
}

return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
}

Expand Down Expand Up @@ -356,7 +400,9 @@
func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) {
m := newMeasurement("FindPeers")

url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String()
url := fmt.Sprintf("%s/routing/v1/peers/%s", c.baseURL, peer.ToCid(pid).String())
url = filters.AddFiltersToURL(url, c.protocolFilter, c.addrFilter)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -423,6 +469,10 @@
return nil, errors.New("unknown content type")
}

if !c.disableLocalFiltering {
it = filters.ApplyFiltersToPeerRecordIter(it, c.addrFilter, c.protocolFilter)
}

return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
}

Expand Down
56 changes: 41 additions & 15 deletions routing/http/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) {
return
}

func makePeerRecord() types.PeerRecord {
func makePeerRecord(protocols []string) types.PeerRecord {
peerID, addrs, _ := makeProviderAndIdentity()
return types.PeerRecord{
Schema: types.SchemaPeer,
ID: &peerID,
Protocols: []string{"transport-bitswap"},
Protocols: protocols,
Addrs: addrsToDRAddrs(addrs),
Extra: map[string]json.RawMessage{},
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func makeProviderAndIdentity() (peer.ID, []multiaddr.Multiaddr, crypto.PrivKey)
panic(err)
}

ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4002")
ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/udp/4002")
if err != nil {
panic(err)
}
Expand All @@ -222,9 +222,11 @@ func (e *osErrContains) errContains(t *testing.T, err error) {
}

func TestClient_FindProviders(t *testing.T) {
peerRecord := makePeerRecord()
bitswapPeerRecord := makePeerRecord([]string{"transport-bitswap"})
httpPeerRecord := makePeerRecord([]string{"transport-ipfs-gateway-http"})
peerProviders := []iter.Result[types.Record]{
{Val: &peerRecord},
{Val: &bitswapPeerRecord},
{Val: &httpPeerRecord},
}

bitswapRecord := makeBitswapRecord()
Expand All @@ -238,6 +240,7 @@ func TestClient_FindProviders(t *testing.T) {
routerErr error
clientRequiresStreaming bool
serverStreamingDisabled bool
filterProtocols []string

expErrContains osErrContains
expResult []iter.Result[types.Record]
Expand All @@ -250,6 +253,13 @@ func TestClient_FindProviders(t *testing.T) {
expResult: peerProviders,
expStreamingResponse: true,
},
{
name: "happy case with protocol filter",
filterProtocols: []string{"transport-bitswap"},
routerResult: peerProviders,
expResult: []iter.Result[types.Record]{{Val: &bitswapPeerRecord}},
expStreamingResponse: true,
},
{
name: "happy case (with deprecated bitswap schema)",
routerResult: []iter.Result[types.Record]{{Val: &bitswapRecord}},
Expand Down Expand Up @@ -305,6 +315,10 @@ func TestClient_FindProviders(t *testing.T) {
})
}

if c.filterProtocols != nil {
clientOpts = append(clientOpts, WithProtocolFilter(c.filterProtocols))
}

if c.expStreamingResponse {
onRespReceived = append(onRespReceived, func(r *http.Response) {
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
Expand Down Expand Up @@ -482,11 +496,13 @@ func TestClient_Provide(t *testing.T) {
}

func TestClient_FindPeers(t *testing.T) {
peerRecord := makePeerRecord()
peerRecord1 := makePeerRecord([]string{"transport-bitswap"})
peerRecord2 := makePeerRecord([]string{"transport-ipfs-gateway-http"})
peerRecords := []iter.Result[*types.PeerRecord]{
{Val: &peerRecord},
{Val: &peerRecord1},
{Val: &peerRecord2},
}
pid := *peerRecord.ID
pid := *peerRecord1.ID

cases := []struct {
name string
Expand All @@ -496,6 +512,7 @@ func TestClient_FindPeers(t *testing.T) {
routerErr error
clientRequiresStreaming bool
serverStreamingDisabled bool
filterProtocols []string

expErrContains osErrContains
expResult []iter.Result[*types.PeerRecord]
Expand All @@ -508,6 +525,13 @@ func TestClient_FindPeers(t *testing.T) {
expResult: peerRecords,
expStreamingResponse: true,
},
{
name: "happy case with protocol filter",
filterProtocols: []string{"transport-bitswap"},
routerResult: peerRecords,
expResult: []iter.Result[*types.PeerRecord]{{Val: &peerRecord1}},
expStreamingResponse: true,
},
{
name: "server doesn't support streaming",
routerResult: peerRecords,
Expand Down Expand Up @@ -542,12 +566,10 @@ func TestClient_FindPeers(t *testing.T) {
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var (
clientOpts []Option
serverOpts []server.Option
onRespReceived []func(*http.Response)
onReqReceived []func(*http.Request)
)
var clientOpts []Option
var serverOpts []server.Option
var onRespReceived []func(*http.Response)
var onReqReceived []func(*http.Request)

if c.serverStreamingDisabled {
serverOpts = append(serverOpts, server.WithStreamingResultsDisabled())
Expand All @@ -560,6 +582,10 @@ func TestClient_FindPeers(t *testing.T) {
})
}

if c.filterProtocols != nil {
clientOpts = append(clientOpts, WithProtocolFilter(c.filterProtocols))
}

if c.expStreamingResponse {
onRespReceived = append(onRespReceived, func(r *http.Response) {
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
Expand Down Expand Up @@ -603,7 +629,7 @@ func TestClient_FindPeers(t *testing.T) {
resultIter, err := client.FindPeers(ctx, pid)
c.expErrContains.errContains(t, err)

results := iter.ReadAll[iter.Result[*types.PeerRecord]](resultIter)
results := iter.ReadAll(resultIter)
assert.Equal(t, c.expResult, results)
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,48 @@
package server
package filters

import (
"net/url"
"reflect"
"slices"
"strings"

"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr"
)

// filters implements IPIP-0484
var logger = logging.Logger("routing/http/filters")

func parseFilter(param string) []string {
// Package filters implements IPIP-0484

func ParseFilter(param string) []string {
if param == "" {
return nil
}
return strings.Split(strings.ToLower(param), ",")
}

func AddFiltersToURL(baseURL string, protocolFilter, addrFilter []string) string {
parsedURL, err := url.Parse(baseURL)
if err != nil {
return baseURL
}

Check warning on line 30 in routing/http/filters/filters.go

View check run for this annotation

Codecov / codecov/patch

routing/http/filters/filters.go#L29-L30

Added lines #L29 - L30 were not covered by tests

query := parsedURL.Query()

if len(protocolFilter) > 0 {
query.Set("filter-protocols", strings.Join(protocolFilter, ","))
}

if len(addrFilter) > 0 {
query.Set("filter-addrs", strings.Join(addrFilter, ","))
}

parsedURL.RawQuery = query.Encode()
return parsedURL.String()
}

// applyFiltersToIter applies the filters to the given iterator and returns a new iterator.
//
// The function iterates over the input iterator, applying the specified filters to each record.
Expand All @@ -28,7 +52,7 @@
// - recordsIter: An iterator of types.Record to be filtered.
// - filterAddrs: A slice of strings representing the address filter criteria.
// - filterProtocols: A slice of strings representing the protocol filter criteria.
func applyFiltersToIter(recordsIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) iter.ResultIter[types.Record] {
func ApplyFiltersToIter(recordsIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) iter.ResultIter[types.Record] {
mappedIter := iter.Map(recordsIter, func(v iter.Result[types.Record]) iter.Result[types.Record] {
if v.Err != nil || v.Val == nil {
return v
Expand Down Expand Up @@ -76,6 +100,30 @@
return filteredIter
}

func ApplyFiltersToPeerRecordIter(peerRecordIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) iter.ResultIter[*types.PeerRecord] {
// Convert PeerRecord to Record so that we can reuse the filtering logic from findProviders
mappedIter := iter.Map(peerRecordIter, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] {
if v.Err != nil || v.Val == nil {
return iter.Result[types.Record]{Err: v.Err}
}

Check warning on line 108 in routing/http/filters/filters.go

View check run for this annotation

Codecov / codecov/patch

routing/http/filters/filters.go#L107-L108

Added lines #L107 - L108 were not covered by tests

var record types.Record = v.Val
return iter.Result[types.Record]{Val: record}
})

filteredIter := ApplyFiltersToIter(mappedIter, filterAddrs, filterProtocols)

// Convert Record back to PeerRecord 🙃
return iter.Map(filteredIter, func(v iter.Result[types.Record]) iter.Result[*types.PeerRecord] {
if v.Err != nil || v.Val == nil {
return iter.Result[*types.PeerRecord]{Err: v.Err}
}

Check warning on line 120 in routing/http/filters/filters.go

View check run for this annotation

Codecov / codecov/patch

routing/http/filters/filters.go#L119-L120

Added lines #L119 - L120 were not covered by tests

var record *types.PeerRecord = v.Val.(*types.PeerRecord)
return iter.Result[*types.PeerRecord]{Val: record}
})
}

// Applies the filters. Returns nil if the provider does not pass the protocols filter
// The address filter is more complicated because it potentially modifies the Addrs slice.
func applyFilters(provider *types.PeerRecord, filterAddrs, filterProtocols []string) *types.PeerRecord {
Expand Down
Loading