Rewind fix #46

Merged (3 commits, Jul 16, 2024)
9 changes: 8 additions & 1 deletion cmd/geth/snapshot.go
@@ -194,7 +194,14 @@ func verifyState(ctx *cli.Context) error {
log.Error("Failed to load head block")
return errors.New("no head block")
}
snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, headBlock.Root(), false, false, false)
snapConfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
ReBuild: false,
AsyncBuild: false,
AllowForceUpdate: false,
}
snaptree, err := snapshot.New(snapConfig, chaindb, trie.NewDatabase(chaindb), headBlock.Root())
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
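For reference, a minimal stand-alone sketch of the new constructor signature introduced by this change. It assumes the snapshot.Config fields shown above; the in-memory database and the zero root hash are placeholders for illustration only, not how the command wires things up.

```go
package main

import (
	"log"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/state/snapshot"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	// Placeholder database; a real caller would pass the node's chain database.
	db := rawdb.NewMemoryDatabase()

	// Read-only configuration mirroring the verifyState/NewPruner call sites:
	// no recovery, no rebuild, no async generation, no forced updates.
	cfg := snapshot.Config{
		CacheSize:        256,
		Recovery:         false,
		ReBuild:          false,
		AsyncBuild:       false,
		AllowForceUpdate: false,
	}
	// The head state root would normally come from the head block.
	root := common.Hash{}
	if _, err := snapshot.New(cfg, db, trie.NewDatabase(db), root); err != nil {
		log.Printf("snapshot tree unavailable: %v", err)
	}
}
```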
24 changes: 18 additions & 6 deletions core/blockchain.go
@@ -133,18 +133,22 @@ type CacheConfig struct {
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk
AllowForceUpdate bool // Whether to force a disk-root snapshot update once the commit threshold is reached
CommitThreshold int // Number of commits after which a disk-root snapshot update is forced

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
AllowForceUpdate: false,
CommitThreshold: 128,
}

// BlockChain represents the canonical chain given a database with a genesis
@@ -403,7 +407,15 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
recover = true
}
bc.snaps, err = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, true, recover)
snapConfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
ReBuild: true,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
AllowForceUpdate: bc.cacheConfig.AllowForceUpdate,
CommitThreshold: bc.cacheConfig.CommitThreshold,
}
bc.snaps, err = snapshot.New(snapConfig, bc.db, bc.stateCache.TrieDB(), head.Root())
if err != nil {
log.Error("Error trying to load snapshot", "err", err)
}
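A hedged sketch of how a caller could opt in to the threshold-based disk-root updates via CacheConfig. Only AllowForceUpdate and CommitThreshold are new in this diff; the remaining values simply mirror defaultCacheConfig, and the helper itself is hypothetical.

```go
package main

import (
	"time"

	"github.com/ethereum/go-ethereum/core"
)

// forceUpdateCacheConfig is a hypothetical helper showing the new fields in
// context; it is not part of this PR.
func forceUpdateCacheConfig() *core.CacheConfig {
	return &core.CacheConfig{
		TrieCleanLimit:   256,
		TrieDirtyLimit:   256,
		TrieTimeLimit:    5 * time.Minute,
		SnapshotLimit:    256,
		SnapshotWait:     true,
		AllowForceUpdate: true, // enable forced disk-root snapshot updates
		CommitThreshold:  128,  // force an update after this many commits
	}
}

func main() {
	_ = forceUpdateCacheConfig()
}
```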
2 changes: 1 addition & 1 deletion core/blockchain_repair_test.go
@@ -1807,7 +1807,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
if tt.commitBlock > 0 {
chain.stateCache.TrieDB().Commit(canonblocks[tt.commitBlock-1].Root(), true, nil)
if snapshots {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
@@ -2007,7 +2007,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
if tt.commitBlock > 0 {
chain.stateCache.TrieDB().Commit(canonblocks[tt.commitBlock-1].Root(), true, nil)
if snapshots {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0, false); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
2 changes: 1 addition & 1 deletion core/blockchain_snapshot_test.go
@@ -106,7 +106,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
// Flushing the entire snap tree into the disk, the
relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
chain.snaps.Cap(blocks[point-1].Root(), 0)
chain.snaps.Cap(blocks[point-1].Root(), 0, false)
diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root()
if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) {
t.Fatalf("Failed to flush disk layer change, want %x, got %x", blockRoot, diskRoot)
21 changes: 18 additions & 3 deletions core/state/pruner/pruner.go
@@ -90,7 +90,14 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize uint6
if headBlock == nil {
return nil, errors.New("Failed to load head block")
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, false)
snapConfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
ReBuild: false,
AsyncBuild: false,
AllowForceUpdate: false,
}
snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), headBlock.Root())
if err != nil {
return nil, err // The relevant snapshot(s) might not exist
}
@@ -189,7 +196,7 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta
// Pruning is done, now drop the "useless" layers from the snapshot.
// Firstly, flushing the target layer into the disk. After that all
// diff layers below the target will all be merged into the disk.
if err := snaptree.Cap(root, 0); err != nil {
if err := snaptree.Cap(root, 0, false); err != nil {
return err
}
// Secondly, flushing the snapshot journal into the disk. All diff
@@ -355,6 +362,14 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
if headBlock == nil {
return errors.New("Failed to load head block")
}

snapConfig := snapshot.Config{
CacheSize: 256,
Recovery: true,
ReBuild: false,
AsyncBuild: false,
AllowForceUpdate: false,
}
// Initialize the snapshot tree in recovery mode to handle this special case:
// - Users run the `prune-state` command multiple times
// - None of these `prune-state` runs finished (e.g. interrupted manually)
@@ -363,7 +378,7 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
// - The state HEAD is rewound already because of multiple incomplete `prune-state`
// In this case, even though the state HEAD does not exactly match the snapshot,
// it is still feasible to recover the pruning correctly.
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, true)
snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), headBlock.Root())
if err != nil {
return err // The relevant snapshot(s) might not exist
}
3 changes: 3 additions & 0 deletions core/state/snapshot/difflayer.go
@@ -75,6 +75,9 @@ var (
bloomDestructHasherOffset = 0
bloomAccountHasherOffset = 0
bloomStorageHasherOffset = 0

// Default number of commits before forcing a disk-root update
defaultCommitThreshold = 128
)

func init() {
8 changes: 4 additions & 4 deletions core/state/snapshot/disklayer_test.go
@@ -137,7 +137,7 @@ func TestDiskMerge(t *testing.T) {
}); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
// Retrieve all the data through the disk layer and validate it
@@ -360,7 +360,7 @@ func TestDiskPartialMerge(t *testing.T) {
}); err != nil {
t.Fatalf("test %d: failed to update snapshot tree: %v", i, err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("test %d: failed to flatten snapshot tree: %v", i, err)
}
// Retrieve all the data through the disk layer and validate it
@@ -471,7 +471,7 @@ func TestDiskGeneratorPersistence(t *testing.T) {
}, nil); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
if err := snaps.Cap(diffRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob := rawdb.ReadSnapshotGenerator(db)
@@ -493,7 +493,7 @@
}
diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer)
diskLayer.genMarker = nil // Construction finished
if err := snaps.Cap(diffTwoRoot, 0); err != nil {
if err := snaps.Cap(diffTwoRoot, 0, false); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
blob = rawdb.ReadSnapshotGenerator(db)
12 changes: 6 additions & 6 deletions core/state/snapshot/iterator_test.go
@@ -247,7 +247,7 @@ func TestAccountIteratorTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x04"), 2)
snaps.Cap(common.HexToHash("0x04"), 2, false)
verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)

it, _ = snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
@@ -295,7 +295,7 @@ func TestStorageIteratorTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x04"), 2)
snaps.Cap(common.HexToHash("0x04"), 2, false)
verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa")), verifyStorage)

it, _ = snaps.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
@@ -383,7 +383,7 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x09"), 2)
snaps.Cap(common.HexToHash("0x09"), 2, false)

it, _ = snaps.AccountIterator(common.HexToHash("0x09"), common.Hash{})
for it.Next() {
@@ -482,7 +482,7 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x09"), 2)
snaps.Cap(common.HexToHash("0x09"), 2, false)

it, _ = snaps.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
for it.Next() {
@@ -540,7 +540,7 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
aggregatorMemoryLimit = limit
}()
aggregatorMemoryLimit = 0 // Force pushing the bottom-most layer into disk
snaps.Cap(common.HexToHash("0x80"), 2)
snaps.Cap(common.HexToHash("0x80"), 2, false)

verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)

@@ -579,7 +579,7 @@ func TestAccountIteratorFlattening(t *testing.T) {
it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{})
defer it.Release()

if err := snaps.Cap(common.HexToHash("0x04"), 1); err != nil {
if err := snaps.Cap(common.HexToHash("0x04"), 1, false); err != nil {
t.Fatalf("failed to flatten snapshot stack: %v", err)
}
//verifyIterator(t, 7, it)
72 changes: 57 additions & 15 deletions core/state/snapshot/snapshot.go
@@ -148,6 +148,27 @@
StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool)
}

type Config struct {
CacheSize int // Megabytes permitted to use for read caches
Recovery bool // Indicator that the snapshot is in recovery mode
ReBuild bool // Indicator that the snapshot should be rebuilt from scratch if it cannot be loaded
AsyncBuild bool // Whether the snapshot generation is allowed to run asynchronously
AllowForceUpdate bool // Whether to force a disk-root snapshot update once the commit threshold is reached
CommitThreshold int // Number of commits after which a disk-root snapshot update is forced
}

// sanitize checks the provided user configurations and changes anything that's
// unreasonable or unworkable.
func (c *Config) sanitize() Config {
conf := *c

if conf.CommitThreshold == 0 {
log.Warn("Sanitizing commit threshold", "provided", conf.CommitThreshold, "updated", defaultCommitThreshold)
conf.CommitThreshold = defaultCommitThreshold
}
return conf
}
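As a quick illustration of the sanitize default above, a hypothetical package-internal test (not part of this PR; it assumes it lives in the snapshot package so it can reach the unexported sanitize and defaultCommitThreshold):

```go
package snapshot

import "testing"

// TestSanitizeCommitThreshold is a hypothetical test: a zero CommitThreshold
// should be replaced by defaultCommitThreshold (128) during sanitization.
func TestSanitizeCommitThreshold(t *testing.T) {
	cfg := Config{CacheSize: 256} // CommitThreshold deliberately left at zero
	if got := cfg.sanitize().CommitThreshold; got != defaultCommitThreshold {
		t.Fatalf("sanitized CommitThreshold = %d, want %d", got, defaultCommitThreshold)
	}
}
```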

// Tree is an Ethereum state snapshot tree. It consists of one persistent base
// layer backed by a key-value store, on top of which arbitrarily many in-memory
// diff layers are topped. The memory diffs can form a tree with branching, but
@@ -158,11 +179,13 @@
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
// cheap iteration of the account/storage tries for sync aid.
type Tree struct {
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
triedb *trie.Database // In-memory cache to access the trie through
cache int // Megabytes permitted to use for read caches
layers map[common.Hash]snapshot // Collection of all known layers
lock sync.RWMutex
config Config
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
triedb *trie.Database // In-memory cache to access the trie through
cache int // Megabytes permitted to use for read caches
layers map[common.Hash]snapshot // Collection of all known layers
commitCounter int // Counter for number of commits
lock sync.RWMutex
}

// New attempts to load an already existing snapshot from a persistent key-value
@@ -174,25 +197,27 @@
// store, on a background thread. If the memory layers from the journal are not
// continuous with the disk layer or the journal is missing, all diffs will be
// discarded iff it's in "recovery" mode, otherwise a rebuild is mandatory.
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, rebuild bool, recovery bool) (*Tree, error) {
func New(config Config, diskdb ethdb.KeyValueStore, triedb *trie.Database, root common.Hash) (*Tree, error) {

Check failure on line 200 in core/state/snapshot/snapshot.go (GitHub Actions / Code linters): unnecessary leading newline (whitespace)

// Create a new, empty snapshot tree
snap := &Tree{
config: config.sanitize(),
diskdb: diskdb,
triedb: triedb,
cache: cache,
cache: config.CacheSize,
layers: make(map[common.Hash]snapshot),
}
if !async {
if !config.AsyncBuild {
defer snap.waitBuild()
}
// Attempt to load a previously persisted snapshot and rebuild one if failed
head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
head, disabled, err := loadSnapshot(diskdb, triedb, config.CacheSize, root, config.Recovery)
if disabled {
log.Warn("Snapshot maintenance disabled (syncing)")
return snap, nil
}
if err != nil {
if rebuild {
if config.ReBuild {
log.Warn("Failed to load snapshot, regenerating", "err", err)
snap.Rebuild(root)
return snap, nil
@@ -355,7 +380,7 @@
// which may or may not overflow and cascade to disk. Since this last layer's
// survival is only known *after* capping, we need to omit it from the count if
// we want to ensure that *at least* the requested number of diff layers remain.
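// The force flag additionally flattens the bottom-most diff layer into the disk
// layer even while the accumulator is still below aggregatorMemoryLimit.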
func (t *Tree) Cap(root common.Hash, layers int) error {
func (t *Tree) Cap(root common.Hash, layers int, force bool) error {
// Retrieve the head snapshot to cap from
snap := t.Snapshot(root)
if snap == nil {
@@ -389,7 +414,7 @@
t.layers = map[common.Hash]snapshot{base.root: base}
return nil
}
persisted := t.cap(diff, layers)
persisted := t.cap(diff, layers, force)

// Remove any layer that is stale or links into a stale layer
children := make(map[common.Hash][]common.Hash)
@@ -439,7 +464,7 @@
// which may or may not overflow and cascade to disk. Since this last layer's
// survival is only known *after* capping, we need to omit it from the count if
// we want to ensure that *at least* the requested number of diff layers remain.
func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
func (t *Tree) cap(diff *diffLayer, layers int, force bool) *diskLayer {
// Dive until we run out of layers or reach the persistent database
for i := 0; i < layers-1; i++ {
// If we still have diff layers below, continue down
@@ -466,10 +491,11 @@
defer diff.lock.Unlock()

diff.parent = flattened
if flattened.memory < aggregatorMemoryLimit {
log.Debug("Validating snapRoot update", "limit", aggregatorMemoryLimit, "currentMemory", flattened.memory, "commitThreshold", t.config.CommitThreshold, "forceSnapshot", force)
if (flattened.memory < aggregatorMemoryLimit) && !force {
// Accumulator layer is smaller than the limit, so we can abort, unless
// there's a snapshot being generated currently. In that case, the trie
// will move fron underneath the generator so we **must** merge all the
// will move from underneath the generator so we **must** merge all the
// partial data down into the snapshot and restart the generation.
if flattened.parent.(*diskLayer).genAbort == nil {
return nil
@@ -827,3 +853,19 @@

return t.diskRoot()
}

// CompareThreshold reports whether the number of commits since the last forced
// disk-root update has exceeded the configured threshold. It returns false when
// AllowForceUpdate is disabled and resets the counter once the threshold is hit.
func (t *Tree) CompareThreshold() bool {
if !t.config.AllowForceUpdate {
return false
}
log.Debug("Snapshot Commit counters", "counter", t.commitCounter, "threshold", t.config.CommitThreshold)
if t.commitCounter > t.config.CommitThreshold {
t.commitCounter = 0
return true
}

t.commitCounter++

return false
}
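To show how the two new knobs fit together, a hedged caller-side sketch (hypothetical, not part of this diff): the tree is asked whether the commit threshold has been crossed, and the answer is passed to Cap's new force parameter so the disk root is flattened even while the accumulator layer sits below aggregatorMemoryLimit.

```go
package main

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state/snapshot"
	"github.com/ethereum/go-ethereum/log"
)

// capWithForce is a hypothetical helper illustrating the intended call pattern;
// the real wiring inside the blockchain is outside the scope of this diff.
func capWithForce(snaps *snapshot.Tree, headRoot common.Hash, layers int) error {
	// True only if AllowForceUpdate is set and commitCounter has passed CommitThreshold.
	force := snaps.CompareThreshold()
	if err := snaps.Cap(headRoot, layers, force); err != nil {
		log.Error("Failed to cap snapshot tree", "root", headRoot, "err", err)
		return err
	}
	return nil
}

func main() {} // placeholder so the sketch compiles as a standalone file
```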