Skip to content

Commit

Permalink
rcmgr: move StatsTraceReporter to rcmgr package
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 23, 2023
1 parent acca7a6 commit a808b9d
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 65 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
Expand Down Expand Up @@ -301,7 +301,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}

if !cfg.DisableMetrics {
rcmgrObs.MustRegisterWith(cfg.PrometheusRegisterer)
rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
}

h, err := bhost.NewHost(swrm, &bhost.HostOpts{
Expand Down
3 changes: 1 addition & 2 deletions dashboards/resource-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ option libp2p.PrometheusRegisterer. For example:
import (
// ...
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"

"github.com/prometheus/client_golang/prometheus"
)

func SetupResourceManager() (network.ResourceManager, error) {
str, err := rcmgrObs.NewStatsTraceReporter()
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/resource-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ limits := cfg.Build(scaledDefaultLimits)
limiter := rcmgr.NewFixedLimiter(limits)

// (Optional if you want metrics)
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgrObs.NewStatsTraceReporter()
rcmgr.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
//go:build nocover

package obs
package rcmgr

import (
"math/rand"
"sync"
"testing"
"time"

rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {
func randomTraceEvt(rng *rand.Rand) TraceEvt {
// Possibly non-sensical
typs := []rcmgr.TraceEvtTyp{
rcmgr.TraceStartEvt,
rcmgr.TraceCreateScopeEvt,
rcmgr.TraceDestroyScopeEvt,
rcmgr.TraceReserveMemoryEvt,
rcmgr.TraceBlockReserveMemoryEvt,
rcmgr.TraceReleaseMemoryEvt,
rcmgr.TraceAddStreamEvt,
rcmgr.TraceBlockAddStreamEvt,
rcmgr.TraceRemoveStreamEvt,
rcmgr.TraceAddConnEvt,
rcmgr.TraceBlockAddConnEvt,
rcmgr.TraceRemoveConnEvt,
typs := []TraceEvtTyp{
TraceStartEvt,
TraceCreateScopeEvt,
TraceDestroyScopeEvt,
TraceReserveMemoryEvt,
TraceBlockReserveMemoryEvt,
TraceReleaseMemoryEvt,
TraceAddStreamEvt,
TraceBlockAddStreamEvt,
TraceRemoveStreamEvt,
TraceAddConnEvt,
TraceBlockAddConnEvt,
TraceRemoveConnEvt,
}

names := []string{
Expand All @@ -43,7 +42,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {
"service:libp2p.autonat.peer:12D3Koo",
}

return rcmgr.TraceEvt{
return TraceEvt{
Type: typs[rng.Intn(len(typs))],
Name: names[rng.Intn(len(names))],
DeltaOut: rng.Intn(5),
Expand All @@ -60,7 +59,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {

}

var registerOnce sync.Once
var regOnce sync.Once

func BenchmarkMetricsRecording(b *testing.B) {
b.ReportAllocs()
Expand All @@ -70,7 +69,7 @@ func BenchmarkMetricsRecording(b *testing.B) {
})

evtCount := 10000
evts := make([]rcmgr.TraceEvt, evtCount)
evts := make([]TraceEvt, evtCount)
rng := rand.New(rand.NewSource(int64(b.N)))
for i := 0; i < evtCount; i++ {
evts[i] = randomTraceEvt(rng)
Expand All @@ -92,7 +91,7 @@ func TestNoAllocsNoCover(t *testing.T) {
require.NoError(t, err)

evtCount := 10_000
evts := make([]rcmgr.TraceEvt, 0, evtCount)
evts := make([]TraceEvt, 0, evtCount)
rng := rand.New(rand.NewSource(1))

for i := 0; i < evtCount; i++ {
Expand Down
18 changes: 18 additions & 0 deletions p2p/host/resource-manager/obs/obs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Package obs implements metrics tracing for resource manager
//
// Deprecated: obs is deprecated and the exported types and methods
// are moved to rcmgr package. Use the corresponding identifier in
// the rcmgr package, for example
// obs.NewStatsTraceReporter => rcmgr.NewStatsTraceReporter
package obs

import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

var MustRegisterWith = rcmgr.MustRegisterWith

// StatsTraceReporter reports stats on the resource manager using its traces.
type StatsTraceReporter = rcmgr.StatsTraceReporter

var NewStatsTraceReporter = rcmgr.NewStatsTraceReporter
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package obs
package rcmgr

import (
"strings"

rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -74,7 +73,7 @@ var (
previousPeerStreamsOutbound = previousPeerStreams.With(prometheus.Labels{"dir": "outbound"})

// Memory
memory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
memoryTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "memory",
Help: "Amount of memory reserved as reported to the Resource Manager",
Expand Down Expand Up @@ -151,7 +150,7 @@ func MustRegisterWith(reg prometheus.Registerer) {

previousPeerStreams,

memory,
memoryTotal,
peerMemory,
previousPeerMemory,
connMemory,
Expand All @@ -169,18 +168,18 @@ func NewStatsTraceReporter() (StatsTraceReporter, error) {
return StatsTraceReporter{}, nil
}

func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
func (r StatsTraceReporter) ConsumeEvent(evt TraceEvt) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

r.consumeEventWithLabelSlice(evt, tags)
}

// Separate func so that we can test that this function does not allocate. The syncPool may allocate.
func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags *[]string) {
func (r StatsTraceReporter) consumeEventWithLabelSlice(evt TraceEvt, tags *[]string) {
switch evt.Type {
case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
case TraceAddStreamEvt, TraceRemoveStreamEvt:
if p := PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of streams open.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
Expand Down Expand Up @@ -210,11 +209,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
} else {
if evt.DeltaOut != 0 {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, "outbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
} else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "outbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
Expand All @@ -227,11 +226,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}

if evt.DeltaIn != 0 {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, "inbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
} else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "inbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
Expand All @@ -244,8 +243,8 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}

case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
case TraceAddConnEvt, TraceRemoveConnEvt:
if p := PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of connections.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
Expand Down Expand Up @@ -274,31 +273,31 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}
} else {
if rcmgr.IsConnScope(evt.Name) {
if IsConnScope(evt.Name) {
// Not measuring this. I don't think it's useful.
break
}

if rcmgr.IsSystemScope(evt.Name) {
if IsSystemScope(evt.Name) {
connsInboundSystem.Set(float64(evt.ConnsIn))
connsOutboundSystem.Set(float64(evt.ConnsOut))
} else if rcmgr.IsTransientScope(evt.Name) {
} else if IsTransientScope(evt.Name) {
connsInboundTransient.Set(float64(evt.ConnsIn))
connsOutboundTransient.Set(float64(evt.ConnsOut))
}

// Represents the delta in fds
if evt.Delta != 0 {
if rcmgr.IsSystemScope(evt.Name) {
if IsSystemScope(evt.Name) {
fdsSystem.Set(float64(evt.FD))
} else if rcmgr.IsTransientScope(evt.Name) {
} else if IsTransientScope(evt.Name) {
fdsTransient.Set(float64(evt.FD))
}
}
}

case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
case TraceReserveMemoryEvt, TraceReleaseMemoryEvt:
if p := PeerStrInScopeName(evt.Name); p != "" {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
Expand All @@ -308,7 +307,7 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
peerMemory.Observe(float64(evt.Memory))
}
}
} else if rcmgr.IsConnScope(evt.Name) {
} else if IsConnScope(evt.Name) {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
Expand All @@ -319,14 +318,14 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}
} else {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, evt.Name, "")
memory.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "protocol", proto)
memory.WithLabelValues(*tags...).Set(float64(evt.Memory))
memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
Expand All @@ -335,11 +334,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}

case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt, rcmgr.TraceBlockReserveMemoryEvt:
case TraceBlockAddConnEvt, TraceBlockAddStreamEvt, TraceBlockReserveMemoryEvt:
var resource string
if evt.Type == rcmgr.TraceBlockAddConnEvt {
if evt.Type == TraceBlockAddConnEvt {
resource = "connection"
} else if evt.Type == rcmgr.TraceBlockAddStreamEvt {
} else if evt.Type == TraceBlockAddStreamEvt {
resource = "stream"
} else {
resource = "memory"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,36 @@
package obs_test
package rcmgr

import (
"sync"
"testing"
"time"

rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/prometheus/client_golang/prometheus"
)

var registerOnce sync.Once

func TestTraceReporterStartAndClose(t *testing.T) {
rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{}))
rcmgr, err := NewResourceManager(NewFixedLimiter(DefaultLimits.AutoScale()), WithTraceReporter(StatsTraceReporter{}))
if err != nil {
t.Fatal(err)
}
defer rcmgr.Close()
}

func TestConsumeEvent(t *testing.T) {
evt := rcmgr.TraceEvt{
Type: rcmgr.TraceBlockAddStreamEvt,
evt := TraceEvt{
Type: TraceBlockAddStreamEvt,
Name: "conn-1",
DeltaOut: 1,
Time: time.Now().Format(time.RFC3339Nano),
}

registerOnce.Do(func() {
obs.MustRegisterWith(prometheus.DefaultRegisterer)
MustRegisterWith(prometheus.DefaultRegisterer)
})

str, err := obs.NewStatsTraceReporter()
str, err := NewStatsTraceReporter()
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions p2p/test/resource-manager/rcmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -319,7 +318,7 @@ func TestReadmeExample(t *testing.T) {
limiter := rcmgr.NewFixedLimiter(limits)

// (Optional if you want metrics) Construct the OpenCensus metrics reporter.
str, err := rcmgrObs.NewStatsTraceReporter()
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
panic(err)
}
Expand Down

0 comments on commit a808b9d

Please sign in to comment.