From f901041385bdef74e6cca08dc1baf5c8ce477c27 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 24 Aug 2023 13:33:19 +0200 Subject: [PATCH] chore: use go-libp2p-routing-helpers for tracing needs go-libp2p-routing-helpers has an optimized implementation that does nothing if we are not tracing, it also properly log all IO of the request. --- dht.go | 4 ++++ dht_bootstrap.go | 5 ++++- dual/dual.go | 45 ++++++++++++++++++++++++++++++++------ fullrt/dht.go | 56 +++++++++++++++++++++++++++++------------------- go.mod | 3 ++- go.sum | 6 ++++-- routing.go | 41 ++++++++++++++++------------------- 7 files changed, 105 insertions(+), 55 deletions(-) diff --git a/dht.go b/dht.go index a07a51308..c8fb3c77d 100644 --- a/dht.go +++ b/dht.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p-routing-helpers/tracing" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -40,6 +41,9 @@ import ( "go.uber.org/zap" ) +const tracer = tracing.Tracer("go-libp2p-kad-dht") +const dhtName = "IpfsDHT" + var ( logger = logging.Logger("dht") baseLogger = logger.Desugar() diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 58eb88b40..03029ad51 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -57,7 +57,10 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo { // Bootstrap tells the DHT to get into a bootstrapped state satisfying the // IpfsRouter interface. -func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { +func (dht *IpfsDHT) Bootstrap(ctx context.Context) (err error) { + _, end := tracer.Bootstrap(dhtName, ctx) + defer func() { end(err) }() + dht.fixRTIfNeeded() dht.rtRefreshManager.RefreshNoWait() return nil diff --git a/dual/dual.go b/dual/dual.go index d75555b30..0f94cf728 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -8,6 +8,8 @@ import ( "sync" dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/internal" + "github.com/libp2p/go-libp2p-routing-helpers/tracing" "github.com/ipfs/go-cid" kb "github.com/libp2p/go-libp2p-kbucket" @@ -24,6 +26,9 @@ import ( "github.com/hashicorp/go-multierror" ) +const tracer = tracing.Tracer("go-libp2p-kad-dht/dual") +const dualName = "Dual" + // DHT implements the routing interface to provide two concrete DHT implementationts for use // in IPFS that are used to support both global network users and disjoint LAN usecases. type DHT struct { @@ -158,7 +163,10 @@ func (dht *DHT) WANActive() bool { } // Provide adds the given cid to the content routing system. -func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error { +func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) (err error) { + ctx, end := tracer.Provide(dualName, ctx, key, announce) + defer func() { end(err) }() + if dht.WANActive() { return dht.WAN.Provide(ctx, key, announce) } @@ -174,7 +182,10 @@ func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStat } // FindProvidersAsync searches for peers who are able to provide a given key -func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { +func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) (ch <-chan peer.AddrInfo) { + ctx, end := tracer.FindProvidersAsync(dualName, ctx, key, count) + defer func() { ch = end(ch, nil) }() + reqCtx, cancel := context.WithCancel(ctx) outCh := make(chan peer.AddrInfo) @@ -185,10 +196,13 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) subCtx, evtCh = routing.RegisterForQueryEvents(reqCtx) } + subCtx, span := internal.StartSpan(subCtx, "Dual.worker") wanCh := dht.WAN.FindProvidersAsync(subCtx, key, count) lanCh := dht.LAN.FindProvidersAsync(subCtx, key, count) zeroCount := (count == 0) go func() { + defer span.End() + defer cancel() defer close(outCh) @@ -207,11 +221,13 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) continue case pi, ok = <-wanCh: if !ok { + span.AddEvent("wan finished") wanCh = nil continue } case pi, ok = <-lanCh: if !ok { + span.AddEvent("lan finished") lanCh = nil continue } @@ -238,7 +254,10 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) // FindPeer searches for a peer with given ID // Note: with signed peer records, we can change this to short circuit once either DHT returns. -func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) { +func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (pi peer.AddrInfo, err error) { + ctx, end := tracer.FindPeer(dualName, ctx, pid) + defer func() { end(pi, err) }() + var wg sync.WaitGroup wg.Add(2) var wanInfo, lanInfo peer.AddrInfo @@ -304,14 +323,20 @@ func combineErrors(erra, errb error) error { // Bootstrap allows callers to hint to the routing system to get into a // Boostrapped state and remain there. -func (dht *DHT) Bootstrap(ctx context.Context) error { +func (dht *DHT) Bootstrap(ctx context.Context) (err error) { + ctx, end := tracer.Bootstrap(dualName, ctx) + defer func() { end(err) }() + erra := dht.WAN.Bootstrap(ctx) errb := dht.LAN.Bootstrap(ctx) return combineErrors(erra, errb) } // PutValue adds value corresponding to given Key. -func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error { +func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) (err error) { + ctx, end := tracer.PutValue(dualName, ctx, key, val, opts...) + defer func() { end(err) }() + if dht.WANActive() { return dht.WAN.PutValue(ctx, key, val, opts...) } @@ -319,7 +344,10 @@ func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...ro } // GetValue searches for the value corresponding to given Key. -func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { +func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (result []byte, err error) { + ctx, end := tracer.GetValue(dualName, ctx, key, opts...) + defer func() { end(result, err) }() + lanCtx, cancelLan := context.WithCancel(ctx) defer cancelLan() @@ -349,7 +377,10 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) } // SearchValue searches for better values from this value -func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { +func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (ch <-chan []byte, err error) { + ctx, end := tracer.SearchValue(dualName, ctx, key, opts...) + defer func() { ch, err = end(ch, err) }() + p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator} return p.SearchValue(ctx, key, opts...) } diff --git a/fullrt/dht.go b/fullrt/dht.go index 8b1b4fe53..60842cf3d 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -14,6 +14,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" + "github.com/libp2p/go-libp2p-routing-helpers/tracing" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -50,6 +51,9 @@ import ( var logger = logging.Logger("fullrtdht") +const tracer = tracing.Tracer("go-libp2p-kad-dht/fullrt") +const dhtName = "FullRT" + const rtRefreshLimitsMsg = `Accelerated DHT client was unable to fully refresh its routing table due to Resource Manager limits, which may degrade content routing. Consider increasing resource limits. See debug logs for the "dht-crawler" subsystem for details.` // FullRT is an experimental DHT client that is under development. Expect breaking changes to occur in this client @@ -358,7 +362,12 @@ func (dht *FullRT) Close() error { return dht.ProviderManager.Close() } -func (dht *FullRT) Bootstrap(ctx context.Context) error { +func (dht *FullRT) Bootstrap(ctx context.Context) (err error) { + _, end := tracer.Bootstrap(dhtName, ctx) + defer func() { end(err) }() + + // TODO: This should block until the first crawl finish. + return nil } @@ -454,6 +463,9 @@ func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { + ctx, end := tracer.PutValue(dhtName, ctx, key, value, opts...) + defer func() { end(err) }() + if !dht.enableValues { return routing.ErrNotSupported } @@ -518,7 +530,10 @@ type RecvdVal struct { } // GetValue searches for the value corresponding to given Key. -func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { +func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Option) (result []byte, err error) { + ctx, end := tracer.GetValue(dhtName, ctx, key, opts...) + defer func() { end(result, err) }() + if !dht.enableValues { return nil, routing.ErrNotSupported } @@ -552,14 +567,9 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt } // SearchValue searches for the value corresponding to given Key and streams the results. -func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) - var good bool - defer func() { - if !good { - span.End() - } - }() +func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (ch <-chan []byte, err error) { + ctx, end := tracer.SearchValue(dhtName, ctx, key, opts...) + defer func() { ch, err = end(ch, err) }() if !dht.enableValues { return nil, routing.ErrNotSupported @@ -579,9 +589,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing. valCh, lookupRes := dht.getValues(ctx, key, stopCh) out := make(chan []byte) - good = true go func() { - defer span.End() defer close(out) best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded) @@ -789,8 +797,8 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str // Provide makes this node announce that it can provide a value for the given key func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { - ctx, span := internal.StartSpan(ctx, "FullRT.Provide", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Bool("Broadcast", brdcst))) - defer span.End() + ctx, end := tracer.Provide(dhtName, ctx, key, brdcst) + defer func() { end(err) }() if !dht.enableProviders { return routing.ErrNotSupported @@ -932,9 +940,9 @@ func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer return numSuccess } -func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) error { - ctx, span := internal.StartSpan(ctx, "FullRT.ProvideMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys)))) - defer span.End() +func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) (err error) { + ctx, end := tracer.ProvideMany(dhtName, ctx, keys) + defer func() { end(err) }() if !dht.enableProviders { return routing.ErrNotSupported @@ -1220,7 +1228,10 @@ func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInf // the search query completes. If count is zero then the query will run until it // completes. Note: not reading from the returned channel may block the query // from progressing. -func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { +func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) (ch <-chan peer.AddrInfo) { + ctx, end := tracer.FindProvidersAsync(dhtName, ctx, key, count) + defer func() { ch = end(ch, nil) }() + if !dht.enableProviders || !key.Defined() { peerOut := make(chan peer.AddrInfo) close(peerOut) @@ -1241,7 +1252,8 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in } func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { - ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key))) + // use a span here because unlike tracer.FindProvidersAsync we know who told us about it and that intresting to log. + ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRoutine") defer span.End() defer close(peerOut) @@ -1351,9 +1363,9 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. } // FindPeer searches for a peer with given ID. -func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { - ctx, span := internal.StartSpan(ctx, "FullRT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id))) - defer span.End() +func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, err error) { + ctx, end := tracer.FindPeer(dhtName, ctx, id) + defer func() { end(pi, err) }() if err := id.Validate(); err != nil { return peer.AddrInfo{}, err diff --git a/go.mod b/go.mod index b4395bacc..b80d48f83 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/libp2p/go-libp2p v0.30.0 github.com/libp2p/go-libp2p-kbucket v0.6.3 github.com/libp2p/go-libp2p-record v0.2.0 - github.com/libp2p/go-libp2p-routing-helpers v0.7.0 + github.com/libp2p/go-libp2p-routing-helpers v0.7.2 github.com/libp2p/go-libp2p-testing v0.12.0 github.com/libp2p/go-libp2p-xor v0.1.0 github.com/libp2p/go-msgio v0.3.0 @@ -39,6 +39,7 @@ require ( ) require ( + github.com/Jorropo/jsync v1.0.1 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index 4701f7779..d0debe3f2 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU= +github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -253,8 +255,8 @@ github.com/libp2p/go-libp2p-kbucket v0.6.3/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEH github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= -github.com/libp2p/go-libp2p-routing-helpers v0.7.0 h1:sirOYVD0wGWjkDwHZvinunIpaqPLBXkcnXApVHwZFGA= -github.com/libp2p/go-libp2p-routing-helpers v0.7.0/go.mod h1:R289GUxUMzRXIbWGSuUUTPrlVJZ3Y/pPz495+qgXJX8= +github.com/libp2p/go-libp2p-routing-helpers v0.7.2 h1:xJMFyhQ3Iuqnk9Q2dYE1eUTzsah7NLw3Qs2zjUV78T0= +github.com/libp2p/go-libp2p-routing-helpers v0.7.2/go.mod h1:cN4mJAD/7zfPKXBcs9ze31JGYAZgzdABEm+q/hkswb8= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-libp2p-xor v0.1.0 h1:hhQwT4uGrBcuAkUGXADuPltalOdpf9aag9kaYNT2tLA= diff --git a/routing.go b/routing.go index 3c2111aa8..1a68c6eb6 100644 --- a/routing.go +++ b/routing.go @@ -33,8 +33,8 @@ import ( // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) - defer span.End() + ctx, end := tracer.PutValue(dhtName, ctx, key, value, opts...) + defer func() { end(err) }() if !dht.enableValues { return routing.ErrNotSupported @@ -107,9 +107,9 @@ type recvdVal struct { } // GetValue searches for the value corresponding to given Key. -func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) - defer span.End() +func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (result []byte, err error) { + ctx, end := tracer.GetValue(dhtName, ctx, key, opts...) + defer func() { end(result, err) }() if !dht.enableValues { return nil, routing.ErrNotSupported @@ -144,14 +144,9 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op } // SearchValue searches for the value corresponding to given Key and streams the results. -func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) - var good bool - defer func() { - if !good { - span.End() - } - }() +func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (ch <-chan []byte, err error) { + ctx, end := tracer.SearchValue(dhtName, ctx, key, opts...) + defer func() { ch, err = end(ch, err) }() if !dht.enableValues { return nil, routing.ErrNotSupported @@ -171,9 +166,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing valCh, lookupRes := dht.getValues(ctx, key, stopCh) out := make(chan []byte) - good = true go func() { - defer span.End() defer close(out) best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded) if best == nil || aborted { @@ -392,8 +385,8 @@ func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollow // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.Provide", trace.WithAttributes(attribute.String("Key", key.String()), attribute.Bool("Broadcast", brdcst))) - defer span.End() + ctx, end := tracer.Provide(dhtName, ctx, key, brdcst) + defer func() { end(err) }() if !dht.enableProviders { return routing.ErrNotSupported @@ -500,7 +493,10 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn // the search query completes. If count is zero then the query will run until it // completes. Note: not reading from the returned channel may block the query // from progressing. -func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { +func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) (ch <-chan peer.AddrInfo) { + ctx, end := tracer.FindProvidersAsync(dhtName, ctx, key, count) + defer func() { ch = end(ch, nil) }() + if !dht.enableProviders || !key.Defined() { peerOut := make(chan peer.AddrInfo) close(peerOut) @@ -521,7 +517,8 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i } func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key))) + // use a span here because unlike tracer.FindProvidersAsync we know who told us about it and that intresting to log. + ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsyncRoutine") defer span.End() defer close(peerOut) @@ -632,9 +629,9 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash } // FindPeer searches for a peer with given ID. -func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id))) - defer span.End() +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, err error) { + ctx, end := tracer.FindPeer(dhtName, ctx, id) + defer func() { end(pi, err) }() if err := id.Validate(); err != nil { return peer.AddrInfo{}, err