Skip to content

Commit

Permalink
Borevent snapshot validation (#9436)
Browse files Browse the repository at this point in the history
This adds integrity checking for borevents in the following places

1. Stage Bor Heimdall - check that the event occurs withing the expected
time window
2. Dump Events to snapshots - check that the event ids are continuous in
and between blocks (will add a time window tests)
3. Index event snapshots - check that the event ids are continuous in
and between blocks

It also adds an external integrity checker which runs on snapshots
checking for continuity and timeliness of events. This can be called
using the following command:
`erigon snapshots integrity --datadir=~/snapshots/bor-mainnet-patch-1
--from=45500000`
(--to specifies an end block)

This also now fixes the long running issue with bor events causing
unexpected fails in executions:

the problem event validation uncovered was a follows:

The **kv.BorEventNums** mapping table currently keeps the mapping first
event id->block. The code which produces bor-event snapshots to
determine which events to put into the snapshot.

however if no additional blocks have events by the time the block is
stored in the snapshot, the snapshot creation code does not know which
events to include - so drops them.

This causes problems in two places:

* RPC queries & re-execution from snapshots can't find these dropped
events
* Depending on purge timing these events may erroneously get inserted
into future blocks
  
 The code change in this PR fixes that bug.
 
It has been tested by running:

```
erigon --datadir=~/chains/e3/amoy --chain=amoy --bor.heimdall=https://heimdall-api-amoy.polygon.technology --db.writemap=false --txpool.disable --no-downloader --bor.milestone=false
```

with validation in place and the confimed by running the following:

```
erigon snapshots rm-all-state-snapshots --datadir=~/chains/e3/amoy
rm ~/chains/e3/amoy/chaindata/
erigon --datadir=~/chains/e3/amoy --chain=amoy --bor-heimdall=https://heimdall-api-amoy.polygon.technology --db.writemap=false --no-downloader --bor.milestone=false
```
To recreate the chain from snapshots.

It has also been checked with:

```
erigon snapshots integrity --datadir=~/chains/e3/amoy --check=NoBorEventGaps --failFast=true"
```

---------

Co-authored-by: alex.sharov <[email protected]>
Co-authored-by: Mark Holt <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2024
1 parent 48544c9 commit e4eb9fc
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 103 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
build/_vendor/pkg
/*.a
docs/readthedocs/build
/.VSCodeCounter/

/.VSCodeCounter
#*
.#*
*#
Expand Down
3 changes: 2 additions & 1 deletion eth/integrity/integrity_action_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ const (
BlocksTxnID Check = "BlocksTxnID"
InvertedIndex Check = "InvertedIndex"
HistoryNoSystemTxs Check = "HistoryNoSystemTxs"
NoBorEventGaps Check = "NoBorEventGaps"
)

var AllChecks = []Check{
Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs,
Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs, NoBorEventGaps,
}
130 changes: 130 additions & 0 deletions eth/integrity/no_gaps_bor_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package integrity

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/erigontech/erigon-lib/chain"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/core"
"github.com/erigontech/erigon/eth/stagedsync/stages"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/turbo/services"
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
)

func NoGapsInBorEvents(ctx context.Context, db kv.RoDB, blockReader services.FullBlockReader, from, to uint64, failFast bool) (err error) {
defer func() {
log.Info("[integrity] NoGapsInBorEvents: done", "err", err)
}()

var cc *chain.Config

if db == nil {
genesis := core.BorMainnetGenesisBlock()
cc = genesis.Config
} else {
err = db.View(ctx, func(tx kv.Tx) error {
cc, err = chain.GetConfig(tx, nil)
if err != nil {
return err
}
return nil
})

if err != nil {
err = fmt.Errorf("cant read chain config from db: %w", err)
return err
}
}

if cc.BorJSON == nil {
return err
}

config := &borcfg.BorConfig{}

if err := json.Unmarshal(cc.BorJSON, config); err != nil {
err = fmt.Errorf("invalid chain config 'bor' JSON: %w", err)
return err
}

logEvery := time.NewTicker(10 * time.Second)
defer logEvery.Stop()

snapshots := blockReader.BorSnapshots().(*freezeblocks.BorRoSnapshots)

var prevEventId uint64
var maxBlockNum uint64

if to > 0 {
maxBlockNum = to
} else {
maxBlockNum = snapshots.SegmentsMax()
}

view := snapshots.View()
defer view.Close()

for _, eventSegment := range view.Events() {

if from > 0 && eventSegment.From() < from {
continue
}

if to > 0 && eventSegment.From() > to {
break
}

prevEventId, err = freezeblocks.ValidateBorEvents(ctx, config, db, blockReader, eventSegment, prevEventId, maxBlockNum, failFast, logEvery)

if err != nil && failFast {
return err
}
}

if db != nil {
err = db.View(ctx, func(tx kv.Tx) error {
if false {
lastEventId, _, err := blockReader.LastEventId(ctx, tx)

if err != nil {
return err
}

borHeimdallProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall)

if err != nil {
return err
}

bodyProgress, err := stages.GetStageProgress(tx, stages.Bodies)

if err != nil {
return err
}

log.Info("[integrity] LAST Event", "event", lastEventId, "bor-progress", borHeimdallProgress, "body-progress", bodyProgress)

if bodyProgress > borHeimdallProgress {
for blockNum := maxBlockNum + 1; blockNum <= bodyProgress; blockNum++ {

}
}
}

return nil
})

if err != nil {
return err
}
}

log.Info("[integrity] done checking bor events", "event", prevEventId)

return nil
}
6 changes: 3 additions & 3 deletions eth/integrity/no_gaps_in_canonical_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,17 @@ func NoGapsInCanonicalHeaders(tx kv.Tx, ctx context.Context, br services.FullBlo
panic(err)
}
if hash == (common.Hash{}) {
err = fmt.Errorf("canonical marker not found: %d\n", i)
err = fmt.Errorf("canonical marker not found: %d", i)
panic(err)
}
header := rawdb.ReadHeader(tx, hash, i)
if header == nil {
err = fmt.Errorf("header not found: %d\n", i)
err = fmt.Errorf("header not found: %d", i)
panic(err)
}
body, _, _ := rawdb.ReadBody(tx, hash, i)
if body == nil {
err = fmt.Errorf("header not found: %d\n", i)
err = fmt.Errorf("header not found: %d", i)
panic(err)
}

Expand Down
9 changes: 7 additions & 2 deletions eth/integrity/snap_blocks_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ import (
"github.com/erigontech/erigon/turbo/services"
)

func SnapBlocksRead(db kv.RoDB, blockReader services.FullBlockReader, ctx context.Context, failFast bool) error {
func SnapBlocksRead(ctx context.Context, db kv.RoDB, blockReader services.FullBlockReader, from, to uint64, failFast bool) error {
defer log.Info("[integrity] SnapBlocksRead: done")
logEvery := time.NewTicker(10 * time.Second)
defer logEvery.Stop()

maxBlockNum := blockReader.Snapshots().SegmentsMax()
for i := uint64(0); i < maxBlockNum; i += 10_000 {

if to != 0 && maxBlockNum > to {
maxBlockNum = 2
}

for i := from; i < maxBlockNum; i += 10_000 {
if err := db.View(ctx, func(tx kv.Tx) error {
b, err := blockReader.BlockByNumber(ctx, tx, i)
if err != nil {
Expand Down
71 changes: 45 additions & 26 deletions eth/stagedsync/bor_heimdall_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/polygon/bor"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/heimdall"
"github.com/erigontech/erigon/turbo/services"
Expand Down Expand Up @@ -409,26 +410,22 @@ func fetchAndWriteHeimdallStateSyncEvents(
heimdallClient heimdall.HeimdallClient,
chainID string,
logPrefix string,
logger log.Logger,
) (uint64, int, time.Duration, error) {
logger log.Logger) (uint64, int, time.Duration, error) {
fetchStart := time.Now()
// Find out the latest eventId
var (
from uint64
to time.Time
)
var fromId uint64

blockNum := header.Number.Uint64()

if config.IsIndore(blockNum) {
stateSyncDelay := config.CalculateStateSyncDelay(blockNum)
to = time.Unix(int64(header.Time-stateSyncDelay), 0)
} else {
pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprintLength(blockNum))
if err != nil {
return lastStateSyncEventID, 0, time.Since(fetchStart), err
}
to = time.Unix(int64(pHeader.Time), 0)
if blockNum%config.CalculateSprintLength(blockNum) != 0 || blockNum == 0 {
// we fetch events only at beginning of each sprint
return lastStateSyncEventID, 0, 0, nil
}

from, to, err := bor.CalculateEventWindow(ctx, config, header, tx, blockReader)

if err != nil {
return lastStateSyncEventID, 0, time.Since(fetchStart), err
}

fetchTo := to
Expand All @@ -450,15 +447,15 @@ func fetchAndWriteHeimdallStateSyncEvents(
}
*/

from = lastStateSyncEventID + 1
fromId = lastStateSyncEventID + 1

logger.Trace(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"fromId", fromId,
"to", to.Format(time.RFC3339),
)

eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, from, fetchTo, fetchLimit)
eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, fromId, fetchTo, fetchLimit)

if err != nil {
return lastStateSyncEventID, 0, time.Since(fetchStart), err
Expand All @@ -477,19 +474,36 @@ func fetchAndWriteHeimdallStateSyncEvents(
}

wroteIndex := false

var initialRecordTime *time.Time

for i, eventRecord := range eventRecords {
if eventRecord.ID <= lastStateSyncEventID {
continue
}

if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) {
return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf(
"invalid event record received %s, %s, %s, %s",
fmt.Sprintf("blockNum=%d", blockNum),
fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1),
fmt.Sprintf("chainId=%s (exp %s)", eventRecord.ChainID, chainID),
fmt.Sprintf("time=%s (exp to %s)", eventRecord.Time, to),
)
// Note: this check is only valid for events with eventRecord.ID > lastStateSyncEventID
var afterCheck = func(limitTime time.Time, eventTime time.Time, initialTime *time.Time) bool {
if initialTime == nil {
return eventTime.After(from)
}

return initialTime.After(from)
}

// don't apply this for devnets we may have looser state event constraints
// (TODO these probably needs fixing)
if !(chainID == "1337") {
if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID ||
!(afterCheck(from, eventRecord.Time, initialRecordTime) && eventRecord.Time.Before(to)) {
return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf(
"invalid event record received %s, %s, %s, %s",
fmt.Sprintf("blockNum=%d", blockNum),
fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1),
fmt.Sprintf("chainId=%s (exp %s)", eventRecord.ChainID, chainID),
fmt.Sprintf("time=%s (exp from %s, to %s)", eventRecord.Time, from, to),
)
}
}

data, err := eventRecord.MarshallBytes()
Expand All @@ -514,6 +528,11 @@ func fetchAndWriteHeimdallStateSyncEvents(
wroteIndex = true
}

if initialRecordTime == nil {
eventTime := eventRecord.Time
initialRecordTime = &eventTime
}

lastStateSyncEventID++
}

Expand Down
1 change: 1 addition & 0 deletions eth/stagedsync/stage_mining_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func MiningBorHeimdallForward(
logger,
lastStateSyncEventID,
)

if err != nil {
return err
}
Expand Down
Loading

0 comments on commit e4eb9fc

Please sign in to comment.