Skip to content

Commit

Permalink
metrics: use atomic type (#27121) (#384)
Browse files Browse the repository at this point in the history
We observe the data race warning when enabling metric

  WARNING: DATA RACE
  Read at 0x00c02cc7fe90 by goroutine 49:
    github.com/ethereum/go-ethereum/metrics.(*StandardMeter).Snapshot()
        github.com/ethereum/go-ethereum/metrics/meter.go:244 +0x6e
    github.com/ethereum/go-ethereum/metrics/prometheus.Handler.func1()
        github.com/ethereum/go-ethereum/metrics/prometheus/prometheus.go:55 +0x3c2

  Previous write at 0x00c02cc7fe90 by goroutine 251:
    sync/atomic.AddInt64()
        runtime/race_amd64.s:289 +0xb
    sync/atomic.AddInt64()
        <autogenerated>:1 +0x1b
    github.com/ethereum/go-ethereum/p2p.(*rlpxTransport).WriteMsg()
        github.com/ethereum/go-ethereum/p2p/transport.go:103 +0x4c3
    github.com/ethereum/go-ethereum/p2p.(*conn).WriteMsg()
        <autogenerated>:1 +0xb9

The data race happens on StandardMeter.snapshot.temp field. This field access does not follow the StandardMeter.lock but use atomic access. The write side in StandardMeter.Mark is atomic, however, the read side in StandardMeter.Snapshot is not.

We cherry-pick this commit to correctly use atomic read of StandardMeter.snapshot.temp in StandardMeter.Snapshot. This commit also replaces the use of atomic.Load/StoreInt64 with atomic.Int64 type.

Co-authored-by: s7v7nislands <[email protected]>
  • Loading branch information
minh-bq and s7v7nislands authored Nov 30, 2023
1 parent f4a8cdb commit d03c78b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 28 deletions.
14 changes: 7 additions & 7 deletions metrics/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func NewCounter() Counter {
if !Enabled {
return NilCounter{}
}
return &StandardCounter{0}
return &StandardCounter{}
}

// NewCounterForced constructs a new StandardCounter and returns it no matter if
// the global switch is enabled or not.
func NewCounterForced() Counter {
return &StandardCounter{0}
return &StandardCounter{}
}

// NewRegisteredCounter constructs and registers a new StandardCounter.
Expand Down Expand Up @@ -115,27 +115,27 @@ func (NilCounter) Snapshot() Counter { return NilCounter{} }
// StandardCounter is the standard implementation of a Counter and uses the
// sync/atomic package to manage a single int64 value.
type StandardCounter struct {
count int64
count atomic.Int64
}

// Clear sets the counter to zero.
func (c *StandardCounter) Clear() {
atomic.StoreInt64(&c.count, 0)
c.count.Store(0)
}

// Count returns the current count.
func (c *StandardCounter) Count() int64 {
return atomic.LoadInt64(&c.count)
return c.count.Load()
}

// Dec decrements the counter by the given amount.
func (c *StandardCounter) Dec(i int64) {
atomic.AddInt64(&c.count, -i)
c.count.Add(-i)
}

// Inc increments the counter by the given amount.
func (c *StandardCounter) Inc(i int64) {
atomic.AddInt64(&c.count, i)
c.count.Add(i)
}

// Snapshot returns a read-only copy of the counter.
Expand Down
8 changes: 4 additions & 4 deletions metrics/ewma.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (NilEWMA) Update(n int64) {}
// of uncounted events and processes them on each tick. It uses the
// sync/atomic package to manage uncounted events.
type StandardEWMA struct {
uncounted int64 // /!\ this should be the first member to ensure 64-bit alignment
uncounted atomic.Int64
alpha float64
rate float64
init bool
Expand All @@ -97,8 +97,8 @@ func (a *StandardEWMA) Snapshot() EWMA {
// Tick ticks the clock to update the moving average. It assumes it is called
// every five seconds.
func (a *StandardEWMA) Tick() {
count := atomic.LoadInt64(&a.uncounted)
atomic.AddInt64(&a.uncounted, -count)
count := a.uncounted.Load()
a.uncounted.Add(-count)
instantRate := float64(count) / float64(5*time.Second)
a.mutex.Lock()
defer a.mutex.Unlock()
Expand All @@ -112,5 +112,5 @@ func (a *StandardEWMA) Tick() {

// Update adds n uncounted events.
func (a *StandardEWMA) Update(n int64) {
atomic.AddInt64(&a.uncounted, n)
a.uncounted.Add(n)
}
12 changes: 6 additions & 6 deletions metrics/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewGauge() Gauge {
if !Enabled {
return NilGauge{}
}
return &StandardGauge{0}
return &StandardGauge{}
}

// NewRegisteredGauge constructs and registers a new StandardGauge.
Expand Down Expand Up @@ -101,7 +101,7 @@ func (NilGauge) Value() int64 { return 0 }
// StandardGauge is the standard implementation of a Gauge and uses the
// sync/atomic package to manage a single int64 value.
type StandardGauge struct {
value int64
value atomic.Int64
}

// Snapshot returns a read-only copy of the gauge.
Expand All @@ -111,22 +111,22 @@ func (g *StandardGauge) Snapshot() Gauge {

// Update updates the gauge's value.
func (g *StandardGauge) Update(v int64) {
atomic.StoreInt64(&g.value, v)
g.value.Store(v)
}

// Value returns the gauge's current value.
func (g *StandardGauge) Value() int64 {
return atomic.LoadInt64(&g.value)
return g.value.Load()
}

// Dec decrements the gauge's current value by the given amount.
func (g *StandardGauge) Dec(i int64) {
atomic.AddInt64(&g.value, -i)
g.value.Add(-i)
}

// Inc increments the gauge's current value by the given amount.
func (g *StandardGauge) Inc(i int64) {
atomic.AddInt64(&g.value, i)
g.value.Add(i)
}

// FunctionalGauge returns value from given function
Expand Down
25 changes: 14 additions & 11 deletions metrics/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,7 @@ func NewRegisteredMeterForced(name string, r Registry) Meter {

// MeterSnapshot is a read-only copy of another Meter.
type MeterSnapshot struct {
// WARNING: The `temp` field is accessed atomically.
// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
// guaranteed to be so aligned, so take advantage of that. For more information,
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
temp int64
temp atomic.Int64
count int64
rate1, rate5, rate15, rateMean float64
}
Expand Down Expand Up @@ -173,7 +169,7 @@ type StandardMeter struct {
snapshot *MeterSnapshot
a1, a5, a15 EWMA
startTime time.Time
stopped uint32
stopped atomic.Bool
}

func newStandardMeter() *StandardMeter {
Expand All @@ -188,8 +184,8 @@ func newStandardMeter() *StandardMeter {

// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() {
stopped := atomic.SwapUint32(&m.stopped, 1)
if stopped != 1 {
stopped := m.stopped.Swap(true)
if !stopped {
arbiter.Lock()
delete(arbiter.meters, m)
arbiter.Unlock()
Expand All @@ -207,7 +203,7 @@ func (m *StandardMeter) Count() int64 {

// Mark records the occurrence of n events.
func (m *StandardMeter) Mark(n int64) {
atomic.AddInt64(&m.snapshot.temp, n)
m.snapshot.temp.Add(n)
}

// Rate1 returns the one-minute moving average rate of events per second.
Expand Down Expand Up @@ -241,7 +237,14 @@ func (m *StandardMeter) RateMean() float64 {
// Snapshot returns a read-only copy of the meter.
func (m *StandardMeter) Snapshot() Meter {
m.lock.RLock()
snapshot := *m.snapshot
snapshot := MeterSnapshot{
count: m.snapshot.count,
rate1: m.snapshot.rate1,
rate5: m.snapshot.rate5,
rate15: m.snapshot.rate15,
rateMean: m.snapshot.rateMean,
}
snapshot.temp.Store(m.snapshot.temp.Load())
m.lock.RUnlock()
return &snapshot
}
Expand All @@ -257,7 +260,7 @@ func (m *StandardMeter) updateSnapshot() {

func (m *StandardMeter) updateMeter() {
// should only run with write lock held on m.lock
n := atomic.SwapInt64(&m.snapshot.temp, 0)
n := m.snapshot.temp.Swap(0)
m.snapshot.count += n
m.a1.Update(n)
m.a5.Update(n)
Expand Down

0 comments on commit d03c78b

Please sign in to comment.