diff --git a/.gitignore b/.gitignore index de1eda27d51..ce201041177 100644 --- a/.gitignore +++ b/.gitignore @@ -16,8 +16,7 @@ build/_vendor/pkg /*.a docs/readthedocs/build -/.VSCodeCounter/ - +/.VSCodeCounter #* .#* *# diff --git a/eth/integrity/integrity_action_type.go b/eth/integrity/integrity_action_type.go index f25d79d4378..f7acb21dde2 100644 --- a/eth/integrity/integrity_action_type.go +++ b/eth/integrity/integrity_action_type.go @@ -23,8 +23,9 @@ const ( BlocksTxnID Check = "BlocksTxnID" InvertedIndex Check = "InvertedIndex" HistoryNoSystemTxs Check = "HistoryNoSystemTxs" + NoBorEventGaps Check = "NoBorEventGaps" ) var AllChecks = []Check{ - Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs, + Blocks, BlocksTxnID, InvertedIndex, HistoryNoSystemTxs, NoBorEventGaps, } diff --git a/eth/integrity/no_gaps_bor_events.go b/eth/integrity/no_gaps_bor_events.go new file mode 100644 index 00000000000..45d70f93ba3 --- /dev/null +++ b/eth/integrity/no_gaps_bor_events.go @@ -0,0 +1,130 @@ +package integrity + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/erigontech/erigon-lib/chain" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/core" + "github.com/erigontech/erigon/eth/stagedsync/stages" + "github.com/erigontech/erigon/polygon/bor/borcfg" + "github.com/erigontech/erigon/turbo/services" + "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" +) + +func NoGapsInBorEvents(ctx context.Context, db kv.RoDB, blockReader services.FullBlockReader, from, to uint64, failFast bool) (err error) { + defer func() { + log.Info("[integrity] NoGapsInBorEvents: done", "err", err) + }() + + var cc *chain.Config + + if db == nil { + genesis := core.BorMainnetGenesisBlock() + cc = genesis.Config + } else { + err = db.View(ctx, func(tx kv.Tx) error { + cc, err = chain.GetConfig(tx, nil) + if err != nil { + return err + } + return nil + }) + + if err != nil { + err = fmt.Errorf("cant read chain config from db: %w", err) + return err + } + } + + if cc.BorJSON == nil { + return err + } + + config := &borcfg.BorConfig{} + + if err := json.Unmarshal(cc.BorJSON, config); err != nil { + err = fmt.Errorf("invalid chain config 'bor' JSON: %w", err) + return err + } + + logEvery := time.NewTicker(10 * time.Second) + defer logEvery.Stop() + + snapshots := blockReader.BorSnapshots().(*freezeblocks.BorRoSnapshots) + + var prevEventId uint64 + var maxBlockNum uint64 + + if to > 0 { + maxBlockNum = to + } else { + maxBlockNum = snapshots.SegmentsMax() + } + + view := snapshots.View() + defer view.Close() + + for _, eventSegment := range view.Events() { + + if from > 0 && eventSegment.From() < from { + continue + } + + if to > 0 && eventSegment.From() > to { + break + } + + prevEventId, err = freezeblocks.ValidateBorEvents(ctx, config, db, blockReader, eventSegment, prevEventId, maxBlockNum, failFast, logEvery) + + if err != nil && failFast { + return err + } + } + + if db != nil { + err = db.View(ctx, func(tx kv.Tx) error { + if false { + lastEventId, _, err := blockReader.LastEventId(ctx, tx) + + if err != nil { + return err + } + + borHeimdallProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall) + + if err != nil { + return err + } + + bodyProgress, err := stages.GetStageProgress(tx, stages.Bodies) + + if err != nil { + return err + } + + log.Info("[integrity] LAST Event", "event", lastEventId, "bor-progress", borHeimdallProgress, "body-progress", bodyProgress) + + if bodyProgress > borHeimdallProgress { + for blockNum := maxBlockNum + 1; blockNum <= bodyProgress; blockNum++ { + + } + } + } + + return nil + }) + + if err != nil { + return err + } + } + + log.Info("[integrity] done checking bor events", "event", prevEventId) + + return nil +} diff --git a/eth/integrity/no_gaps_in_canonical_headers.go b/eth/integrity/no_gaps_in_canonical_headers.go index 433941ad659..b9fae306fe8 100644 --- a/eth/integrity/no_gaps_in_canonical_headers.go +++ b/eth/integrity/no_gaps_in_canonical_headers.go @@ -50,17 +50,17 @@ func NoGapsInCanonicalHeaders(tx kv.Tx, ctx context.Context, br services.FullBlo panic(err) } if hash == (common.Hash{}) { - err = fmt.Errorf("canonical marker not found: %d\n", i) + err = fmt.Errorf("canonical marker not found: %d", i) panic(err) } header := rawdb.ReadHeader(tx, hash, i) if header == nil { - err = fmt.Errorf("header not found: %d\n", i) + err = fmt.Errorf("header not found: %d", i) panic(err) } body, _, _ := rawdb.ReadBody(tx, hash, i) if body == nil { - err = fmt.Errorf("header not found: %d\n", i) + err = fmt.Errorf("header not found: %d", i) panic(err) } diff --git a/eth/integrity/snap_blocks_read.go b/eth/integrity/snap_blocks_read.go index 33816a60d4f..28e927727a5 100644 --- a/eth/integrity/snap_blocks_read.go +++ b/eth/integrity/snap_blocks_read.go @@ -28,13 +28,18 @@ import ( "github.com/erigontech/erigon/turbo/services" ) -func SnapBlocksRead(db kv.RoDB, blockReader services.FullBlockReader, ctx context.Context, failFast bool) error { +func SnapBlocksRead(ctx context.Context, db kv.RoDB, blockReader services.FullBlockReader, from, to uint64, failFast bool) error { defer log.Info("[integrity] SnapBlocksRead: done") logEvery := time.NewTicker(10 * time.Second) defer logEvery.Stop() maxBlockNum := blockReader.Snapshots().SegmentsMax() - for i := uint64(0); i < maxBlockNum; i += 10_000 { + + if to != 0 && maxBlockNum > to { + maxBlockNum = 2 + } + + for i := from; i < maxBlockNum; i += 10_000 { if err := db.View(ctx, func(tx kv.Tx) error { b, err := blockReader.BlockByNumber(ctx, tx, i) if err != nil { diff --git a/eth/stagedsync/bor_heimdall_shared.go b/eth/stagedsync/bor_heimdall_shared.go index 9713e26d4cd..fdd79c29818 100644 --- a/eth/stagedsync/bor_heimdall_shared.go +++ b/eth/stagedsync/bor_heimdall_shared.go @@ -29,6 +29,7 @@ import ( "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon/core/types" + "github.com/erigontech/erigon/polygon/bor" "github.com/erigontech/erigon/polygon/bor/borcfg" "github.com/erigontech/erigon/polygon/heimdall" "github.com/erigontech/erigon/turbo/services" @@ -409,26 +410,22 @@ func fetchAndWriteHeimdallStateSyncEvents( heimdallClient heimdall.HeimdallClient, chainID string, logPrefix string, - logger log.Logger, -) (uint64, int, time.Duration, error) { + logger log.Logger) (uint64, int, time.Duration, error) { fetchStart := time.Now() // Find out the latest eventId - var ( - from uint64 - to time.Time - ) + var fromId uint64 blockNum := header.Number.Uint64() - if config.IsIndore(blockNum) { - stateSyncDelay := config.CalculateStateSyncDelay(blockNum) - to = time.Unix(int64(header.Time-stateSyncDelay), 0) - } else { - pHeader, err := blockReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprintLength(blockNum)) - if err != nil { - return lastStateSyncEventID, 0, time.Since(fetchStart), err - } - to = time.Unix(int64(pHeader.Time), 0) + if blockNum%config.CalculateSprintLength(blockNum) != 0 || blockNum == 0 { + // we fetch events only at beginning of each sprint + return lastStateSyncEventID, 0, 0, nil + } + + from, to, err := bor.CalculateEventWindow(ctx, config, header, tx, blockReader) + + if err != nil { + return lastStateSyncEventID, 0, time.Since(fetchStart), err } fetchTo := to @@ -450,15 +447,15 @@ func fetchAndWriteHeimdallStateSyncEvents( } */ - from = lastStateSyncEventID + 1 + fromId = lastStateSyncEventID + 1 logger.Trace( fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix), - "fromID", from, + "fromId", fromId, "to", to.Format(time.RFC3339), ) - eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, from, fetchTo, fetchLimit) + eventRecords, err := heimdallClient.FetchStateSyncEvents(ctx, fromId, fetchTo, fetchLimit) if err != nil { return lastStateSyncEventID, 0, time.Since(fetchStart), err @@ -477,19 +474,36 @@ func fetchAndWriteHeimdallStateSyncEvents( } wroteIndex := false + + var initialRecordTime *time.Time + for i, eventRecord := range eventRecords { if eventRecord.ID <= lastStateSyncEventID { continue } - if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID || !eventRecord.Time.Before(to) { - return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf( - "invalid event record received %s, %s, %s, %s", - fmt.Sprintf("blockNum=%d", blockNum), - fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1), - fmt.Sprintf("chainId=%s (exp %s)", eventRecord.ChainID, chainID), - fmt.Sprintf("time=%s (exp to %s)", eventRecord.Time, to), - ) + // Note: this check is only valid for events with eventRecord.ID > lastStateSyncEventID + var afterCheck = func(limitTime time.Time, eventTime time.Time, initialTime *time.Time) bool { + if initialTime == nil { + return eventTime.After(from) + } + + return initialTime.After(from) + } + + // don't apply this for devnets we may have looser state event constraints + // (TODO these probably needs fixing) + if !(chainID == "1337") { + if lastStateSyncEventID+1 != eventRecord.ID || eventRecord.ChainID != chainID || + !(afterCheck(from, eventRecord.Time, initialRecordTime) && eventRecord.Time.Before(to)) { + return lastStateSyncEventID, i, time.Since(fetchStart), fmt.Errorf( + "invalid event record received %s, %s, %s, %s", + fmt.Sprintf("blockNum=%d", blockNum), + fmt.Sprintf("eventId=%d (exp %d)", eventRecord.ID, lastStateSyncEventID+1), + fmt.Sprintf("chainId=%s (exp %s)", eventRecord.ChainID, chainID), + fmt.Sprintf("time=%s (exp from %s, to %s)", eventRecord.Time, from, to), + ) + } } data, err := eventRecord.MarshallBytes() @@ -514,6 +528,11 @@ func fetchAndWriteHeimdallStateSyncEvents( wroteIndex = true } + if initialRecordTime == nil { + eventTime := eventRecord.Time + initialRecordTime = &eventTime + } + lastStateSyncEventID++ } diff --git a/eth/stagedsync/stage_mining_bor_heimdall.go b/eth/stagedsync/stage_mining_bor_heimdall.go index 25644bbefac..3561dce54cb 100644 --- a/eth/stagedsync/stage_mining_bor_heimdall.go +++ b/eth/stagedsync/stage_mining_bor_heimdall.go @@ -93,6 +93,7 @@ func MiningBorHeimdallForward( logger, lastStateSyncEventID, ) + if err != nil { return err } diff --git a/polygon/bor/bor.go b/polygon/bor/bor.go index 191efcca8b8..29abb547ddb 100644 --- a/polygon/bor/bor.go +++ b/polygon/bor/bor.go @@ -251,6 +251,35 @@ type ValidateHeaderTimeSignerSuccessionNumber interface { GetSignerSuccessionNumber(signer libcommon.Address, number uint64) (int, error) } +func CalculateEventWindow(ctx context.Context, config *borcfg.BorConfig, header *types.Header, tx kv.Getter, headerReader services.HeaderReader) (from time.Time, to time.Time, err error) { + + blockNum := header.Number.Uint64() + blockNum += blockNum % config.CalculateSprintLength(blockNum) + + prevHeader, err := headerReader.HeaderByNumber(ctx, tx, blockNum-config.CalculateSprintLength(blockNum)) + + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("window calculation failed: %w", err) + } + + if config.IsIndore(blockNum) { + stateSyncDelay := config.CalculateStateSyncDelay(blockNum) + to = time.Unix(int64(header.Time-stateSyncDelay), 0) + from = time.Unix(int64(prevHeader.Time-stateSyncDelay), 0) + } else { + to = time.Unix(int64(prevHeader.Time), 0) + prevHeader, err := headerReader.HeaderByNumber(ctx, tx, prevHeader.Number.Uint64()-config.CalculateSprintLength(prevHeader.Number.Uint64())) + + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("window calculation failed: %w", err) + } + + from = time.Unix(int64(prevHeader.Time), 0) + } + + return from, to, nil +} + type spanReader interface { Span(ctx context.Context, id uint64) (*heimdall.Span, bool, error) } @@ -1511,49 +1540,6 @@ func (c *Bor) CommitStates( events := chain.Chain.BorEventsByBlock(header.Hash(), blockNum) - //if len(events) == 50 || len(events) == 0 { // we still sometime could get 0 events from borevent file - if blockNum <= chain.Chain.FrozenBorBlocks() && len(events) == 50 { // we still sometime could get 0 events from borevent file - var to time.Time - if c.config.IsIndore(blockNum) { - stateSyncDelay := c.config.CalculateStateSyncDelay(blockNum) - to = time.Unix(int64(header.Time-stateSyncDelay), 0) - } else { - pHeader := chain.Chain.GetHeaderByNumber(blockNum - c.config.CalculateSprintLength(blockNum)) - to = time.Unix(int64(pHeader.Time), 0) - } - - startEventID := chain.Chain.BorStartEventID(header.Hash(), blockNum) - log.Warn("[dbg] fallback to remote bor events", "blockNum", blockNum, "startEventID", startEventID, "events_from_db_or_snaps", len(events)) - remote, err := c.HeimdallClient.FetchStateSyncEvents(context.Background(), startEventID, to, 0) - if err != nil { - return err - } - if len(remote) > 0 { - chainID := c.chainConfig.ChainID.String() - - var merged []*heimdall.EventRecordWithTime - events = events[:0] - for _, event := range remote { - if event.ChainID != chainID { - continue - } - if event.Time.After(to) { - continue - } - merged = append(merged, event) - } - - for _, ev := range merged { - data, err := ev.MarshallBytes() - if err != nil { - panic(err) - } - - events = append(events, data) - } - } - } - for _, event := range events { if err := c.stateReceiver.CommitState(event, syscall); err != nil { return err diff --git a/polygon/bor/snaptype/types.go b/polygon/bor/snaptype/types.go index cd4316ff2db..bacbf3263ec 100644 --- a/polygon/bor/snaptype/types.go +++ b/polygon/bor/snaptype/types.go @@ -102,6 +102,7 @@ var ( var prevBlockNum uint64 var startEventId uint64 var lastEventId uint64 + if err := kv.BigChunks(db, kv.BorEventNums, from, func(tx kv.Tx, blockNumBytes, eventIdBytes []byte) (bool, error) { blockNum := binary.BigEndian.Uint64(blockNumBytes) if first { @@ -141,15 +142,44 @@ var ( }); err != nil { return 0, err } - if lastEventId > startEventId { - if err := db.View(ctx, func(tx kv.Tx) error { - blockHash, e := rawdb.ReadCanonicalHash(tx, prevBlockNum) - if e != nil { - return e + + if lastEventId > 0 { + // if we have reached the last entry in kv.BorEventNums we will have a start event + // but no end - so we assume that the last event is going to be the last recorded + // event in the db + if startEventId == lastEventId { + if err := db.View(ctx, func(tx kv.Tx) error { + cursor, err := tx.Cursor(kv.BorEvents) + if err != nil { + return err + } + + defer cursor.Close() + k, _, err := cursor.Last() + if err != nil { + return err + } + + if k != nil { + lastEventId = binary.BigEndian.Uint64(k) + } + + return nil + }); err != nil { + return 0, err + } + } + + if lastEventId >= startEventId { + if err := db.View(ctx, func(tx kv.Tx) error { + blockHash, e := rawdb.ReadCanonicalHash(tx, prevBlockNum) + if e != nil { + return e + } + return extractEventRange(startEventId, lastEventId+1, tx, prevBlockNum, blockHash, collect) + }); err != nil { + return 0, err } - return extractEventRange(startEventId, lastEventId+1, tx, prevBlockNum, blockHash, collect) - }); err != nil { - return 0, err } } diff --git a/polygon/heimdall/event_fetch_test.go b/polygon/heimdall/event_fetch_test.go new file mode 100644 index 00000000000..5411ee46629 --- /dev/null +++ b/polygon/heimdall/event_fetch_test.go @@ -0,0 +1,39 @@ +package heimdall + +import ( + "context" + "testing" + "time" + + "github.com/erigontech/erigon-lib/log/v3" +) + +func TestOver50EventBlockFetch(t *testing.T) { + heimdallClient := NewHeimdallClient("https://heimdall-api.polygon.technology/", log.New()) + + // block := 48077376 + // block time := Sep-28-2023 08:13:58 AM + events, err := heimdallClient.FetchStateSyncEvents(context.Background(), 2774290, time.Unix(1695888838-128, 0), 0) + + if err != nil { + t.Fatal(err) + } + + if len(events) != 113 { + t.Fatal("Unexpected event count, exptected: 113, got:", len(events)) + } + + // block := 23893568 + // block time := Jan-19-2022 04:48:04 AM + + events, err = heimdallClient.FetchStateSyncEvents(context.Background(), 1479540, time.Unix(1642567374, 0), 0) + + if err != nil { + t.Fatal(err) + } + + if len(events) != 9417 { + t.Fatal("Unexpected event count, exptected: 9417, got:", len(events)) + } + +} diff --git a/polygon/heimdall/event_record.go b/polygon/heimdall/event_record.go index eba493529ef..ad897f9302b 100644 --- a/polygon/heimdall/event_record.go +++ b/polygon/heimdall/event_record.go @@ -131,3 +131,29 @@ type StateSyncEventResponse struct { Height string `json:"height"` Result EventRecordWithTime `json:"result"` } + +var methodId []byte = borabi.StateReceiverContractABI().Methods["commitState"].ID + +func EventTime(encodedEvent rlp.RawValue) time.Time { + if bytes.Equal(methodId, encodedEvent[0:4]) { + return time.Unix((&big.Int{}).SetBytes(encodedEvent[4:36]).Int64(), 0) + } + + return time.Time{} +} + +var commitStateInputs = borabi.StateReceiverContractABI().Methods["commitState"].Inputs + +func EventId(encodedEvent rlp.RawValue) uint64 { + if bytes.Equal(methodId, encodedEvent[0:4]) { + args, _ := commitStateInputs.Unpack(encodedEvent[4:]) + + if len(args) == 2 { + var eventRecord EventRecord + if err := rlp.DecodeBytes(args[1].([]byte), &eventRecord); err == nil { + return eventRecord.ID + } + } + } + return 0 +} diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 4e320886736..cee54e89891 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -485,8 +485,9 @@ func doIntegrity(cliCtx *cli.Context) error { defer chainDB.Close() cfg := ethconfig.NewSnapCfg(false, true, true) + from := cliCtx.Uint64(SnapshotFromFlag.Name) - _, _, _, blockRetire, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + _, _, _, blockRetire, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) if err != nil { return err } @@ -503,7 +504,7 @@ func doIntegrity(cliCtx *cli.Context) error { return err } case integrity.Blocks: - if err := integrity.SnapBlocksRead(chainDB, blockReader, ctx, failFast); err != nil { + if err := integrity.SnapBlocksRead(ctx, chainDB, blockReader, 0, 0, failFast); err != nil { return err } case integrity.InvertedIndex: @@ -514,6 +515,11 @@ func doIntegrity(cliCtx *cli.Context) error { if err := integrity.E3HistoryNoSystemTxs(ctx, chainDB, agg); err != nil { return err } + case integrity.NoBorEventGaps: + if err := integrity.NoGapsInBorEvents(ctx, chainDB, blockReader, 0, 0, failFast); err != nil { + return err + } + default: return fmt.Errorf("unknown check: %s", chk) } @@ -957,7 +963,9 @@ func doIndicesCommand(cliCtx *cli.Context, dirs datadir.Dirs) error { cfg := ethconfig.NewSnapCfg(false, true, true) chainConfig := fromdb.ChainConfig(chainDB) - _, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + from := cliCtx.Uint64(SnapshotFromFlag.Name) + + _, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) if err != nil { return err } @@ -987,7 +995,8 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error { chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() defer chainDB.Close() cfg := ethconfig.NewSnapCfg(false, true, true) - blockSnaps, borSnaps, caplinSnaps, _, agg, clean, err := openSnaps(ctx, cfg, dirs, chainDB, logger) + from := cliCtx.Uint64(SnapshotFromFlag.Name) + blockSnaps, borSnaps, caplinSnaps, _, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) if err != nil { return err } @@ -1001,7 +1010,7 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error { return nil } -func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.Dirs, chainDB kv.RwDB, logger log.Logger) ( +func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.Dirs, from uint64, chainDB kv.RwDB, logger log.Logger) ( blockSnaps *freezeblocks.RoSnapshots, borSnaps *freezeblocks.BorRoSnapshots, csn *freezeblocks.CaplinSnapshots, br *freezeblocks.BlockRetire, agg *libstate.Aggregator, clean func(), err error, ) { @@ -1184,7 +1193,8 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error { defer db.Close() cfg := ethconfig.NewSnapCfg(false, true, true) - blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, db, logger) + + blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, db, logger) if err != nil { return err } diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index b461dfad179..b184b5ac45e 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -1488,18 +1488,17 @@ func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit in } if event.Time.After(to) { maxTime = true - goto BREAK + return result, maxTime, nil } result = append(result, &event) if len(result) == limit { - goto BREAK + return result, maxTime, nil } } } -BREAK: return result, maxTime, nil } @@ -1545,8 +1544,11 @@ func (r *BlockReader) LastFrozenEventId() uint64 { var lastSegment *Segment for i := len(segments) - 1; i >= 0; i-- { if segments[i].Index() != nil { - lastSegment = segments[i] - break + gg := segments[i].MakeGetter() + if gg.HasNext() { + lastSegment = segments[i] + break + } } } if lastSegment == nil { diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index d66160cdf3e..9ff4cd46f8b 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -18,33 +18,34 @@ package freezeblocks import ( "context" + "encoding/binary" "fmt" "os" "path/filepath" "reflect" + "time" "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/common/length" + "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/common/dbg" "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cmd/hack/tool/fromdb" + "github.com/erigontech/erigon/core/types" "github.com/erigontech/erigon/eth/ethconfig" + "github.com/erigontech/erigon/polygon/bor" + "github.com/erigontech/erigon/polygon/bor/borcfg" borsnaptype "github.com/erigontech/erigon/polygon/bor/snaptype" + "github.com/erigontech/erigon/polygon/heimdall" "github.com/erigontech/erigon/turbo/services" ) -var BorProduceFiles = dbg.EnvBool("BOR_PRODUCE_FILES", false) - func (br *BlockRetire) dbHasEnoughDataForBorRetire(ctx context.Context) (bool, error) { return true, nil } func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) { - if !BorProduceFiles { - return false, nil - } - select { case <-ctx.Done(): return false, ctx.Err() @@ -246,6 +247,212 @@ func (s *BorRoSnapshots) ReopenFolder() error { return nil } +func checkBlockEvents(ctx context.Context, config *borcfg.BorConfig, blockReader services.FullBlockReader, + block uint64, prevBlock uint64, eventId uint64, prevBlockStartId uint64, prevEventTime *time.Time, tx kv.Tx, failFast bool) (*time.Time, error) { + header, err := blockReader.HeaderByNumber(ctx, tx, prevBlock) + + if err != nil { + if failFast { + return nil, fmt.Errorf("can't get header for block %d: %w", block, err) + } + + log.Error("[integrity] NoGapsInBorEvents: can't get header for block", "block", block, "err", err) + } + + events, err := blockReader.EventsByBlock(ctx, tx, header.Hash(), header.Number.Uint64()) + + if err != nil { + if failFast { + return nil, fmt.Errorf("can't get events for block %d: %w", block, err) + } + + log.Error("[integrity] NoGapsInBorEvents: can't get events for block", "block", block, "err", err) + } + + if prevBlockStartId != 0 { + if len(events) != int(eventId-prevBlockStartId) { + if failFast { + return nil, fmt.Errorf("block event mismatch at %d: expected: %d, got: %d", block, eventId-prevBlockStartId, len(events)) + } + + log.Error("[integrity] NoGapsInBorEvents: block event count mismatch", "block", block, "eventId", eventId, "expected", eventId-prevBlockStartId, "got", len(events)) + } + } + + var lastBlockEventTime time.Time + var firstBlockEventTime *time.Time + + for i, event := range events { + + var eventId uint64 + + if prevBlockStartId != 0 { + eventId = heimdall.EventId(event) + + if eventId != prevBlockStartId+uint64(i) { + if failFast { + return nil, fmt.Errorf("invalid event id %d for event %d in block %d: expected: %d", eventId, i, block, prevBlockStartId+uint64(i)) + } + + log.Error("[integrity] NoGapsInBorEvents: invalid event id", "block", block, "event", i, "expected", prevBlockStartId+uint64(i), "got", eventId) + } + } else { + eventId = prevBlockStartId + uint64(i) + } + + eventTime := heimdall.EventTime(event) + + //if i != 0 { + // if eventTime.Before(lastBlockEventTime) { + // eventTime = lastBlockEventTime + // } + //} + + if i == 0 { + lastBlockEventTime = eventTime + } + + const warnPrevTimes = false + + if prevEventTime != nil { + if eventTime.Before(*prevEventTime) && warnPrevTimes { + log.Warn("[integrity] NoGapsInBorEvents: event time before prev", "block", block, "event", eventId, "time", eventTime, "prev", *prevEventTime, "diff", -prevEventTime.Sub(eventTime)) + } + } + + prevEventTime = &eventTime + + if !checkBlockWindow(ctx, eventTime, firstBlockEventTime, config, header, tx, blockReader) { + from, to, _ := bor.CalculateEventWindow(ctx, config, header, tx, blockReader) + + var diff time.Duration + + if eventTime.Before(from) { + diff = -from.Sub(eventTime) + } else if eventTime.After(to) { + diff = to.Sub(eventTime) + } + + if failFast { + return nil, fmt.Errorf("invalid time %s for event %d in block %d: expected %s-%s", eventTime, eventId, block, from, to) + } + + log.Error(fmt.Sprintf("[integrity] NoGapsInBorEvents: invalid event time at %d of %d", i, len(events)), "block", block, "event", eventId, "time", eventTime, "diff", diff, "expected", fmt.Sprintf("%s-%s", from, to), "block-start", prevBlockStartId, "first-time", lastBlockEventTime, "timestamps", fmt.Sprintf("%d-%d", from.Unix(), to.Unix())) + } + + if firstBlockEventTime == nil { + firstBlockEventTime = &eventTime + } + } + + return prevEventTime, nil +} + +func ValidateBorEvents(ctx context.Context, config *borcfg.BorConfig, db kv.RoDB, blockReader services.FullBlockReader, eventSegment *Segment, prevEventId uint64, maxBlockNum uint64, failFast bool, logEvery *time.Ticker) (uint64, error) { + g := eventSegment.Decompressor.MakeGetter() + + word := make([]byte, 0, 4096) + + var prevBlock, prevBlockStartId uint64 + var prevEventTime *time.Time + + for g.HasNext() { + word, _ = g.Next(word[:0]) + + block := binary.BigEndian.Uint64(word[length.Hash : length.Hash+length.BlockNum]) + eventId := binary.BigEndian.Uint64(word[length.Hash+length.BlockNum : length.Hash+length.BlockNum+8]) + event := word[length.Hash+length.BlockNum+8:] + + recordId := heimdall.EventId(event) + + if recordId != eventId { + if failFast { + return prevEventId, fmt.Errorf("invalid event id %d in block %d: expected: %d", recordId, block, eventId) + } + + log.Error("[integrity] NoGapsInBorEvents: invalid event id", "block", block, "event", recordId, "expected", eventId) + } + + if prevEventId > 0 { + switch { + case eventId < prevEventId: + if failFast { + return prevEventId, fmt.Errorf("invaid bor event %d (prev=%d) at block=%d", eventId, prevEventId, block) + } + + log.Error("[integrity] NoGapsInBorEvents: invalid bor event", "event", eventId, "prev", prevEventId, "block", block) + + case eventId != prevEventId+1: + if failFast { + return prevEventId, fmt.Errorf("missing bor event %d (prev=%d) at block=%d", eventId, prevEventId, block) + } + + log.Error("[integrity] NoGapsInBorEvents: missing bor event", "event", eventId, "prev", prevEventId, "block", block) + } + } + + //if prevEventId == 0 { + //log.Info("[integrity] checking bor events", "event", eventId, "block", block) + //} + + if prevBlock != 0 && prevBlock != block { + var err error + + if db != nil { + err = db.View(ctx, func(tx kv.Tx) error { + prevEventTime, err = checkBlockEvents(ctx, config, blockReader, block, prevBlock, eventId, prevBlockStartId, prevEventTime, tx, failFast) + return err + }) + } else { + prevEventTime, err = checkBlockEvents(ctx, config, blockReader, block, prevBlock, eventId, prevBlockStartId, prevEventTime, nil, failFast) + } + + if err != nil { + return prevEventId, err + } + + prevBlockStartId = eventId + } + + prevEventId = eventId + prevBlock = block + + var logChan <-chan time.Time + + if logEvery != nil { + logChan = logEvery.C + } + + select { + case <-ctx.Done(): + return prevEventId, ctx.Err() + case <-logChan: + log.Info("[integrity] NoGapsInBorEvents", "blockNum", fmt.Sprintf("%dK/%dK", binary.BigEndian.Uint64(word[length.Hash:length.Hash+length.BlockNum])/1000, maxBlockNum/1000)) + default: + } + } + + return prevEventId, nil +} + +func checkBlockWindow(ctx context.Context, eventTime time.Time, firstBlockEventTime *time.Time, config *borcfg.BorConfig, header *types.Header, tx kv.Getter, headerReader services.HeaderReader) bool { + from, to, err := bor.CalculateEventWindow(ctx, config, header, tx, headerReader) + + if err != nil { + return false + } + + var afterCheck = func(limitTime time.Time, eventTime time.Time, initialTime *time.Time) bool { + if initialTime == nil { + return eventTime.After(from) + } + + return initialTime.After(from) + } + + return !(afterCheck(from, eventTime, firstBlockEventTime) || eventTime.After(to)) +} + type BorView struct { base *View }