Skip to content

Commit

Permalink
metrics/influxdb: reuse code between v1 and v2 reporters (ethereum#26963
Browse files Browse the repository at this point in the history
)
  • Loading branch information
gzliudan committed Dec 10, 2024
1 parent 75c6806 commit 8ad1bc9
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 372 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/holiman/uint256 v1.2.4
github.com/huin/goupnp v1.3.0
github.com/influxdata/influxdb v1.7.9
github.com/jackpal/go-nat-pmp v1.0.2
github.com/julienschmidt/httprouter v1.3.0
github.com/karalabe/hid v1.0.0
Expand Down Expand Up @@ -51,6 +50,7 @@ require (
github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498
github.com/ethereum/c-kzg-4844 v0.4.0
github.com/influxdata/influxdb-client-go/v2 v2.4.0
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
github.com/kylelemons/godebug v1.1.0
github.com/mattn/go-isatty v0.0.17
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/influxdata/influxdb v1.7.9 h1:uSeBTNO4rBkbp1Be5FKRsAmglM9nlx25TzVQRQt1An4=
github.com/influxdata/influxdb v1.7.9/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/influxdata/influxdb-client-go/v2 v2.4.0 h1:HGBfZYStlx3Kqvsv1h2pJixbCl/jhnFtxpKFAv9Tu5k=
github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs=
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
332 changes: 101 additions & 231 deletions metrics/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,242 +2,112 @@ package influxdb

import (
"fmt"
uurl "net/url"
"time"

"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/influxdata/influxdb/client"
)

type reporter struct {
reg metrics.Registry
interval time.Duration

url uurl.URL
database string
username string
password string
namespace string
tags map[string]string

client *client.Client

cache map[string]int64
}

// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
}

// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
u, err := uurl.Parse(url)
if err != nil {
log.Warn("unable to parse InfluxDB url %s. err=%v", url, err)
return
}

rep := &reporter{
reg: r,
interval: d,
url: *u,
database: database,
username: username,
password: password,
namespace: namespace,
tags: tags,
cache: make(map[string]int64),
}
if err := rep.makeClient(); err != nil {
log.Warn("unable to make InfluxDB client. err=%v", err)
return
}

rep.run()
}

func (r *reporter) makeClient() (err error) {
r.client, err = client.NewClient(client.Config{
URL: r.url,
Username: r.username,
Password: r.password,
Timeout: 10 * time.Second,
})

return
}

func (r *reporter) run() {
intervalTicker := time.NewTicker(r.interval)
pingTicker := time.NewTicker(time.Second * 5)

defer intervalTicker.Stop()
defer pingTicker.Stop()

for {
select {
case <-intervalTicker.C:
if err := r.send(); err != nil {
log.Warn("unable to send to InfluxDB. err=%v", err)
}
case <-pingTicker.C:
_, _, err := r.client.Ping()
if err != nil {
log.Warn("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", err)

if err = r.makeClient(); err != nil {
log.Warn("unable to make InfluxDB client. err=%v", err)
}
}
func readMeter(namespace, name string, i interface{}) (string, map[string]interface{}) {
switch metric := i.(type) {
case metrics.Counter:
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": metric.Count(),
}
}
}

func (r *reporter) send() error {
var pts []client.Point

r.reg.Each(func(name string, i interface{}) {
now := time.Now()
namespace := r.namespace

switch metric := i.(type) {
case metrics.Counter:
count := metric.Count()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": count,
},
Time: now,
})
case metrics.CounterFloat64:
count := metric.Count()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": count,
},
Time: now,
})
case metrics.Gauge:
ms := metric.Snapshot()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": ms.Value(),
},
Time: now,
})
case metrics.GaugeFloat64:
ms := metric.Snapshot()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": ms.Value(),
},
Time: now,
})
case metrics.Histogram:
ms := metric.Snapshot()
if ms.Count() > 0 {
ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p25": ps[0],
"p50": ps[1],
"p75": ps[2],
"p95": ps[3],
"p99": ps[4],
"p999": ps[5],
"p9999": ps[6],
}
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
Tags: r.tags,
Fields: fields,
Time: now,
})
}
case metrics.Meter:
ms := metric.Snapshot()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"count": ms.Count(),
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"mean": ms.RateMean(),
},
Time: now,
})
case metrics.Timer:
ms := metric.Snapshot()
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"meanrate": ms.RateMean(),
},
Time: now,
})
case metrics.ResettingTimer:
t := metric.Snapshot()

if len(t.Values()) > 0 {
ps := t.Percentiles([]float64{50, 95, 99})
val := t.Values()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.span", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"count": len(val),
"max": val[len(val)-1],
"mean": t.Mean(),
"min": val[0],
"p50": ps[0],
"p95": ps[1],
"p99": ps[2],
},
Time: now,
})
}
return measurement, fields
case metrics.CounterFloat64:
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": metric.Count(),
}
})

bps := client.BatchPoints{
Points: pts,
Database: r.database,
return measurement, fields
case metrics.Gauge:
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": metric.Snapshot().Value(),
}
return measurement, fields
case metrics.GaugeFloat64:
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": metric.Snapshot().Value(),
}
return measurement, fields
case metrics.Histogram:
ms := metric.Snapshot()
if ms.Count() <= 0 {
break
}
ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p25": ps[0],
"p50": ps[1],
"p75": ps[2],
"p95": ps[3],
"p99": ps[4],
"p999": ps[5],
"p9999": ps[6],
}
return measurement, fields
case metrics.Meter:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.meter", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"mean": ms.RateMean(),
}
return measurement, fields
case metrics.Timer:
ms := metric.Snapshot()
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})

measurement := fmt.Sprintf("%s%s.timer", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"meanrate": ms.RateMean(),
}
return measurement, fields
case metrics.ResettingTimer:
t := metric.Snapshot()
if len(t.Values()) == 0 {
break
}
ps := t.Percentiles([]float64{50, 95, 99})
val := t.Values()
measurement := fmt.Sprintf("%s%s.span", namespace, name)
fields := map[string]interface{}{
"count": len(val),
"max": val[len(val)-1],
"mean": t.Mean(),
"min": val[0],
"p50": ps[0],
"p95": ps[1],
"p99": ps[2],
}
return measurement, fields
}

_, err := r.client.Write(bps)
return err
return "", nil
}
Loading

0 comments on commit 8ad1bc9

Please sign in to comment.