Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb: use atomic int added in go1.19 #26935

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ const (
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
type chainFreezer struct {
// WARNING: The `threshold` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

*Freezer
quit chan struct{}
Expand All @@ -60,12 +57,13 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre
if err != nil {
return nil, err
}
return &chainFreezer{
Freezer: freezer,
threshold: params.FullImmutabilityThreshold,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}, nil
cf := chainFreezer{
Freezer: freezer,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}
cf.threshold.Store(params.FullImmutabilityThreshold)
return &cf, nil
}

// Close closes the chain freezer instance and terminates the background thread.
Expand Down Expand Up @@ -124,8 +122,8 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
continue
}
number := ReadHeaderNumber(nfdb, hash)
threshold := atomic.LoadUint64(&f.threshold)
frozen := atomic.LoadUint64(&f.frozen)
threshold := f.threshold.Load()
frozen := f.frozen.Load()
switch {
case number == nil:
log.Error("Current full block number unavailable", "hash", hash)
Expand Down Expand Up @@ -186,7 +184,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {

// Wipe out side chains also and track dangling side chains
var dangling []common.Hash
frozen = atomic.LoadUint64(&f.frozen) // Needs reload after during freezeRange
frozen = f.frozen.Load() // Needs reload after during freezeRange
for number := first; number < frozen; number++ {
// Always keep the genesis block in active database
if number != 0 {
Expand Down
5 changes: 3 additions & 2 deletions core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,12 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
}
}
// process runs in parallel
nThreadsAlive := int32(threads)
var nThreadsAlive atomic.Int32
nThreadsAlive.Store(int32(threads))
process := func() {
defer func() {
// Last processor closes the result channel
if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
if nThreadsAlive.Add(-1) == 0 {
close(hashesCh)
}
}()
Expand Down
7 changes: 3 additions & 4 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"path"
"path/filepath"
"strings"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -72,9 +71,9 @@ func (frdb *freezerdb) Freeze(threshold uint64) error {
}
// Set the freezer threshold to a temporary value
defer func(old uint64) {
atomic.StoreUint64(&frdb.AncientStore.(*chainFreezer).threshold, old)
}(atomic.LoadUint64(&frdb.AncientStore.(*chainFreezer).threshold))
atomic.StoreUint64(&frdb.AncientStore.(*chainFreezer).threshold, threshold)
frdb.AncientStore.(*chainFreezer).threshold.Store(old)
}(frdb.AncientStore.(*chainFreezer).threshold.Load())
frdb.AncientStore.(*chainFreezer).threshold.Store(threshold)

// Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1)
Expand Down
53 changes: 25 additions & 28 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ const freezerTableSize = 2 * 1000 * 1000 * 1000
// reserving it for go-ethereum. This would also reduce the memory requirements
// of Geth, and thus also GC overhead.
type Freezer struct {
// WARNING: The `frozen` and `tail` fields are accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen
tail uint64 // Number of the first stored item in the freezer
frozen atomic.Uint64 // Number of blocks already frozen
tail atomic.Uint64 // Number of the first stored item in the freezer

// This lock synchronizes writers and the truncate operation, as well as
// the "atomic" (batched) read operations.
Expand Down Expand Up @@ -212,12 +209,12 @@ func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]

// Ancients returns the length of the frozen items.
func (f *Freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
return f.frozen.Load(), nil
}

// Tail returns the number of first stored item in the freezer.
func (f *Freezer) Tail() (uint64, error) {
return atomic.LoadUint64(&f.tail), nil
return f.tail.Load(), nil
}

// AncientSize returns the ancient size of the specified category.
Expand Down Expand Up @@ -251,7 +248,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
defer f.writeLock.Unlock()

// Roll back all tables to the starting position in case of error.
prevItem := atomic.LoadUint64(&f.frozen)
prevItem := f.frozen.Load()
defer func() {
if err != nil {
// The write operation has failed. Go back to the previous item position.
Expand All @@ -272,7 +269,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
if err != nil {
return 0, err
}
atomic.StoreUint64(&f.frozen, item)
f.frozen.Store(item)
return writeSize, nil
}

Expand All @@ -284,15 +281,15 @@ func (f *Freezer) TruncateHead(items uint64) error {
f.writeLock.Lock()
defer f.writeLock.Unlock()

if atomic.LoadUint64(&f.frozen) <= items {
if f.frozen.Load() <= items {
return nil
}
for _, table := range f.tables {
if err := table.truncateHead(items); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, items)
f.frozen.Store(items)
return nil
}

Expand All @@ -304,15 +301,15 @@ func (f *Freezer) TruncateTail(tail uint64) error {
f.writeLock.Lock()
defer f.writeLock.Unlock()

if atomic.LoadUint64(&f.tail) >= tail {
if f.tail.Load() >= tail {
return nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
return err
}
}
atomic.StoreUint64(&f.tail, tail)
f.tail.Store(tail)
return nil
}

Expand Down Expand Up @@ -343,22 +340,22 @@ func (f *Freezer) validate() error {
)
// Hack to get boundary of any table
for kind, table := range f.tables {
head = atomic.LoadUint64(&table.items)
tail = atomic.LoadUint64(&table.itemHidden)
head = table.items.Load()
tail = table.itemHidden.Load()
name = kind
break
}
// Now check every table against those boundaries.
for kind, table := range f.tables {
if head != atomic.LoadUint64(&table.items) {
return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, atomic.LoadUint64(&table.items), head)
if head != table.items.Load() {
return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, table.items.Load(), head)
}
if tail != atomic.LoadUint64(&table.itemHidden) {
return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, atomic.LoadUint64(&table.itemHidden), tail)
if tail != table.itemHidden.Load() {
return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, table.itemHidden.Load(), tail)
}
}
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}

Expand All @@ -369,11 +366,11 @@ func (f *Freezer) repair() error {
tail = uint64(0)
)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
items := table.items.Load()
if head > items {
head = items
}
hidden := atomic.LoadUint64(&table.itemHidden)
hidden := table.itemHidden.Load()
if hidden > tail {
tail = hidden
}
Expand All @@ -386,8 +383,8 @@ func (f *Freezer) repair() error {
return err
}
}
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}

Expand All @@ -413,7 +410,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
// and that error will be returned.
forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error {
var (
items = atomic.LoadUint64(&t.items)
items = t.items.Load()
batchSize = uint64(1024)
maxBytes = uint64(1024 * 1024)
)
Expand All @@ -436,7 +433,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
}
// TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
// process assumes no deletion at tail and needs to be modified to account for that.
if table.itemOffset > 0 || table.itemHidden > 0 {
if table.itemOffset.Load() > 0 || table.itemHidden.Load() > 0 {
return fmt.Errorf("migration not supported for tail-deleted freezers")
}
ancientsPath := filepath.Dir(table.index.Name())
Expand All @@ -452,7 +449,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
out []byte
start = time.Now()
logged = time.Now()
offset = newTable.items
offset = newTable.items.Load()
)
if offset > 0 {
log.Info("found previous migration attempt", "migrated", offset)
Expand Down
5 changes: 2 additions & 3 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package rawdb

import (
"fmt"
"sync/atomic"

"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -107,7 +106,7 @@ func (t *freezerTable) newBatch() *freezerTableBatch {
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = atomic.LoadUint64(&batch.t.items)
batch.curItem = batch.t.items.Load()
batch.totalBytes = 0
}

Expand Down Expand Up @@ -201,7 +200,7 @@ func (batch *freezerTableBatch) commit() error {

// Update headBytes of table.
batch.t.headBytes += dataSize
atomic.StoreUint64(&batch.t.items, batch.curItem)
batch.t.items.Store(batch.curItem)

// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
Expand Down
Loading