From e4eb9fcc2c7e8266124f24e3a90faa29c471010f Mon Sep 17 00:00:00 2001 From: Mark Holt <135143369+mh0lt@users.noreply.github.com> Date: Mon, 19 Aug 2024 09:23:46 +0100 Subject: [PATCH] Borevent snapshot validation (#9436) This adds integrity checking for borevents in the following places: 1. Stage Bor Heimdall - check that the event occurs within the expected time window 2. Dump Events to snapshots - check that the event ids are continuous in and between blocks (time window tests will be added) 3. Index event snapshots - check that the event ids are continuous in and between blocks It also adds an external integrity checker which runs on snapshots checking for continuity and timeliness of events. This can be called using the following command: `erigon snapshots integrity --datadir=~/snapshots/bor-mainnet-patch-1 --from=45500000` (--to specifies an end block) This also now fixes the long-running issue with bor events causing unexpected failures in execution: the problem that event validation uncovered was as follows: The **kv.BorEventNums** mapping table currently keeps the mapping first event id->block. The code which produces bor-event snapshots uses this mapping to determine which events to put into the snapshot. However, if no additional blocks have events by the time the block is stored in the snapshot, the snapshot creation code does not know which events to include - so it drops them. This causes problems in two places: * RPC queries & re-execution from snapshots can't find these dropped events * Depending on purge timing these events may erroneously get inserted into future blocks The code change in this PR fixes that bug. 
It has been tested by running: ``` erigon --datadir=~/chains/e3/amoy --chain=amoy --bor.heimdall=https://heimdall-api-amoy.polygon.technology --db.writemap=false --txpool.disable --no-downloader --bor.milestone=false ``` with validation in place, and then confirmed by running the following: ``` erigon snapshots rm-all-state-snapshots --datadir=~/chains/e3/amoy rm ~/chains/e3/amoy/chaindata/ erigon --datadir=~/chains/e3/amoy --chain=amoy --bor-heimdall=https://heimdall-api-amoy.polygon.technology --db.writemap=false --no-downloader --bor.milestone=false ``` To recreate the chain from snapshots. It has also been checked with: ``` erigon snapshots integrity --datadir=~/chains/e3/amoy --check=NoBorEventGaps --failFast=true ``` --------- Co-authored-by: alex.sharov Co-authored-by: Mark Holt --- .gitignore | 3 +- eth/integrity/integrity_action_type.go | 3 +- eth/integrity/no_gaps_bor_events.go | 130 +++++++++++ eth/integrity/no_gaps_in_canonical_headers.go | 6 +- eth/integrity/snap_blocks_read.go | 9 +- eth/stagedsync/bor_heimdall_shared.go | 71 +++--- eth/stagedsync/stage_mining_bor_heimdall.go | 1 + polygon/bor/bor.go | 72 +++--- polygon/bor/snaptype/types.go | 46 +++- polygon/heimdall/event_fetch_test.go | 39 ++++ polygon/heimdall/event_record.go | 26 +++ turbo/app/snapshots_cmd.go | 22 +- .../snapshotsync/freezeblocks/block_reader.go | 12 +- .../freezeblocks/bor_snapshots.go | 221 +++++++++++++++++- 14 files changed, 558 insertions(+), 103 deletions(-) create mode 100644 eth/integrity/no_gaps_bor_events.go create mode 100644 polygon/heimdall/event_fetch_test.go diff --git a/.gitignore b/.gitignore index de1eda27d51..ce201041177 100644 --- a/.gitignore +++ b/.gitignore @@ -16,8 +16,7 @@ build/_vendor/pkg /*.a docs/readthedocs/build -/.VSCodeCounter/ - +/.VSCodeCounter #* .#* *# diff --git a/eth/integrity/integrity_action_type.go b/eth/integrity/integrity_action_type.go index f25d79d4378..f7acb21dde2 100644 --- a/eth/integrity/integrity_action_type.go +++ 
b/eth/integrity/integrity_action_type.go @@ -23,8 +23,9 @@ const ( BlocksTxnID Check = "BlocksTxnID" InvertedIndex Check = "InvertedIndex" HistoryNoSystemTxs Check = "HistoryNoSystemTxs" + NoBorEventGaps Check = "NoBorEventGaps" ) var AllChecks = []Check{ - Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs, + Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs, NoBorEventGaps, } diff --git a/eth/integrity/no_gaps_bor_events.go b/eth/integrity/no_gaps_bor_events.go new file mode 100644 index 00000000000..45d70f93ba3 --- /dev/null +++ b/eth/integrity/no_gaps_bor_events.go @@ -0,0 +1,130 @@ +package integrity + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/erigontech/erigon-lib/chain" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/core" + "github.com/erigontech/erigon/eth/stagedsync/stages" + "github.com/erigontech/erigon/polygon/bor/borcfg" + "github.com/erigontech/erigon/turbo/services" + "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" +) + +func NoGapsInBorEvents(ctx context.Context, db kv.RoDB, blockReader services.FullBlockReader, from, to uint64, failFast bool) (err error) { + defer func() { + log.Info("[integrity] NoGapsInBorEvents: done", "err", err) + }() + + var cc *chain.Config + + if db == nil { + genesis := core.BorMainnetGenesisBlock() + cc = genesis.Config + } else { + err = db.View(ctx, func(tx kv.Tx) error { + cc, err = chain.GetConfig(tx, nil) + if err != nil { + return err + } + return nil + }) + + if err != nil { + err = fmt.Errorf("cant read chain config from db: %w", err) + return err + } + } + + if cc.BorJSON == nil { + return err + } + + config := &borcfg.BorConfig{} + + if err := json.Unmarshal(cc.BorJSON, config); err != nil { + err = fmt.Errorf("invalid chain config 'bor' JSON: %w", err) + return err + } + + logEvery := time.NewTicker(10 * time.Second) + defer logEvery.Stop() + + snapshots := 
blockReader.BorSnapshots().(*freezeblocks.BorRoSnapshots) + + var prevEventId uint64 + var maxBlockNum uint64 + + if to > 0 { + maxBlockNum = to + } else { + maxBlockNum = snapshots.SegmentsMax() + } + + view := snapshots.View() + defer view.Close() + + for _, eventSegment := range view.Events() { + + if from > 0 && eventSegment.From() < from { + continue + } + + if to > 0 && eventSegment.From() > to { + break + } + + prevEventId, err = freezeblocks.ValidateBorEvents(ctx, config, db, blockReader, eventSegment, prevEventId, maxBlockNum, failFast, logEvery) + + if err != nil && failFast { + return err + } + } + + if db != nil { + err = db.View(ctx, func(tx kv.Tx) error { + if false { + lastEventId, _, err := blockReader.LastEventId(ctx, tx) + + if err != nil { + return err + } + + borHeimdallProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall) + + if err != nil { + return err + } + + bodyProgress, err := stages.GetStageProgress(tx, stages.Bodies) + + if err != nil { + return err + } + + log.Info("[integrity] LAST Event", "event", lastEventId, "bor-progress", borHeimdallProgress, "body-progress", bodyProgress) + + if bodyProgress > borHeimdallProgress { + for blockNum := maxBlockNum + 1; blockNum <= bodyProgress; blockNum++ { + + } + } + } + + return nil + }) + + if err != nil { + return err + } + } + + log.Info("[integrity] done checking bor events", "event", prevEventId) + + return nil +} diff --git a/eth/integrity/no_gaps_in_canonical_headers.go b/eth/integrity/no_gaps_in_canonical_headers.go index 433941ad659..b9fae306fe8 100644 --- a/eth/integrity/no_gaps_in_canonical_headers.go +++ b/eth/integrity/no_gaps_in_canonical_headers.go @@ -50,17 +50,17 @@ func NoGapsInCanonicalHeaders(tx kv.Tx, ctx context.Context, br services.FullBlo panic(err) } if hash == (common.Hash{}) { - err = fmt.Errorf("canonical marker not found: %d\n", i) + err = fmt.Errorf("canonical marker not found: %d", i) panic(err) } header := rawdb.ReadHeader(tx, hash, i) if header == nil 
{ - err = fmt.Errorf("header not found: %d\n", i) + err = fmt.Errorf("header not found: %d", i) panic(err) } body, _, _ := rawdb.ReadBody(tx, hash, i) if body == nil { - err = fmt.Errorf("header not found: %d\n", i) + err = fmt.Errorf("header not found: %d", i) panic(err) } diff --git a/eth/integrity/snap_blocks_read.go b/eth/integrity/snap_blocks_read.go index 33816a60d4f..28e927727a5 100644 --- a/eth/integrity/snap_blocks_read.go +++ b/eth/integrity/snap_blocks_read.go @@ -28,13 +28,18 @@ import ( "github.com/erigontech/erigon/turbo/services" ) -func SnapBlocksRead(db kv.RoDB, blockReader services.FullBlockReader, ctx context.Context, failFast bool) error { +func SnapBlocksRead(ctx context.Context, db kv.RoDB, blockReader services.FullBlockReader, from, to uint64, failFast bool) error { defer log.Info("[integrity] SnapBlocksRead: done") logEvery := time.NewTicker(10 * time.Second) defer logEvery.Stop() maxBlockNum := blockReader.Snapshots().SegmentsMax() - for i := uint64(0); i < maxBlockNum; i += 10_000 { + + if to != 0 && maxBlockNum > to { + maxBlockNum = 2 + } + + for i := from; i < maxBlockNum; i += 10_000 { if err := db.View(ctx, func(tx kv.Tx) error { b, err := blockReader.BlockByNumber(ctx, tx, i) if err != nil { diff --git a/eth/stagedsync/bor_heimdall_shared.go b/eth/stagedsync/bor_heimdall_shared.go index 9713e26d4cd..fdd79c29818 100644 --- a/eth/stagedsync/bor_heimdall_shared.go +++ b/eth/stagedsync/bor_heimdall_shared.go @@ -29,6 +29,7 @@ import ( "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon/core/types" + "github.com/erigontech/erigon/polygon/bor" "github.com/erigontech/erigon/polygon/bor/borcfg" "github.com/erigontech/erigon/polygon/heimdall" "github.com/erigontech/erigon/turbo/services" @@ -409,26 +410,22 @@ func fetchAndWriteHeimdallStateSyncEvents( heimdallClient heimdall.HeimdallClient, chainID string, logPrefix string, - logger log.Logger, -) (uint64, int, time.Duration, error) { + logger log.Logger) (uint64, int, 
time.Duration, error) { fetchStart := time.Now() // Find out the latest eventId - var ( - from uint64 - to time.Time - ) + var fromId uint64 blockNum := header.Number.Uint64() - if config.IsIndore(blockNum) { - stateSyncDelay := config.CalculateStateSyncDelay(blockNum) - to = time.Unix(int64(header.Time-stateSyncDelay), 0) - } else { - pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprintLength(blockNum)) - if err != nil { - return lastStateSyncEventID, 0, time.Since(fetchStart), err - } - to = time.Unix(int64(pHeader.Time), 0) + if blockNum%config.CalculateSprintLength(blockNum) != 0 || blockNum == 0 { + // we fetch events only at beginning of each sprint + return lastStateSyncEventID, 0, 0, nil + } + + from, to, err := bor.CalculateEventWindow(ctx, config, header, tx, blockReader) + + if err != nil { + return lastStateSyncEventID, 0, time.Since(fetchStart), err } fetchTo := to @@ -450,15 +447,15 @@ func fetchAndWriteHeimdallStateSyncEvents( } */ - from = lastStateSyncEventID + 1 + fromId = lastStateSyncEventID + 1 logger.Trace( fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix), - "fromID", from, + "fromId", fromId, "to", to.Format(time.RFC3339), ) - eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, from, fetchTo, fetchLimit) + eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, fromId, fetchTo, fetchLimit) if err != nil { return lastStateSyncEventID, 0, time.Since(fetchStart), err @@ -477,19 +474,36 @@ func fetchAndWriteHeimdallStateSyncEvents( } wroteIndex := false + + var initialRecordTime *time.Time + for i, eventRecord := range eventRecords { if eventRecord.ID <= lastStateSyncEventID { continue } - if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) { - return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf( - "invalid event record received %s, %s, %s, %s", - fmt.Sprintf("blockNum=%d", blockNum), - fmt.Sprintf("eventId=%d 
(exp %d)", eventRecord.ID, lastStateSyncEventID+1), - fmt.Sprintf("chainId=%s (exp %s)", eventRecord.ChainID, chainID), - fmt.Sprintf("time=%s (exp to %s)", eventRecord.Time, to), - ) + // Note: this check is only valid for events with eventRecord.ID > lastStateSyncEventID + var afterCheck = func(limitTime time.Time, eventTime time.Time, initialTime *time.Time) bool { + if initialTime == nil { + return eventTime.After(from) + } + + return initialTime.After(from) + } + + // don't apply this for devnets we may have looser state event constraints + // (TODO these probably needs fixing) + if !(chainID == "1337") { + if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID || + !(afterCheck(from, eventRecord.Time, initialRecordTime) && eventRecord.Time.Before(to)) { + return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf( + "invalid event record received %s, %s, %s, %s", + fmt.Sprintf("blockNum=%d", blockNum), + fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1), + fmt.Sprintf("chainId=%s (exp %s)", eventRecord.ChainID, chainID), + fmt.Sprintf("time=%s (exp from %s, to %s)", eventRecord.Time, from, to), + ) + } } data, err := eventRecord.MarshallBytes() @@ -514,6 +528,11 @@ func fetchAndWriteHeimdallStateSyncEvents( wroteIndex = true } + if initialRecordTime == nil { + eventTime := eventRecord.Time + initialRecordTime = &eventTime + } + lastStateSyncEventID++ } diff --git a/eth/stagedsync/stage_mining_bor_heimdall.go b/eth/stagedsync/stage_mining_bor_heimdall.go index 25644bbefac..3561dce54cb 100644 --- a/eth/stagedsync/stage_mining_bor_heimdall.go +++ b/eth/stagedsync/stage_mining_bor_heimdall.go @@ -93,6 +93,7 @@ func MiningBorHeimdallForward( logger, lastStateSyncEventID, ) + if err != nil { return err } diff --git a/polygon/bor/bor.go b/polygon/bor/bor.go index 191efcca8b8..29abb547ddb 100644 --- a/polygon/bor/bor.go +++ b/polygon/bor/bor.go @@ -251,6 +251,35 @@ type ValidateHeaderTimeSignerSuccessionNumber 
interface { GetSignerSuccessionNumber(signer libcommon.Address, number uint64) (int, error) } +func CalculateEventWindow(ctx context.Context, config *borcfg.BorConfig, header *types.Header, tx kv.Getter, headerReader services.HeaderReader) (from time.Time, to time.Time, err error) { + + blockNum := header.Number.Uint64() + blockNum += blockNum % config.CalculateSprintLength(blockNum) + + prevHeader, err := headerReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprintLength(blockNum)) + + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("window calculation failed: %w", err) + } + + if config.IsIndore(blockNum) { + stateSyncDelay := config.CalculateStateSyncDelay(blockNum) + to = time.Unix(int64(header.Time-stateSyncDelay), 0) + from = time.Unix(int64(prevHeader.Time-stateSyncDelay), 0) + } else { + to = time.Unix(int64(prevHeader.Time), 0) + prevHeader, err := headerReader.HeaderByNumber(ctx, tx, prevHeader.Number.Uint64()-config.CalculateSprintLength(prevHeader.Number.Uint64())) + + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("window calculation failed: %w", err) + } + + from = time.Unix(int64(prevHeader.Time), 0) + } + + return from, to, nil +} + type spanReader interface { Span(ctx context.Context, id uint64) (*heimdall.Span, bool, error) } @@ -1511,49 +1540,6 @@ func (c *Bor) CommitStates( events := chain.Chain.BorEventsByBlock(header.Hash(), blockNum) - //if len(events) == 50 || len(events) == 0 { // we still sometime could get 0 events from borevent file - if blockNum <= chain.Chain.FrozenBorBlocks() && len(events) == 50 { // we still sometime could get 0 events from borevent file - var to time.Time - if c.config.IsIndore(blockNum) { - stateSyncDelay := c.config.CalculateStateSyncDelay(blockNum) - to = time.Unix(int64(header.Time-stateSyncDelay), 0) - } else { - pHeader := chain.Chain.GetHeaderByNumber(blockNum - c.config.CalculateSprintLength(blockNum)) - to = time.Unix(int64(pHeader.Time), 0) - } - - startEventID := 
chain.Chain.BorStartEventID(header.Hash(), blockNum) - log.Warn("[dbg] fallback to remote bor events", "blockNum", blockNum, "startEventID", startEventID, "events_from_db_or_snaps", len(events)) - remote, err := c.HeimdallClient.FetchStateSyncEvents(context.Background(), startEventID, to, 0) - if err != nil { - return err - } - if len(remote) > 0 { - chainID := c.chainConfig.ChainID.String() - - var merged []*heimdall.EventRecordWithTime - events = events[:0] - for _, event := range remote { - if event.ChainID != chainID { - continue - } - if event.Time.After(to) { - continue - } - merged = append(merged, event) - } - - for _, ev := range merged { - data, err := ev.MarshallBytes() - if err != nil { - panic(err) - } - - events = append(events, data) - } - } - } - for _, event := range events { if err := c.stateReceiver.CommitState(event, syscall); err != nil { return err diff --git a/polygon/bor/snaptype/types.go b/polygon/bor/snaptype/types.go index cd4316ff2db..bacbf3263ec 100644 --- a/polygon/bor/snaptype/types.go +++ b/polygon/bor/snaptype/types.go @@ -102,6 +102,7 @@ var ( var prevBlockNum uint64 var startEventId uint64 var lastEventId uint64 + if err := kv.BigChunks(db, kv.BorEventNums, from, func(tx kv.Tx, blockNumBytes, eventIdBytes []byte) (bool, error) { blockNum := binary.BigEndian.Uint64(blockNumBytes) if first { @@ -141,15 +142,44 @@ var ( }); err != nil { return 0, err } - if lastEventId > startEventId { - if err := db.View(ctx, func(tx kv.Tx) error { - blockHash, e := rawdb.ReadCanonicalHash(tx, prevBlockNum) - if e != nil { - return e + + if lastEventId > 0 { + // if we have reached the last entry in kv.BorEventNums we will have a start event + // but no end - so we assume that the last event is going to be the last recorded + // event in the db + if startEventId == lastEventId { + if err := db.View(ctx, func(tx kv.Tx) error { + cursor, err := tx.Cursor(kv.BorEvents) + if err != nil { + return err + } + + defer cursor.Close() + k, _, err := 
cursor.Last() + if err != nil { + return err + } + + if k != nil { + lastEventId = binary.BigEndian.Uint64(k) + } + + return nil + }); err != nil { + return 0, err + } + } + + if lastEventId >= startEventId { + if err := db.View(ctx, func(tx kv.Tx) error { + blockHash, e := rawdb.ReadCanonicalHash(tx, prevBlockNum) + if e != nil { + return e + } + return extractEventRange(startEventId, lastEventId+1, tx, prevBlockNum, blockHash, collect) + }); err != nil { + return 0, err } - return extractEventRange(startEventId, lastEventId+1, tx, prevBlockNum, blockHash, collect) - }); err != nil { - return 0, err } } diff --git a/polygon/heimdall/event_fetch_test.go b/polygon/heimdall/event_fetch_test.go new file mode 100644 index 00000000000..5411ee46629 --- /dev/null +++ b/polygon/heimdall/event_fetch_test.go @@ -0,0 +1,39 @@ +package heimdall + +import ( + "context" + "testing" + "time" + + "github.com/erigontech/erigon-lib/log/v3" +) + +func TestOver50EventBlockFetch(t *testing.T) { + heimdallClient := NewHeimdallClient("https://heimdall-api.polygon.technology/", log.New()) + + // block := 48077376 + // block time := Sep-28-2023 08:13:58 AM + events, err := heimdallClient.FetchStateSyncEvents(context.Background(), 2774290, time.Unix(1695888838-128, 0), 0) + + if err != nil { + t.Fatal(err) + } + + if len(events) != 113 { + t.Fatal("Unexpected event count, exptected: 113, got:", len(events)) + } + + // block := 23893568 + // block time := Jan-19-2022 04:48:04 AM + + events, err = heimdallClient.FetchStateSyncEvents(context.Background(), 1479540, time.Unix(1642567374, 0), 0) + + if err != nil { + t.Fatal(err) + } + + if len(events) != 9417 { + t.Fatal("Unexpected event count, exptected: 9417, got:", len(events)) + } + +} diff --git a/polygon/heimdall/event_record.go b/polygon/heimdall/event_record.go index eba493529ef..ad897f9302b 100644 --- a/polygon/heimdall/event_record.go +++ b/polygon/heimdall/event_record.go @@ -131,3 +131,29 @@ type StateSyncEventResponse struct { 
Height string `json:"height"` Result EventRecordWithTime `json:"result"` } + +var methodId []byte = borabi.StateReceiverContractABI().Methods["commitState"].ID + +func EventTime(encodedEvent rlp.RawValue) time.Time { + if bytes.Equal(methodId, encodedEvent[0:4]) { + return time.Unix((&big.Int{}).SetBytes(encodedEvent[4:36]).Int64(), 0) + } + + return time.Time{} +} + +var commitStateInputs = borabi.StateReceiverContractABI().Methods["commitState"].Inputs + +func EventId(encodedEvent rlp.RawValue) uint64 { + if bytes.Equal(methodId, encodedEvent[0:4]) { + args, _ := commitStateInputs.Unpack(encodedEvent[4:]) + + if len(args) == 2 { + var eventRecord EventRecord + if err := rlp.DecodeBytes(args[1].([]byte), &eventRecord); err == nil { + return eventRecord.ID + } + } + } + return 0 +} diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 4e320886736..cee54e89891 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -485,8 +485,9 @@ func doIntegrity(cliCtx *cli.Context) error { defer chainDB.Close() cfg := ethconfig.NewSnapCfg(false, true, true) + from := cliCtx.Uint64(SnapshotFromFlag.Name) - _, _, _, blockRetire, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + _, _, _, blockRetire, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) if err != nil { return err } @@ -503,7 +504,7 @@ func doIntegrity(cliCtx *cli.Context) error { return err } case integrity.Blocks: - if err := integrity.SnapBlocksRead(chainDB, blockReader, ctx, failFast); err != nil { + if err := integrity.SnapBlocksRead(ctx, chainDB, blockReader, 0, 0, failFast); err != nil { return err } case integrity.InvertedIndex: @@ -514,6 +515,11 @@ func doIntegrity(cliCtx *cli.Context) error { if err := integrity.E3HistoryNoSystemTxs(ctx, chainDB, agg); err != nil { return err } + case integrity.NoBorEventGaps: + if err := integrity.NoGapsInBorEvents(ctx, chainDB, blockReader, 0, 0, failFast); err != nil { + return err + } + default: 
return fmt.Errorf("unknown check: %s", chk) } @@ -957,7 +963,9 @@ func doIndicesCommand(cliCtx *cli.Context, dirs datadir.Dirs) error { cfg := ethconfig.NewSnapCfg(false, true, true) chainConfig := fromdb.ChainConfig(chainDB) - _, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + from := cliCtx.Uint64(SnapshotFromFlag.Name) + + _, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) if err != nil { return err } @@ -987,7 +995,8 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error { chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() defer chainDB.Close() cfg := ethconfig.NewSnapCfg(false, true, true) - blockSnaps, borSnaps, caplinSnaps, _, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + from := cliCtx.Uint64(SnapshotFromFlag.Name) + blockSnaps, borSnaps, caplinSnaps, _, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) if err != nil { return err } @@ -1001,7 +1010,7 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error { return nil } -func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.Dirs, chainDB kv.RwDB, logger log.Logger) ( +func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.Dirs, from uint64, chainDB kv.RwDB, logger log.Logger) ( blockSnaps *freezeblocks.RoSnapshots, borSnaps *freezeblocks.BorRoSnapshots, csn *freezeblocks.CaplinSnapshots, br *freezeblocks.BlockRetire, agg *libstate.Aggregator, clean func(), err error, ) { @@ -1184,7 +1193,8 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error { defer db.Close() cfg := ethconfig.NewSnapCfg(false, true, true) - blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, db, logger) + + blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, db, logger) if err != nil { return err } diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go 
b/turbo/snapshotsync/freezeblocks/block_reader.go index b461dfad179..b184b5ac45e 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -1488,18 +1488,17 @@ func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit in } if event.Time.After(to) { maxTime = true - goto BREAK + return result, maxTime, nil } result = append(result, &event) if len(result) == limit { - goto BREAK + return result, maxTime, nil } } } -BREAK: return result, maxTime, nil } @@ -1545,8 +1544,11 @@ func (r *BlockReader) LastFrozenEventId() uint64 { var lastSegment *Segment for i := len(segments) - 1; i >= 0; i-- { if segments[i].Index() != nil { - lastSegment = segments[i] - break + gg := segments[i].MakeGetter() + if gg.HasNext() { + lastSegment = segments[i] + break + } } } if lastSegment == nil { diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index d66160cdf3e..9ff4cd46f8b 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -18,33 +18,34 @@ package freezeblocks import ( "context" + "encoding/binary" "fmt" "os" "path/filepath" "reflect" + "time" "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/common/length" + "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/common/dbg" "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cmd/hack/tool/fromdb" + "github.com/erigontech/erigon/core/types" "github.com/erigontech/erigon/eth/ethconfig" + "github.com/erigontech/erigon/polygon/bor" + "github.com/erigontech/erigon/polygon/bor/borcfg" borsnaptype "github.com/erigontech/erigon/polygon/bor/snaptype" + "github.com/erigontech/erigon/polygon/heimdall" "github.com/erigontech/erigon/turbo/services" ) -var BorProduceFiles = dbg.EnvBool("BOR_PRODUCE_FILES", false) - func 
(br *BlockRetire) dbHasEnoughDataForBorRetire(ctx context.Context) (bool, error) { return true, nil } func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) { - if !BorProduceFiles { - return false, nil - } - select { case <-ctx.Done(): return false, ctx.Err() @@ -246,6 +247,212 @@ func (s *BorRoSnapshots) ReopenFolder() error { return nil } +func checkBlockEvents(ctx context.Context, config *borcfg.BorConfig, blockReader services.FullBlockReader, + block uint64, prevBlock uint64, eventId uint64, prevBlockStartId uint64, prevEventTime *time.Time, tx kv.Tx, failFast bool) (*time.Time, error) { + header, err := blockReader.HeaderByNumber(ctx, tx, prevBlock) + + if err != nil { + if failFast { + return nil, fmt.Errorf("can't get header for block %d: %w", block, err) + } + + log.Error("[integrity] NoGapsInBorEvents: can't get header for block", "block", block, "err", err) + } + + events, err := blockReader.EventsByBlock(ctx, tx, header.Hash(), header.Number.Uint64()) + + if err != nil { + if failFast { + return nil, fmt.Errorf("can't get events for block %d: %w", block, err) + } + + log.Error("[integrity] NoGapsInBorEvents: can't get events for block", "block", block, "err", err) + } + + if prevBlockStartId != 0 { + if len(events) != int(eventId-prevBlockStartId) { + if failFast { + return nil, fmt.Errorf("block event mismatch at %d: expected: %d, got: %d", block, eventId-prevBlockStartId, len(events)) + } + + log.Error("[integrity] NoGapsInBorEvents: block event count mismatch", "block", block, "eventId", eventId, "expected", eventId-prevBlockStartId, "got", len(events)) + } + } + + var lastBlockEventTime time.Time + var firstBlockEventTime *time.Time + + for i, event := range events { + + var eventId uint64 + + if prevBlockStartId != 0 { + eventId = heimdall.EventId(event) + + if eventId != 
prevBlockStartId+uint64(i) { + if failFast { + return nil, fmt.Errorf("invalid event id %d for event %d in block %d: expected: %d", eventId, i, block, prevBlockStartId+uint64(i)) + } + + log.Error("[integrity] NoGapsInBorEvents: invalid event id", "block", block, "event", i, "expected", prevBlockStartId+uint64(i), "got", eventId) + } + } else { + eventId = prevBlockStartId + uint64(i) + } + + eventTime := heimdall.EventTime(event) + + //if i != 0 { + // if eventTime.Before(lastBlockEventTime) { + // eventTime = lastBlockEventTime + // } + //} + + if i == 0 { + lastBlockEventTime = eventTime + } + + const warnPrevTimes = false + + if prevEventTime != nil { + if eventTime.Before(*prevEventTime) && warnPrevTimes { + log.Warn("[integrity] NoGapsInBorEvents: event time before prev", "block", block, "event", eventId, "time", eventTime, "prev", *prevEventTime, "diff", -prevEventTime.Sub(eventTime)) + } + } + + prevEventTime = &eventTime + + if !checkBlockWindow(ctx, eventTime, firstBlockEventTime, config, header, tx, blockReader) { + from, to, _ := bor.CalculateEventWindow(ctx, config, header, tx, blockReader) + + var diff time.Duration + + if eventTime.Before(from) { + diff = -from.Sub(eventTime) + } else if eventTime.After(to) { + diff = to.Sub(eventTime) + } + + if failFast { + return nil, fmt.Errorf("invalid time %s for event %d in block %d: expected %s-%s", eventTime, eventId, block, from, to) + } + + log.Error(fmt.Sprintf("[integrity] NoGapsInBorEvents: invalid event time at %d of %d", i, len(events)), "block", block, "event", eventId, "time", eventTime, "diff", diff, "expected", fmt.Sprintf("%s-%s", from, to), "block-start", prevBlockStartId, "first-time", lastBlockEventTime, "timestamps", fmt.Sprintf("%d-%d", from.Unix(), to.Unix())) + } + + if firstBlockEventTime == nil { + firstBlockEventTime = &eventTime + } + } + + return prevEventTime, nil +} + +func ValidateBorEvents(ctx context.Context, config *borcfg.BorConfig, db kv.RoDB, blockReader 
services.FullBlockReader, eventSegment *Segment, prevEventId uint64, maxBlockNum uint64, failFast bool, logEvery *time.Ticker) (uint64, error) { + g := eventSegment.Decompressor.MakeGetter() + + word := make([]byte, 0, 4096) + + var prevBlock, prevBlockStartId uint64 + var prevEventTime *time.Time + + for g.HasNext() { + word, _ = g.Next(word[:0]) + + block := binary.BigEndian.Uint64(word[length.Hash : length.Hash+length.BlockNum]) + eventId := binary.BigEndian.Uint64(word[length.Hash+length.BlockNum : length.Hash+length.BlockNum+8]) + event := word[length.Hash+length.BlockNum+8:] + + recordId := heimdall.EventId(event) + + if recordId != eventId { + if failFast { + return prevEventId, fmt.Errorf("invalid event id %d in block %d: expected: %d", recordId, block, eventId) + } + + log.Error("[integrity] NoGapsInBorEvents: invalid event id", "block", block, "event", recordId, "expected", eventId) + } + + if prevEventId > 0 { + switch { + case eventId < prevEventId: + if failFast { + return prevEventId, fmt.Errorf("invaid bor event %d (prev=%d) at block=%d", eventId, prevEventId, block) + } + + log.Error("[integrity] NoGapsInBorEvents: invalid bor event", "event", eventId, "prev", prevEventId, "block", block) + + case eventId != prevEventId+1: + if failFast { + return prevEventId, fmt.Errorf("missing bor event %d (prev=%d) at block=%d", eventId, prevEventId, block) + } + + log.Error("[integrity] NoGapsInBorEvents: missing bor event", "event", eventId, "prev", prevEventId, "block", block) + } + } + + //if prevEventId == 0 { + //log.Info("[integrity] checking bor events", "event", eventId, "block", block) + //} + + if prevBlock != 0 && prevBlock != block { + var err error + + if db != nil { + err = db.View(ctx, func(tx kv.Tx) error { + prevEventTime, err = checkBlockEvents(ctx, config, blockReader, block, prevBlock, eventId, prevBlockStartId, prevEventTime, tx, failFast) + return err + }) + } else { + prevEventTime, err = checkBlockEvents(ctx, config, blockReader, block, 
prevBlock, eventId, prevBlockStartId, prevEventTime, nil, failFast) + } + + if err != nil { + return prevEventId, err + } + + prevBlockStartId = eventId + } + + prevEventId = eventId + prevBlock = block + + var logChan <-chan time.Time + + if logEvery != nil { + logChan = logEvery.C + } + + select { + case <-ctx.Done(): + return prevEventId, ctx.Err() + case <-logChan: + log.Info("[integrity] NoGapsInBorEvents", "blockNum", fmt.Sprintf("%dK/%dK", binary.BigEndian.Uint64(word[length.Hash:length.Hash+length.BlockNum])/1000, maxBlockNum/1000)) + default: + } + } + + return prevEventId, nil +} + +func checkBlockWindow(ctx context.Context, eventTime time.Time, firstBlockEventTime *time.Time, config *borcfg.BorConfig, header *types.Header, tx kv.Getter, headerReader services.HeaderReader) bool { + from, to, err := bor.CalculateEventWindow(ctx, config, header, tx, headerReader) + + if err != nil { + return false + } + + var afterCheck = func(limitTime time.Time, eventTime time.Time, initialTime *time.Time) bool { + if initialTime == nil { + return eventTime.After(from) + } + + return initialTime.After(from) + } + + return !(afterCheck(from, eventTime, firstBlockEventTime) || eventTime.After(to)) +} + type BorView struct { base *View }