Skip to content

Commit

Permalink
eth: support bubbling up bad blocks from sync to the engine API (ethe…
Browse files Browse the repository at this point in the history
…reum#25190)

* eth: support bubbling up bad blocks from sync to the engine API

* eth/catalyst: fix typo

Co-authored-by: Marius van der Wijden <[email protected]>

* eth/catalyst: fix typo

Co-authored-by: Marius van der Wijden <[email protected]>

* Update eth/catalyst/api.go

* eth/catalyst: when forgetting bad hashes, also forget descendants

* eth/catalyst: minor bad block tweaks for resilience

Co-authored-by: Marius van der Wijden <[email protected]>
Co-authored-by: Martin Holst Swende <[email protected]>
  • Loading branch information
3 people authored and jagdeep sidhu committed Aug 6, 2022
1 parent 0f9fff3 commit f5f544f
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 17 deletions.
2 changes: 1 addition & 1 deletion core/beacon/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type payloadAttributesMarshaling struct {

//go:generate go run github.com/fjl/gencodec -type ExecutableDataV1 -field-override executableDataMarshaling -out gen_ed.go

// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/src/engine/specification.md
// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/tree/main/src/engine/specification.md
type ExecutableDataV1 struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"`
Expand Down
139 changes: 127 additions & 12 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,47 @@ func Register(stack *node.Node, backend *eth.Ethereum) error {
return nil
}

const (
// invalidBlockHitEviction is the number of times an invalid block can be
// referenced in forkchoice update or new payload before it is attempted
// to be reprocessed again.
invalidBlockHitEviction = 128

// invalidTipsetsCap is the max number of recent block hashes tracked that
// have lead to some bad ancestor block. It's just an OOM protection.
invalidTipsetsCap = 512
)

type ConsensusAPI struct {
eth *eth.Ethereum
eth *eth.Ethereum

remoteBlocks *headerQueue // Cache of remote payloads received
localBlocks *payloadQueue // Cache of local payloads generated
// Lock for the forkChoiceUpdated method
forkChoiceLock sync.Mutex

// The forkchoice update and new payload method require us to return the
// latest valid hash in an invalid chain. To support that return, we need
// to track historical bad blocks as well as bad tipsets in case a chain
// is constantly built on it.
//
// There are a few important caveats in this mechanism:
// - The bad block tracking is ephemeral, in-memory only. We must never
// persist any bad block information to disk as a bug in Geth could end
// up blocking a valid chain, even if a later Geth update would accept
// it.
// - Bad blocks will get forgotten after a certain threshold of import
// attempts and will be retried. The rationale is that if the network
// really-really-really tries to feed us a block, we should give it a
// new chance, perhaps us being racey instead of the block being legit
// bad (this happened in Geth at a point with import vs. pending race).
// - Tracking all the blocks built on top of the bad one could be a bit
// problematic, so we will only track the head chain segment of a bad
// chain to allow discarding progressing bad chains and side chains,
// without tracking too much bad data.
invalidBlocksHits map[common.Hash]int // Emhemeral cache to track invalid blocks and their hit count
invalidTipsets map[common.Hash]*types.Header // Ephemeral cache to track invalid tipsets and their bad ancestor
invalidLock sync.Mutex // Protects the invalid maps from concurrent access

forkChoiceLock sync.Mutex // Lock for the forkChoiceUpdated method
}

// NewConsensusAPI creates a new consensus api for the given backend.
Expand All @@ -64,11 +99,16 @@ func NewConsensusAPI(eth *eth.Ethereum) *ConsensusAPI {
if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
log.Warn("Engine API started but chain not configured for merge yet")
}
return &ConsensusAPI{
eth: eth,
remoteBlocks: newHeaderQueue(),
localBlocks: newPayloadQueue(),
api := &ConsensusAPI{
eth: eth,
remoteBlocks: newHeaderQueue(),
localBlocks: newPayloadQueue(),
invalidBlocksHits: make(map[common.Hash]int),
invalidTipsets: make(map[common.Hash]*types.Header),
}
eth.Downloader().SetBadBlockCallback(api.setInvalidAncestor)

return api
}

// ForkchoiceUpdatedV1 has several responsibilities:
Expand Down Expand Up @@ -96,6 +136,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
// reason.
block := api.eth.BlockChain().GetBlockByHash(update.HeadBlockHash)
if block == nil {
// If this block was previously invalidated, keep rejecting it here too
if res := api.checkInvalidAncestor(update.HeadBlockHash, update.HeadBlockHash); res != nil {
return beacon.ForkChoiceResponse{PayloadStatus: *res, PayloadID: nil}, nil
}
// If the head hash is unknown (was not given to us in a newPayload request),
// we cannot resolve the header, so not much to do. This could be extended in
// the future to resolve from the `eth` network, but it's an unexpected case
Expand Down Expand Up @@ -266,6 +310,10 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
hash := block.Hash()
return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: &hash}, nil
}
// If this block was rejected previously, keep rejecting it
if res := api.checkInvalidAncestor(block.Hash(), block.Hash()); res != nil {
return *res, nil
}
// If the parent is missing, we - in theory - could trigger a sync, but that
// would also entail a reorg. That is problematic if multiple sibling blocks
// are being fed to us, and even more so, if some semi-distant uncle shortens
Expand Down Expand Up @@ -293,7 +341,7 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
}
if block.Time() <= parent.Time() {
log.Warn("Invalid timestamp", "parent", block.Time(), "block", block.Time())
return api.invalid(errors.New("invalid timestamp"), parent), nil
return api.invalid(errors.New("invalid timestamp"), parent.Header()), nil
}
// Another cornercase: if the node is in snap sync mode, but the CL client
// tries to make it import a block. That should be denied as pushing something
Expand All @@ -310,7 +358,13 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number)
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
log.Warn("NewPayloadV1: inserting block failed", "error", err)
return api.invalid(err, parent), nil

api.invalidLock.Lock()
api.invalidBlocksHits[block.Hash()] = 1
api.invalidTipsets[block.Hash()] = block.Header()
api.invalidLock.Unlock()

return api.invalid(err, parent.Header()), nil
}
// We've accepted a valid payload from the beacon client. Mark the local
// chain transitions to notify other subsystems (e.g. downloader) of the
Expand Down Expand Up @@ -339,8 +393,13 @@ func computePayloadId(headBlockHash common.Hash, params *beacon.PayloadAttribute
// delayPayloadImport stashes the given block away for import at a later time,
// either via a forkchoice update or a sync extension. This method is meant to
// be called by the newpayload command when the block seems to be ok, but some
// prerequisite prevents it from being processed (e.g. no parent, or nap sync).
// prerequisite prevents it from being processed (e.g. no parent, or snap sync).
func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (beacon.PayloadStatusV1, error) {
// Sanity check that this block's parent is not on a previously invalidated
// chain. If it is, mark the block as invalid too.
if res := api.checkInvalidAncestor(block.ParentHash(), block.Hash()); res != nil {
return *res, nil
}
// Stash the block away for a potential forced forkchoice update to it
// at a later time.
api.remoteBlocks.put(block.Hash(), block.Header())
Expand All @@ -360,14 +419,70 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (beacon.PayloadS
return beacon.PayloadStatusV1{Status: beacon.ACCEPTED}, nil
}

// setInvalidAncestor is a callback for the downloader to notify us if a bad block
// is encountered during the async sync.
func (api *ConsensusAPI) setInvalidAncestor(invalid *types.Header, origin *types.Header) {
api.invalidLock.Lock()
defer api.invalidLock.Unlock()

api.invalidTipsets[origin.Hash()] = invalid
api.invalidBlocksHits[invalid.Hash()]++
}

// checkInvalidAncestor checks whether the specified chain end links to a known
// bad ancestor. If yes, it constructs the payload failure response to return.
func (api *ConsensusAPI) checkInvalidAncestor(check common.Hash, head common.Hash) *beacon.PayloadStatusV1 {
api.invalidLock.Lock()
defer api.invalidLock.Unlock()

// If the hash to check is unknown, return valid
invalid, ok := api.invalidTipsets[check]
if !ok {
return nil
}
// If the bad hash was hit too many times, evict it and try to reprocess in
// the hopes that we have a data race that we can exit out of.
badHash := invalid.Hash()

api.invalidBlocksHits[badHash]++
if api.invalidBlocksHits[badHash] >= invalidBlockHitEviction {
log.Warn("Too many bad block import attempt, trying", "number", invalid.Number, "hash", badHash)
delete(api.invalidBlocksHits, badHash)

for descendant, badHeader := range api.invalidTipsets {
if badHeader.Hash() == badHash {
delete(api.invalidTipsets, descendant)
}
}
return nil
}
// Not too many failures yet, mark the head of the invalid chain as invalid
if check != head {
log.Warn("Marked new chain head as invalid", "hash", head, "badnumber", invalid.Number, "badhash", badHash)
for len(api.invalidTipsets) >= invalidTipsetsCap {
for key := range api.invalidTipsets {
delete(api.invalidTipsets, key)
break
}
}
api.invalidTipsets[head] = invalid
}
failure := "links to previously rejected block"
return &beacon.PayloadStatusV1{
Status: beacon.INVALID,
LatestValidHash: &invalid.ParentHash,
ValidationError: &failure,
}
}

// invalid returns a response "INVALID" with the latest valid hash supplied by latest or to the current head
// if no latestValid block was provided.
func (api *ConsensusAPI) invalid(err error, latestValid *types.Block) beacon.PayloadStatusV1 {
func (api *ConsensusAPI) invalid(err error, latestValid *types.Header) beacon.PayloadStatusV1 {
currentHash := api.eth.BlockChain().CurrentBlock().Hash()
if latestValid != nil {
// Set latest valid hash to 0x0 if parent is PoW block
currentHash = common.Hash{}
if latestValid.Difficulty().BitLen() == 0 {
if latestValid.Difficulty.BitLen() == 0 {
// Otherwise set latest valid hash to parent hash
currentHash = latestValid.Hash()
}
Expand Down
6 changes: 3 additions & 3 deletions eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,16 +773,16 @@ func TestTrickRemoteBlockCache(t *testing.T) {
if err != nil {
panic(err)
}
if status.Status == beacon.INVALID {
panic("success")
if status.Status == beacon.VALID {
t.Error("invalid status: VALID on an invalid chain")
}
// Now reorg to the head of the invalid chain
resp, err := apiB.ForkchoiceUpdatedV1(beacon.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil)
if err != nil {
t.Fatal(err)
}
if resp.PayloadStatus.Status == beacon.VALID {
t.Errorf("invalid status: expected INVALID got: %v", resp.PayloadStatus.Status)
t.Error("invalid status: VALID on an invalid chain")
}
time.Sleep(100 * time.Millisecond)
}
Expand Down
7 changes: 7 additions & 0 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ func (b *beaconBackfiller) setMode(mode SyncMode) {
b.resume()
}

// SetBadBlockCallback sets the callback to run when a bad block is hit by the
// block processor. This method is not thread safe and should be set only once
// on startup before system events are fired.
func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
d.badBlock = onBadBlock
}

// BeaconSync is the post-merge version of the chain synchronization, where the
// chain is not downloaded from genesis onward, rather from trusted head announces
// backwards.
Expand Down
17 changes: 16 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// badBlockFn is a callback for the async beacon sync to notify the caller that
// the origin header requested to sync to, produced a chain with a bad block.
type badBlockFn func(invalid *types.Header, origin *types.Header)

// headerTask is a set of downloaded headers to queue along with their precomputed
// hashes to avoid constant rehashing.
type headerTask struct {
Expand Down Expand Up @@ -113,6 +117,7 @@ type Downloader struct {

// Callbacks
dropPeer peerDropFn // Drops a peer for misbehaving
badBlock badBlockFn // Reports a block as rejected by the chain

// Status
// SYSCOIN
Expand Down Expand Up @@ -1538,7 +1543,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
return errCancelContentProcessing
default:
}
// Retrieve the a batch of results to import
// Retrieve a batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
Expand All @@ -1554,6 +1559,16 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
if index, err := d.blockchain.InsertChain(blocks); err != nil {
if index < len(results) {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)

// In post-merge, notify the engine API of encountered bad chains
if d.badBlock != nil {
head, _, err := d.skeleton.Bounds()
if err != nil {
log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err)
} else {
d.badBlock(blocks[index].Header(), head)
}
}
} else {
// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
// when it needs to preprocess blocks to import a sidechain.
Expand Down

0 comments on commit f5f544f

Please sign in to comment.