Skip to content

Commit

Permalink
feat: add protocol and address filtering to delegated routing api (#678)
Browse files Browse the repository at this point in the history
* feat: add filtering on client
* refactor: abstract add filters to url function
* feat: add client filtering to FindPeers
* test: test filtering in findPeers

---------

Co-authored-by: Daniel N <[email protected]>
Co-authored-by: gammazero <[email protected]>
  • Loading branch information
3 people authored Oct 1, 2024
1 parent 19a402b commit 4d0ae45
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 71 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ The following emojis are used to highlight certain changes:

* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
- `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671)
* `routing/http`:
* added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) [#678](https://github.com/ipfs/boxo/pull/678)
* added support for address and protocol filtering to the delegated routing client ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#678](https://github.com/ipfs/boxo/pull/678). To add filtering to the client, use the [`WithFilterAddrs`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterAddrs) and [`WithFilterProtocols`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterProtocols) options when creating the client.Client-side filtering for servers that don't support filtering is enabled by default. To disable it, use the [`disableLocalFiltering`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#disableLocalFiltering) option when creating the client.

### Changed

Expand Down
61 changes: 59 additions & 2 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
"io"
"mime"
"net/http"
gourl "net/url"
"sort"
"strings"
"time"

"github.com/benbjohnson/clock"
ipns "github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/boxo/routing/http/filters"
"github.com/ipfs/boxo/routing/http/internal/drjson"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
Expand Down Expand Up @@ -52,6 +55,11 @@ type Client struct {
// for testing, e.g., testing the server with a mangled signature.
//lint:ignore SA1019 // ignore staticcheck
afterSignCallback func(req *types.WriteBitswapRecord)

// disableLocalFiltering is used to disable local filtering of the results
disableLocalFiltering bool
protocolFilter []string
addrFilter []string
}

// defaultUserAgent is used as a fallback to inform HTTP server which library
Expand Down Expand Up @@ -83,6 +91,37 @@ func WithIdentity(identity crypto.PrivKey) Option {
}
}

// WithDisabledLocalFiltering disables local filtering of the results.
// This should be used for delegated routing servers that already implement filtering
func WithDisabledLocalFiltering(val bool) Option {
return func(c *Client) error {
c.disableLocalFiltering = val
return nil
}
}

// WithProtocolFilter adds a protocol filter to the client.
// The protocol filter is added to the request URL.
// The protocols are ordered alphabetically for cache key (url) consistency
func WithProtocolFilter(protocolFilter []string) Option {
return func(c *Client) error {
sort.Strings(protocolFilter)
c.protocolFilter = protocolFilter
return nil
}
}

// WithAddrFilter adds an address filter to the client.
// The address filter is added to the request URL.
// The addresses are ordered alphabetically for cache key (url) consistency
func WithAddrFilter(addrFilter []string) Option {
return func(c *Client) error {
sort.Strings(addrFilter)
c.addrFilter = addrFilter
return nil
}
}

// WithHTTPClient sets a custom HTTP Client to be used with [Client].
func WithHTTPClient(h httpClient) Option {
return func(c *Client) error {
Expand Down Expand Up @@ -184,7 +223,12 @@ func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter
// TODO test measurements
m := newMeasurement("FindProviders")

url := c.baseURL + "/routing/v1/providers/" + key.String()
url, err := gourl.JoinPath(c.baseURL, "routing/v1/providers", key.String())
if err != nil {
return nil, err
}
url = filters.AddFiltersToURL(url, c.protocolFilter, c.addrFilter)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -251,6 +295,10 @@ func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter
return nil, errors.New("unknown content type")
}

if !c.disableLocalFiltering {
it = filters.ApplyFiltersToIter(it, c.addrFilter, c.protocolFilter)
}

return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
}

Expand Down Expand Up @@ -356,7 +404,12 @@ func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri
func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) {
m := newMeasurement("FindPeers")

url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String()
url, err := gourl.JoinPath(c.baseURL, "routing/v1/peers", peer.ToCid(pid).String())
if err != nil {
return nil, err
}
url = filters.AddFiltersToURL(url, c.protocolFilter, c.addrFilter)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -423,6 +476,10 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI
return nil, errors.New("unknown content type")
}

if !c.disableLocalFiltering {
it = filters.ApplyFiltersToPeerRecordIter(it, c.addrFilter, c.protocolFilter)
}

return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
}

Expand Down
59 changes: 43 additions & 16 deletions routing/http/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in

func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
args := m.Called(ctx, name)
return args.Get(0).(*ipns.Record), args.Error(1)
rec, _ := args.Get(0).(*ipns.Record)
return rec, args.Error(1)
}

func (m *mockContentRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
Expand Down Expand Up @@ -158,12 +159,12 @@ func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) {
return
}

func makePeerRecord() types.PeerRecord {
func makePeerRecord(protocols []string) types.PeerRecord {
peerID, addrs, _ := makeProviderAndIdentity()
return types.PeerRecord{
Schema: types.SchemaPeer,
ID: &peerID,
Protocols: []string{"transport-bitswap"},
Protocols: protocols,
Addrs: addrsToDRAddrs(addrs),
Extra: map[string]json.RawMessage{},
}
Expand Down Expand Up @@ -196,7 +197,7 @@ func makeProviderAndIdentity() (peer.ID, []multiaddr.Multiaddr, crypto.PrivKey)
panic(err)
}

ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/4002")
ma2, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/udp/4002")
if err != nil {
panic(err)
}
Expand All @@ -222,9 +223,11 @@ func (e *osErrContains) errContains(t *testing.T, err error) {
}

func TestClient_FindProviders(t *testing.T) {
peerRecord := makePeerRecord()
bitswapPeerRecord := makePeerRecord([]string{"transport-bitswap"})
httpPeerRecord := makePeerRecord([]string{"transport-ipfs-gateway-http"})
peerProviders := []iter.Result[types.Record]{
{Val: &peerRecord},
{Val: &bitswapPeerRecord},
{Val: &httpPeerRecord},
}

bitswapRecord := makeBitswapRecord()
Expand All @@ -238,6 +241,7 @@ func TestClient_FindProviders(t *testing.T) {
routerErr error
clientRequiresStreaming bool
serverStreamingDisabled bool
filterProtocols []string

expErrContains osErrContains
expResult []iter.Result[types.Record]
Expand All @@ -250,6 +254,13 @@ func TestClient_FindProviders(t *testing.T) {
expResult: peerProviders,
expStreamingResponse: true,
},
{
name: "happy case with protocol filter",
filterProtocols: []string{"transport-bitswap"},
routerResult: peerProviders,
expResult: []iter.Result[types.Record]{{Val: &bitswapPeerRecord}},
expStreamingResponse: true,
},
{
name: "happy case (with deprecated bitswap schema)",
routerResult: []iter.Result[types.Record]{{Val: &bitswapRecord}},
Expand Down Expand Up @@ -305,6 +316,10 @@ func TestClient_FindProviders(t *testing.T) {
})
}

if c.filterProtocols != nil {
clientOpts = append(clientOpts, WithProtocolFilter(c.filterProtocols))
}

if c.expStreamingResponse {
onRespReceived = append(onRespReceived, func(r *http.Response) {
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
Expand Down Expand Up @@ -482,11 +497,13 @@ func TestClient_Provide(t *testing.T) {
}

func TestClient_FindPeers(t *testing.T) {
peerRecord := makePeerRecord()
peerRecord1 := makePeerRecord([]string{"transport-bitswap"})
peerRecord2 := makePeerRecord([]string{"transport-ipfs-gateway-http"})
peerRecords := []iter.Result[*types.PeerRecord]{
{Val: &peerRecord},
{Val: &peerRecord1},
{Val: &peerRecord2},
}
pid := *peerRecord.ID
pid := *peerRecord1.ID

cases := []struct {
name string
Expand All @@ -496,6 +513,7 @@ func TestClient_FindPeers(t *testing.T) {
routerErr error
clientRequiresStreaming bool
serverStreamingDisabled bool
filterProtocols []string

expErrContains osErrContains
expResult []iter.Result[*types.PeerRecord]
Expand All @@ -508,6 +526,13 @@ func TestClient_FindPeers(t *testing.T) {
expResult: peerRecords,
expStreamingResponse: true,
},
{
name: "happy case with protocol filter",
filterProtocols: []string{"transport-bitswap"},
routerResult: peerRecords,
expResult: []iter.Result[*types.PeerRecord]{{Val: &peerRecord1}},
expStreamingResponse: true,
},
{
name: "server doesn't support streaming",
routerResult: peerRecords,
Expand Down Expand Up @@ -542,12 +567,10 @@ func TestClient_FindPeers(t *testing.T) {
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var (
clientOpts []Option
serverOpts []server.Option
onRespReceived []func(*http.Response)
onReqReceived []func(*http.Request)
)
var clientOpts []Option
var serverOpts []server.Option
var onRespReceived []func(*http.Response)
var onReqReceived []func(*http.Request)

if c.serverStreamingDisabled {
serverOpts = append(serverOpts, server.WithStreamingResultsDisabled())
Expand All @@ -560,6 +583,10 @@ func TestClient_FindPeers(t *testing.T) {
})
}

if c.filterProtocols != nil {
clientOpts = append(clientOpts, WithProtocolFilter(c.filterProtocols))
}

if c.expStreamingResponse {
onRespReceived = append(onRespReceived, func(r *http.Response) {
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
Expand Down Expand Up @@ -603,7 +630,7 @@ func TestClient_FindPeers(t *testing.T) {
resultIter, err := client.FindPeers(ctx, pid)
c.expErrContains.errContains(t, err)

results := iter.ReadAll[iter.Result[*types.PeerRecord]](resultIter)
results := iter.ReadAll(resultIter)
assert.Equal(t, c.expResult, results)
})
}
Expand Down
56 changes: 52 additions & 4 deletions routing/http/server/filters.go → routing/http/filters/filters.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,48 @@
package server
package filters

import (
"net/url"
"reflect"
"slices"
"strings"

"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr"
)

// filters implements IPIP-0484
var logger = logging.Logger("routing/http/filters")

func parseFilter(param string) []string {
// Package filters implements IPIP-0484

func ParseFilter(param string) []string {
if param == "" {
return nil
}
return strings.Split(strings.ToLower(param), ",")
}

func AddFiltersToURL(baseURL string, protocolFilter, addrFilter []string) string {
parsedURL, err := url.Parse(baseURL)
if err != nil {
return baseURL
}

query := parsedURL.Query()

if len(protocolFilter) > 0 {
query.Set("filter-protocols", strings.Join(protocolFilter, ","))
}

if len(addrFilter) > 0 {
query.Set("filter-addrs", strings.Join(addrFilter, ","))
}

parsedURL.RawQuery = query.Encode()
return parsedURL.String()
}

// applyFiltersToIter applies the filters to the given iterator and returns a new iterator.
//
// The function iterates over the input iterator, applying the specified filters to each record.
Expand All @@ -28,7 +52,7 @@ func parseFilter(param string) []string {
// - recordsIter: An iterator of types.Record to be filtered.
// - filterAddrs: A slice of strings representing the address filter criteria.
// - filterProtocols: A slice of strings representing the protocol filter criteria.
func applyFiltersToIter(recordsIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) iter.ResultIter[types.Record] {
func ApplyFiltersToIter(recordsIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) iter.ResultIter[types.Record] {
mappedIter := iter.Map(recordsIter, func(v iter.Result[types.Record]) iter.Result[types.Record] {
if v.Err != nil || v.Val == nil {
return v
Expand Down Expand Up @@ -76,6 +100,30 @@ func applyFiltersToIter(recordsIter iter.ResultIter[types.Record], filterAddrs,
return filteredIter
}

func ApplyFiltersToPeerRecordIter(peerRecordIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) iter.ResultIter[*types.PeerRecord] {
// Convert PeerRecord to Record so that we can reuse the filtering logic from findProviders
mappedIter := iter.Map(peerRecordIter, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] {
if v.Err != nil || v.Val == nil {
return iter.Result[types.Record]{Err: v.Err}
}

var record types.Record = v.Val
return iter.Result[types.Record]{Val: record}
})

filteredIter := ApplyFiltersToIter(mappedIter, filterAddrs, filterProtocols)

// Convert Record back to PeerRecord 🙃
return iter.Map(filteredIter, func(v iter.Result[types.Record]) iter.Result[*types.PeerRecord] {
if v.Err != nil || v.Val == nil {
return iter.Result[*types.PeerRecord]{Err: v.Err}
}

var record *types.PeerRecord = v.Val.(*types.PeerRecord)
return iter.Result[*types.PeerRecord]{Val: record}
})
}

// Applies the filters. Returns nil if the provider does not pass the protocols filter
// The address filter is more complicated because it potentially modifies the Addrs slice.
func applyFilters(provider *types.PeerRecord, filterAddrs, filterProtocols []string) *types.PeerRecord {
Expand Down
Loading

0 comments on commit 4d0ae45

Please sign in to comment.