Skip to content

Commit

Permalink
core: write chain data in atomic way
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinod Damle committed Jan 22, 2021
1 parent a483a6d commit 8d128e8
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 96 deletions.
146 changes: 89 additions & 57 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())

// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of SetHead is from high
// to low, so it's safe the update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
Expand All @@ -443,6 +448,11 @@ func (bc *BlockChain) SetHead(head uint64) error {
newHeadFastBlock = bc.genesisBlock
}
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())

// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of SetHead is from high
// to low, so it's safe the update in-memory markers directly.
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
Expand Down Expand Up @@ -578,21 +588,23 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
defer bc.chainmu.Unlock()

// Prepare the genesis block and reinitialise the chain
if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
log.Crit("Failed to write genesis block TD", "err", err)
batch := bc.db.NewBatch()

rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
rawdb.WriteBlock(batch, genesis)
if err := batch.Write(); err != nil {
log.Crit("Failed to write genesis block", "err", err)
}
rawdb.WriteBlock(bc.db, genesis)
bc.writeHeadBlock(genesis)

// Last update all in-memory chain markers
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))

bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))

return nil
}

Expand Down Expand Up @@ -650,31 +662,39 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return nil
}

// insert injects a new head block into the current block chain. This method
// writeHeadBlock injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// If the block is on a side chain or an unknown one, force other heads onto it too
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()

// Add the block to the canonical chain number scheme and mark as the head
rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(bc.db, block.Hash())

bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
batch := bc.db.NewBatch()
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteTxLookupEntries(batch, block)
rawdb.WriteHeadBlockHash(batch, block.Hash())

// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())

bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
rawdb.WriteHeadHeaderHash(batch, block.Hash())
rawdb.WriteHeadFastBlockHash(batch, block.Hash())
}
// Flush the whole batch into the disk, exit the node if failed
if err := batch.Write(); err != nil {
log.Crit("Failed to update chain indexes and markers", "err", err)
}
// Update all in-memory chain markers in the last step
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
}

// Genesis retrieves the chain's genesis block.
Expand Down Expand Up @@ -920,26 +940,36 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

batch := bc.db.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]

// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of rollback is from high
// to low, so it's safe the update in-memory markers directly.
currentHeader := bc.hc.CurrentHeader()
if currentHeader.Hash() == hash {
bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash)
bc.hc.SetCurrentHeader(newHeadHeader)
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash())
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to rollback chain markers", "err", err)
}
// Truncate ancient data which exceeds the current header.
//
// Notably, it can happen that system crashes without truncating the ancient data
Expand Down Expand Up @@ -1102,7 +1132,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
// Don't collect too much in-memory, write it out every 100K blocks
if len(deleted) > 100000 {

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
Expand Down Expand Up @@ -1211,15 +1240,21 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
rawdb.WriteTxLookupEntries(batch, block)

stats.processed++
// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
}
size += batch.ValueSize()
batch.Reset()
}
stats.processed++
}
// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() > 0 {
size += batch.ValueSize()
if err := batch.Write(); err != nil {
Expand Down Expand Up @@ -1270,11 +1305,12 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
bc.wg.Add(1)
defer bc.wg.Done()

if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
return err
}
rawdb.WriteBlock(bc.db, block)

batch := bc.db.NewBatch()
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
rawdb.WriteBlock(batch, block)
if err := batch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
return nil
}

Expand All @@ -1290,11 +1326,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
return err
}
}
// Write the positional metadata for transaction/receipt lookups.
// Preimages here is empty, ignore it.
rawdb.WriteTxLookupEntries(bc.db, block)

bc.insert(block)
bc.writeHeadBlock(block)
return nil
}

Expand Down Expand Up @@ -1368,11 +1400,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
externTd := new(big.Int).Add(block.Difficulty(), ptd)

// Irrelevant of the canonical status, write the block itself to the database
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
return NonStatTy, err
}
rawdb.WriteBlock(bc.db, block)

//
// Note all the components of block(td, hash->number map, header, body, receipts)
// should be written atomically. BlockBatch is used for containing all components.
blockBatch := bc.db.NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
rawdb.WritePreimages(blockBatch, state.Preimages())
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}

// Commit all cached state changes into underlying memory database.
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))

if err != nil {
Expand Down Expand Up @@ -1433,11 +1473,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
}
}

// Write other block data using a batch.
batch := bc.db.NewBatch()
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)

// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
Expand All @@ -1463,10 +1498,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
}
// Write the positional metadata for transaction/receipt lookups and preimages
rawdb.WriteTxLookupEntries(batch, block)
rawdb.WritePreimages(batch, state.Preimages())

status = CanonStatTy
} else {
status = SideStatTy
Expand All @@ -1477,7 +1508,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

// Set new head.
if status == CanonStatTy {
bc.insert(block)
bc.writeHeadBlock(block)
}
bc.futureBlocks.Remove(block.Hash())
return status, nil
Expand Down Expand Up @@ -2095,20 +2126,19 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// taking care of the proper incremental order.
for i := len(newChain) - 1; i >= 1; i-- {
// Insert the block in the canonical way, re-writing history
bc.insert(newChain[i])
bc.writeHeadBlock(newChain[i])

// Collect reborn logs due to chain reorg
collectLogs(newChain[i].Hash(), false)

// Write lookup entries for hash based transaction/receipt searches
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
}
// When transactions get deleted from the database, the receipts that were
// created in the fork must also be deleted
batch := bc.db.NewBatch()
// Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head.
indexesBatch := bc.db.NewBatch()
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(batch, tx.Hash())
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
}
// Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64()
Expand All @@ -2117,9 +2147,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(batch, i)
rawdb.DeleteCanonicalHash(indexesBatch, i)
}
if err := indexesBatch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}
batch.Write()

// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
Expand Down
Loading

0 comments on commit 8d128e8

Please sign in to comment.