Skip to content

Commit

Permalink
Statediff for full node (#6)
Browse files Browse the repository at this point in the history
* Open a trie from the in-memory database

* Use a node's LeafKey as an identifier instead of the address

It was proving difficult to find look the address up from a given path
with a full node (sometimes the value wouldn't exist in the disk db).
So, instead, for now we are using the node's LeafKey with is a Keccak256
hash of the address, so if we know the address we can figure out which
LeafKey it matches up to.

* Make sure that statediff has been processed before pruning

* Use blockchain stateCache.OpenTrie for storage diffs

* Clean up log lines and remove unnecessary fields from builder

* Apply go fmt changes

* Add a sleep to the blockchain test

* Address PR comments

* Address PR comments
  • Loading branch information
elizabethengelman authored and i-norden committed Apr 19, 2019
1 parent 3c2c96a commit 3dd8767
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 97 deletions.
9 changes: 6 additions & 3 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,14 @@ func makeFullNode(ctx *cli.Context) *node.Node {
if ctx.GlobalIsSet(utils.ConstantinopleOverrideFlag.Name) {
cfg.Eth.ConstantinopleOverride = new(big.Int).SetUint64(ctx.GlobalUint64(utils.ConstantinopleOverrideFlag.Name))
}

utils.RegisterEthService(stack, &cfg.Eth)

if ctx.GlobalBool(utils.StateDiffFlag.Name) {
cfg.Eth.StateDiff = true
utils.RegisterStateDiffService(stack, ctx)
}

if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
}
Expand Down Expand Up @@ -190,9 +196,6 @@ func makeFullNode(ctx *cli.Context) *node.Node {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
}

if ctx.GlobalBool(utils.StateDiffFlag.Name) {
utils.RegisterStateDiffService(stack, ctx)
}
return stack
}

Expand Down
60 changes: 44 additions & 16 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type CacheConfig struct {
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
ProcessingStateDiffs bool // Whether statediffs processing should be taken into a account before a trie is pruned
}

// BlockChain represents the canonical chain given a database with a genesis
Expand Down Expand Up @@ -156,6 +157,8 @@ type BlockChain struct {

badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.

stateDiffsProcessed map[common.Hash]int
}

// NewBlockChain returns a fully initialised block chain using information
Expand All @@ -175,23 +178,24 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
blockCache, _ := lru.New(blockCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)

stateDiffsProcessed := make(map[common.Hash]int)
bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
stateDiffsProcessed: stateDiffsProcessed,
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
Expand Down Expand Up @@ -932,6 +936,11 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
return nil
}

func (bc *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {
count := bc.stateDiffsProcessed[hash]
bc.stateDiffsProcessed[hash] = count + 1
}

// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.chainmu.Lock()
Expand Down Expand Up @@ -1016,6 +1025,16 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.triegc.Push(root, number)
break
}

if bc.cacheConfig.ProcessingStateDiffs {
if !bc.allowedRootToBeDereferenced(root.(common.Hash)) {
bc.triegc.Push(root, number)
break
} else {
delete(bc.stateDiffsProcessed, root.(common.Hash))
}
}

triedb.Dereference(root.(common.Hash))
}
}
Expand Down Expand Up @@ -1070,6 +1089,15 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
return status, nil
}

// since we need the state tries of the current block and its parent in-memory
// in order to process statediffs, we should avoid dereferencing roots until
// its statediff and its child have been processed
func (bc *BlockChain) allowedRootToBeDereferenced(root common.Hash) bool {
diffProcessedForSelfAndChildCount := 2
count := bc.stateDiffsProcessed[root]
return count >= diffProcessedForSelfAndChildCount
}

// addFutureBlock checks if the block is within the max allowed window to get
// accepted for future processing, and returns an error if the block is too far
// ahead and was not added.
Expand Down
81 changes: 81 additions & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,3 +1810,84 @@ func TestPrunedImportSide(t *testing.T) {
testSideImport(t, 1, 10)
testSideImport(t, 1, -10)
}

func TestProcessingStateDiffs(t *testing.T) {
defaultTrieCleanCache := 256
defaultTrieDirtyCache := 256
defaultTrieTimeout := 60 * time.Minute
cacheConfig := &CacheConfig{
Disabled: false,
TrieCleanLimit: defaultTrieCleanCache,
TrieDirtyLimit: defaultTrieDirtyCache,
TrieTimeLimit: defaultTrieTimeout,
ProcessingStateDiffs: true,
}
db := ethdb.NewMemDatabase()
genesis := new(Genesis).MustCommit(db)
numberOfBlocks := triesInMemory
engine := ethash.NewFaker()
blockchain, _ := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil)
blocks := makeBlockChain(genesis, numberOfBlocks+1, engine, db, canonicalSeed)
_, err := blockchain.InsertChain(blocks)
if err != nil {
t.Fatalf("failed to create pristine chain: %v", err)
}
defer blockchain.Stop()

//when adding a root hash to the collection, it will increment the count
firstStateRoot := blocks[0].Root()
blockchain.AddToStateDiffProcessedCollection(firstStateRoot)
value, ok := blockchain.stateDiffsProcessed[firstStateRoot]
if !ok {
t.Error("state root not found in collection")
}
if value != 1 {
t.Error("state root count not correct", "want", 1, "got", value)
}

blockchain.AddToStateDiffProcessedCollection(firstStateRoot)
value, ok = blockchain.stateDiffsProcessed[firstStateRoot]
if !ok {
t.Error("state root not found in collection")
}
if value != 2 {
t.Error("state root count not correct", "want", 2, "got", value)
}

moreBlocks := makeBlockChain(blocks[len(blocks)-1], 1, engine, db, canonicalSeed)
_, err = blockchain.InsertChain(moreBlocks)

//a root hash can be dereferenced when it's state diff and it's child's state diff have been processed
//(i.e. it has a count of 2 in stateDiffsProcessed)
nodes := blockchain.stateCache.TrieDB().Nodes()
if containsRootHash(nodes, firstStateRoot) {
t.Errorf("stateRoot %s in nodes, want: %t, got: %t", firstStateRoot.Hex(), false, true)
}

//a root hash should still be in the in-mem db if it's child's state diff hasn't yet been processed
//(i.e. it has a count of 1 stateDiffsProcessed)
secondStateRoot := blocks[1].Root()
blockchain.AddToStateDiffProcessedCollection(secondStateRoot)
if !containsRootHash(nodes, secondStateRoot) {
t.Errorf("stateRoot %s in nodes, want: %t, got: %t", secondStateRoot.Hex(), true, false)
}

//the stateDiffsProcessed collection is cleaned up once a hash has been dereferenced
_, ok = blockchain.stateDiffsProcessed[firstStateRoot]
if ok {
t.Errorf("stateRoot %s in stateDiffsProcessed collection, want: %t, got: %t",
firstStateRoot.Hex(),
false,
ok,
)
}
}

func containsRootHash(collection []common.Hash, hash common.Hash) bool {
for _, n := range collection {
if n == hash {
return true
}
}
return false
}
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
ProcessingStateDiffs: config.StateDiff,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
Expand Down
4 changes: 4 additions & 0 deletions eth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var DefaultConfig = Config{
Blocks: 20,
Percentile: 60,
},

StateDiff: false,
}

func init() {
Expand Down Expand Up @@ -154,6 +156,8 @@ type Config struct {

// RPCGasCap is the global gas cap for eth-call variants.
RPCGasCap *big.Int `toml:",omitempty"`

StateDiff bool
}

type configMarshaling struct {
Expand Down
66 changes: 33 additions & 33 deletions statediff/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package builder
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -35,25 +36,27 @@ type Builder interface {

type builder struct {
chainDB ethdb.Database
trieDB *trie.Database
cachedTrie *trie.Trie
blockChain *core.BlockChain
}

func NewBuilder(db ethdb.Database) *builder {
type AccountsMap map[common.Hash]*state.Account

func NewBuilder(db ethdb.Database, blockChain *core.BlockChain) *builder {
return &builder{
chainDB: db,
trieDB: trie.NewDatabase(db),
chainDB: db,
blockChain: blockChain,
}
}

func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, blockNumber int64, blockHash common.Hash) (*StateDiff, error) {
// Generate tries for old and new states
oldTrie, err := trie.New(oldStateRoot, sdb.trieDB)
stateCache := sdb.blockChain.StateCache()
oldTrie, err := stateCache.OpenTrie(oldStateRoot)
if err != nil {
log.Error("Error creating trie for oldStateRoot", "error", err)
return nil, err
}
newTrie, err := trie.New(newStateRoot, sdb.trieDB)
newTrie, err := stateCache.OpenTrie(newStateRoot)
if err != nil {
log.Error("Error creating trie for newStateRoot", "error", err)
return nil, err
Expand Down Expand Up @@ -108,33 +111,27 @@ func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, block
}, nil
}

func (sdb *builder) collectDiffNodes(a, b trie.NodeIterator) (map[common.Address]*state.Account, error) {
var diffAccounts = make(map[common.Address]*state.Account)
func (sdb *builder) collectDiffNodes(a, b trie.NodeIterator) (AccountsMap, error) {
var diffAccounts = make(AccountsMap)
it, _ := trie.NewDifferenceIterator(a, b)

for {
log.Debug("Current Path and Hash", "path", pathToStr(it), "hashold", it.Hash())
if it.Leaf() {

// lookup address
path := make([]byte, len(it.Path())-1)
copy(path, it.Path())
addr, err := sdb.addressByPath(path)
if err != nil {
log.Error("Error looking up address via path", "path", path, "error", err)
return nil, err
}
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
leafKeyHash := common.BytesToHash(leafKey)

// lookup account state
var account state.Account
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
log.Error("Error looking up account via address", "address", addr, "error", err)
log.Error("Error looking up account via address", "address", leafKeyHash, "error", err)
return nil, err
}

// record account to diffs (creation if we are looking at new - old; deletion if old - new)
log.Debug("Account lookup successful", "address", addr, "account", account)
diffAccounts[*addr] = &account
log.Debug("Account lookup successful", "address", leafKeyHash, "account", account)
diffAccounts[leafKeyHash] = &account
}
cont := it.Next(true)
if !cont {
Expand All @@ -145,8 +142,8 @@ func (sdb *builder) collectDiffNodes(a, b trie.NodeIterator) (map[common.Address
return diffAccounts, nil
}

func (sdb *builder) buildDiffEventual(accounts map[common.Address]*state.Account) (map[common.Address]AccountDiff, error) {
accountDiffs := make(map[common.Address]AccountDiff)
func (sdb *builder) buildDiffEventual(accounts AccountsMap) (AccountDiffsMap, error) {
accountDiffs := make(AccountDiffsMap)
for addr, val := range accounts {
sr := val.Root
storageDiffs, err := sdb.buildStorageDiffsEventual(sr)
Expand All @@ -172,11 +169,11 @@ func (sdb *builder) buildDiffEventual(accounts map[common.Address]*state.Account
return accountDiffs, nil
}

func (sdb *builder) buildDiffIncremental(creations map[common.Address]*state.Account, deletions map[common.Address]*state.Account, updatedKeys []string) (map[common.Address]AccountDiff, error) {
updatedAccounts := make(map[common.Address]AccountDiff)
func (sdb *builder) buildDiffIncremental(creations AccountsMap, deletions AccountsMap, updatedKeys []string) (AccountDiffsMap, error) {
updatedAccounts := make(AccountDiffsMap)
for _, val := range updatedKeys {
createdAcc := creations[common.HexToAddress(val)]
deletedAcc := deletions[common.HexToAddress(val)]
createdAcc := creations[common.HexToHash(val)]
deletedAcc := deletions[common.HexToHash(val)]
oldSR := deletedAcc.Root
newSR := createdAcc.Root
if storageDiffs, err := sdb.buildStorageDiffsIncremental(oldSR, newSR); err != nil {
Expand All @@ -190,23 +187,24 @@ func (sdb *builder) buildDiffIncremental(creations map[common.Address]*state.Acc
nHexRoot := createdAcc.Root.Hex()
contractRoot := DiffString{Value: &nHexRoot}

updatedAccounts[common.HexToAddress(val)] = AccountDiff{
updatedAccounts[common.HexToHash(val)] = AccountDiff{
Nonce: nonce,
Balance: balance,
CodeHash: codeHash,
ContractRoot: contractRoot,
Storage: storageDiffs,
}
delete(creations, common.HexToAddress(val))
delete(deletions, common.HexToAddress(val))
delete(creations, common.HexToHash(val))
delete(deletions, common.HexToHash(val))
}
}
return updatedAccounts, nil
}

func (sdb *builder) buildStorageDiffsEventual(sr common.Hash) (map[string]DiffStorage, error) {
log.Debug("Storage Root For Eventual Diff", "root", sr.Hex())
sTrie, err := trie.New(sr, sdb.trieDB)
stateCache := sdb.blockChain.StateCache()
sTrie, err := stateCache.OpenTrie(sr)
if err != nil {
log.Info("error in build storage diff eventual", "error", err)
return nil, err
Expand All @@ -218,11 +216,13 @@ func (sdb *builder) buildStorageDiffsEventual(sr common.Hash) (map[string]DiffSt

func (sdb *builder) buildStorageDiffsIncremental(oldSR common.Hash, newSR common.Hash) (map[string]DiffStorage, error) {
log.Debug("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex())
oldTrie, err := trie.New(oldSR, sdb.trieDB)
stateCache := sdb.blockChain.StateCache()

oldTrie, err := stateCache.OpenTrie(oldSR)
if err != nil {
return nil, err
}
newTrie, err := trie.New(newSR, sdb.trieDB)
newTrie, err := stateCache.OpenTrie(newSR)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 3dd8767

Please sign in to comment.