From c04f57b7f48c7cdd6b0731ca823ad6ac3c7c6f23 Mon Sep 17 00:00:00 2001
From: JkLondon
Date: Wed, 18 Sep 2024 14:16:00 +0200
Subject: [PATCH 1/7] replace the isMining bool with a three-valued
 stages.Mode

---
 cmd/integration/commands/stages.go       |  5 ++--
 cmd/state/exec3/state.go                 | 23 +++++++--------
 core/blockchain.go                       | 11 ++++----
 eth/backend.go                           |  8 +++---
 eth/stagedsync/exec3.go                  | 36 +++++++++++-------------
 eth/stagedsync/stage_execute.go          | 10 +++----
 eth/stagedsync/stage_mining_exec.go      |  4 +--
 eth/stagedsync/stagedsynctest/harness.go |  2 ++
 eth/stagedsync/stages/stages.go          |  8 ++++++
 eth/stagedsync/sync.go                   |  4 ++-
 eth/stagedsync/sync_test.go              | 22 +++++++--------
 turbo/stages/mock/mock_sentry.go         |  6 ++--
 turbo/stages/stageloop.go                |  1 +
 13 files changed, 78 insertions(+), 62 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index ec9455b49cf..a1a0888ca83 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -1424,9 +1424,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
 		recents = bor.Recents
 		signatures = bor.Signatures
 	}
-	stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
+	stagesDefault := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
 		heimdallClient, recents, signatures, logger)
-	sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)
+	sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks) //TODO: not completely sure

 	miner := stagedsync.NewMiningState(&cfg.Miner)
 	miningCancel := make(chan struct{})
@@ -1464,6 +1464,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
 		stagedsync.MiningUnwindOrder,
 		stagedsync.MiningPruneOrder,
 		logger,
+		stages.BlockProduction,
 	)

 	return engine, vmConfig, sync, miningSync, miner
diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go
index f0c1b5175cc..30ea8419fcb 100644
--- a/cmd/state/exec3/state.go
+++ b/cmd/state/exec3/state.go
@@ -18,6 +18,7 @@ package exec3

 import (
 	"context"
+	"github.com/erigontech/erigon/eth/stagedsync/stages"
 	"sync"

 	"golang.org/x/sync/errgroup"
@@ -70,10 +71,10 @@ type Worker struct {

 	dirs datadir.Dirs

-	isMining bool
+	mode stages.Mode
 }

-func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, isMining bool) *Worker {
+func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, mode stages.Mode) *Worker {
 	w := &Worker{
 		lock:   lock,
 		logger: logger,
@@ -94,7 +95,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro

 		dirs: dirs,

-		isMining: isMining,
+		mode: mode,
 	}
 	w.taskGasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock())
 	w.vmCfg = vm.Config{Debug: true, Tracer: w.callTracer}
@@ -130,7 +131,7 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {

 func (rw *Worker) Run() error {
 	for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) {
-		rw.RunTxTask(txTask, rw.isMining)
+		rw.RunTxTask(txTask, rw.mode)
 		if err := rw.resultCh.Add(rw.ctx, txTask); err != nil {
 			return err
 		}
@@ -138,10 +139,10 @@ func (rw *Worker) Run() error {
 	return nil
 }

-func (rw *Worker) RunTxTask(txTask *state.TxTask, isMining bool) {
+func (rw *Worker) RunTxTask(txTask *state.TxTask, mode stages.Mode) {
 	rw.lock.Lock()
 	defer rw.lock.Unlock()
-	rw.RunTxTaskNoLock(txTask, isMining)
+	rw.RunTxTaskNoLock(txTask, mode)
 }

 // Needed to set history reader when need to offset few txs from block beginning and does not break processing,
@@ -163,7 +164,7 @@ func (rw *Worker) SetReader(reader state.ResettableStateReader) {
 	}
 }

-func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
+func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
 	if txTask.HistoryExecution && !rw.historyMode {
 		// in case if we cancelled execution and commitment happened in the middle of the block, we have to process block
 		// from the beginning until committed txNum and only then disable history mode.
@@ -232,7 +233,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
 			return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
 		}

-		if isMining {
+		if mode == stages.BlockProduction {
 			_, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger)
 		} else {
 			_, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
@@ -299,7 +300,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
 	}
 }

-func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, isMining bool) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
+func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, mode stages.Mode) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
 	reconWorkers = make([]*Worker, workerCount)

 	resultChSize := workerCount * 8
@@ -310,7 +311,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
 	ctx, cancel := context.WithCancel(ctx)
 	g, ctx := errgroup.WithContext(ctx)
 	for i := 0; i < workerCount; i++ {
-		reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
+		reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)
 		reconWorkers[i].ResetState(rs, accumulator)
 	}
 	if background {
@@ -337,7 +338,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
 			//applyWorker.ResetTx(nil)
 		}
 	}
-	applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
+	applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)

 	return reconWorkers, applyWorker, rws, clear, wait
 }
diff --git a/core/blockchain.go b/core/blockchain.go
index 2033c627067..7ed0499044e 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -24,6 +24,7 @@ import (
 	"cmp"
 	"encoding/json"
 	"fmt"
+	"github.com/erigontech/erigon/eth/stagedsync/stages"
 	"reflect"
 	"slices"
 	"time"
@@ -170,7 +171,7 @@ func ExecuteBlockEphemerally(

 	if !vmConfig.ReadOnly {
 		txs := block.Transactions()
-		if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, false, logger); err != nil {
+		if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil { //TODO: not sure
 			return nil, err
 		}
 	}
@@ -323,14 +324,14 @@ func FinalizeBlockExecution(
 	stateWriter state.StateWriter, cc *chain.Config, ibs *state.IntraBlockState,
 	receipts types.Receipts, withdrawals []*types.Withdrawal, requests types.Requests, chainReader consensus.ChainReader,
-	isMining bool,
+	mode stages.Mode,
 	logger log.Logger,
 ) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) {
 	syscall := func(contract libcommon.Address, data []byte) ([]byte, error) {
 		return SysCallContract(contract, data, cc, ibs, header, engine, false /* constCall */)
 	}

-	if isMining {
+	if mode == stages.BlockProduction {
 		newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, nil, logger)
 	} else {
 		var rss types.Requests
@@ -367,7 +368,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead
 	return nil
 }

-func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, isMining bool) error {
+func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, mode stages.Mode) error {
 	if gasUsed != h.GasUsed {
 		return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x",
 			gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash())
@@ -383,7 +384,7 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receip
 	}
 	receiptHash := types.DeriveSha(receipts)
 	if receiptHash != h.ReceiptHash {
-		if isMining {
+		if mode == stages.BlockProduction {
 			h.ReceiptHash = receiptHash
 			return nil
 		}
diff --git a/eth/backend.go b/eth/backend.go
index 199e825baf8..6ef85dc3112 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -696,7 +696,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 			stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader),
 			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore),
 		), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
-		logger)
+		logger, stages.BlockProduction)

 	var ethashApi *ethash.API
 	if casted, ok := backend.engine.(*ethash.Ethash); ok {
@@ -736,7 +736,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 			),
 			stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
 			stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader),
-			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger)
+			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.BlockProduction)
 		// We start the mining step
 		if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil {
 			return nil, err
@@ -887,7 +887,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 		backend.syncPruneOrder = stagedsync.DefaultPruneOrder
 	}

-	backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
+	backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ApplyingBlocks)

 	hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus)
@@ -915,7 +915,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 	checkStateRoot := true
 	pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
-	backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
+	backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks) //TODO: not sure
 	backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)

 	executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)
diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index bd67d9374de..c19d813f479 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -187,7 +187,6 @@ func ExecV3(ctx context.Context,
 	maxBlockNum uint64,
 	logger log.Logger,
 	initialCycle bool,
-	isMining bool,
 ) error {
 	// TODO: e35 doesn't support parallel-exec yet
 	parallel = false //nolint
@@ -225,9 +224,8 @@ func ExecV3(ctx context.Context,
 	pruneNonEssentials := cfg.prune.History.Enabled() && cfg.prune.History.PruneTo(execStage.BlockNumber) == execStage.BlockNumber

 	var err error
-	inMemExec := txc.Doms != nil
 	var doms *state2.SharedDomains
-	if inMemExec {
+	if execStage.state.mode == stages.ForkValidation {
 		doms = txc.Doms
 	} else {
 		var err error
@@ -377,10 +375,10 @@ func ExecV3(ctx context.Context,
 	rwsConsumed := make(chan struct{}, 1)
 	defer close(rwsConsumed)

-	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, isMining)
+	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, execStage.state.mode)
 	defer stopWorkers()
 	applyWorker := cfg.applyWorker
-	if isMining {
+	if execStage.state.mode == stages.BlockProduction {
 		applyWorker = cfg.applyWorkerMining
 	}
 	applyWorker.ResetState(rs, accumulator)
@@ -421,7 +419,7 @@ func ExecV3(ctx context.Context,
 					return err
 				}

-				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, isMining)
+				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, execStage.state.mode)
 				if err != nil {
 					return err
 				}
@@ -499,14 +497,14 @@ func ExecV3(ctx context.Context,
 						return err
 					}
 					ac.Close()
-					if !inMemExec {
+					if execStage.state.mode != stages.ForkValidation {
 						if err = doms.Flush(ctx, tx); err != nil {
 							return err
 						}
 					}
 					break
 				}
-				if inMemExec {
+				if execStage.state.mode == stages.ForkValidation {
 					break
 				}
@@ -522,7 +520,7 @@ func ExecV3(ctx context.Context,
 				rws.DrainNonBlocking()
 				applyWorker.ResetTx(tx)

-				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, isMining)
+				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, execStage.state.mode)
 				if err != nil {
 					return err
 				}
@@ -833,7 +831,7 @@ Loop:
 			if txTask.Error != nil {
 				break Loop
 			}
-			applyWorker.RunTxTaskNoLock(txTask, isMining)
+			applyWorker.RunTxTaskNoLock(txTask, execStage.state.mode)
 			if err := func() error {
 				if errors.Is(txTask.Error, context.Canceled) {
 					return err
@@ -854,7 +852,7 @@ Loop:
 			if txTask.Final {
 				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts
 				if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
-					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, isMining); err != nil {
+					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, execStage.state.mode); err != nil {
 						return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
 					}
 				}
@@ -911,7 +909,7 @@ Loop:
 				ts += time.Since(start)
 				aggTx.RestrictSubsetFileDeletions(false)
 				doms.SavePastChangesetAccumulator(b.Hash(), blockNum, changeset)
-				if !inMemExec {
+				if execStage.state.mode != stages.ForkValidation {
 					if err := state2.WriteDiffSet(applyTx, blockNum, b.Hash(), changeset); err != nil {
 						return err
 					}
@@ -942,7 +940,7 @@ Loop:
 			//}
 			// If we skip post evaluation, then we should compute root hash ASAP for fail-fast
 			aggregatorRo := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx)
-			if (!skipPostEvaluation && rs.SizeEstimate() < commitThreshold && !aggregatorRo.CanPrune(applyTx, outputTxNum.Load())) || inMemExec {
+			if (!skipPostEvaluation && rs.SizeEstimate() < commitThreshold && !aggregatorRo.CanPrune(applyTx, outputTxNum.Load())) || execStage.state.mode == stages.ForkValidation {
 				break
 			}
 			var (
@@ -952,7 +950,7 @@ Loop:
 				t1, t2, t3 time.Duration
 			)

-			if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, inMemExec); err != nil {
+			if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.state.mode); err != nil {
 				return err
 			} else if !ok {
 				break Loop
@@ -1038,7 +1036,7 @@ Loop:

 	if u != nil && !u.HasUnwindPoint() {
 		if b != nil {
-			_, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, inMemExec)
+			_, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.state.mode)
 			if err != nil {
 				return err
 			}
@@ -1112,7 +1110,7 @@ func dumpPlainStateDebug(tx kv.RwTx, doms *state2.SharedDomains) {
 }

 // flushAndCheckCommitmentV3 - does write state to db and then check commitment
-func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyTx kv.RwTx, doms *state2.SharedDomains, cfg ExecuteBlockCfg, e *StageState, maxBlockNum uint64, parallel bool, logger log.Logger, u Unwinder, inMemExec bool) (bool, error) {
+func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyTx kv.RwTx, doms *state2.SharedDomains, cfg ExecuteBlockCfg, e *StageState, maxBlockNum uint64, parallel bool, logger log.Logger, u Unwinder, mode stages.Mode) (bool, error) {

 	// E2 state root check was in another stage - means we did flush state even if state root will not match
 	// And Unwind expecting it
@@ -1145,7 +1143,7 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT
 		return true, nil
 	}
 	if bytes.Equal(rh, header.Root.Bytes()) {
-		if !inMemExec {
+		if mode != stages.ForkValidation {
 			if err := doms.Flush(ctx, applyTx); err != nil {
 				return false, err
 			}
@@ -1216,7 +1214,7 @@ func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader ser
 	return b, err
 }

-func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool, isMining bool) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) {
+func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool, mode stages.Mode) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) {
 	rwsIt := rws.Iter()
 	defer rwsIt.Close()
@@ -1234,7 +1232,7 @@ func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *stat
 		}

 		// resolve first conflict right here: it's faster and conflict-free
-		applyWorker.RunTxTask(txTask, isMining)
+		applyWorker.RunTxTask(txTask, mode)
 		if txTask.Error != nil {
 			return outputTxNum, conflicts, triggers, processedBlockNum, false, fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, txTask.Error)
 		}
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index b6837e5d83f..570413a752d 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -129,14 +129,14 @@ func StageExecuteBlocksCfg(
 		historyV3: true,
 		syncCfg:   syncCfg,
 		silkworm:  silkworm,
-		applyWorker:       exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, false),
-		applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, true),
+		applyWorker:       exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, stages.ApplyingBlocks),
+		applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, stages.BlockProduction),
 	}
 }

 // ================ Erigon3 ================

-func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger, isMining bool) (err error) {
+func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger) (err error) {
 	workersCount := cfg.syncCfg.ExecWorkerCount
 	if !initialCycle {
 		workersCount = 1
@@ -156,7 +156,7 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64
 	}

 	parallel := txc.Tx == nil
-	if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle, isMining); err != nil {
+	if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle); err != nil {
 		return err
 	}
 	return nil
@@ -247,7 +247,7 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, to
 	if dbg.StagesOnlyBlocks {
 		return nil
 	}
-	if err = ExecBlockV3(s, u, txc, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger, false); err != nil {
+	if err = ExecBlockV3(s, u, txc, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger); err != nil {
 		return err
 	}
 	return nil
diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go
index 381bca90880..c2e2efc5dbc 100644
--- a/eth/stagedsync/stage_mining_exec.go
+++ b/eth/stagedsync/stage_mining_exec.go
@@ -201,7 +201,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg

 	var err error
 	var block *types.Block
-	block, current.Txs, current.Receipts, err = core.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txs, current.Uncles, &state.NoopWriter{}, &cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, current.Requests, chainReader, true, logger)
+	block, current.Txs, current.Receipts, err = core.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txs, current.Uncles, &state.NoopWriter{}, &cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, current.Requests, chainReader, s.state.mode, logger)
 	if err != nil {
 		return fmt.Errorf("cannot finalize block execution: %s", err)
 	}
@@ -239,7 +239,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg
 	// This flag will skip checking the state root
 	execCfg.blockProduction = true
 	execS := &StageState{state: s.state, ID: stages.Execution, BlockNumber: blockHeight - 1}
-	if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger, true); err != nil {
+	if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger); err != nil {
 		logger.Error("cannot execute block execution", "err", err)
 		return err
 	}
diff --git a/eth/stagedsync/stagedsynctest/harness.go b/eth/stagedsync/stagedsynctest/harness.go
index 71803cf0b32..6bcaec8a82d 100644
--- a/eth/stagedsync/stagedsynctest/harness.go
+++ b/eth/stagedsync/stagedsynctest/harness.go
@@ -86,6 +86,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness {
 		stagedsync.DefaultUnwindOrder,
 		stagedsync.DefaultPruneOrder,
 		logger,
+		stages.ApplyingBlocks,
 	)
 	miningSyncStages := stagedsync.MiningStages(
 		ctx,
@@ -102,6 +103,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness {
 		stagedsync.MiningUnwindOrder,
 		stagedsync.MiningPruneOrder,
 		logger,
+		stages.BlockProduction,
 	)
 	validatorKey, err := crypto.GenerateKey()
 	require.NoError(t, err)
diff --git a/eth/stagedsync/stages/stages.go b/eth/stagedsync/stages/stages.go
index a6d70c5b8b2..4183fb19c04 100644
--- a/eth/stagedsync/stages/stages.go
+++ b/eth/stagedsync/stages/stages.go
@@ -115,3 +115,11 @@ func encodeBigEndian(n uint64) []byte {
 	binary.BigEndian.PutUint64(v[:], n)
 	return v[:]
 }
+
+type Mode int
+
+const (
+	BlockProduction = Mode(iota)
+	ForkValidation
+	ApplyingBlocks
+)
diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go
index 5f3b20ce3ed..0bec6521ea0 100644
--- a/eth/stagedsync/sync.go
+++ b/eth/stagedsync/sync.go
@@ -49,6 +49,7 @@ type Sync struct {
 	logPrefixes   []string
 	logger        log.Logger
 	stagesIdsList []string
+	mode          stages.Mode
 }

 type Timing struct {
@@ -207,7 +208,7 @@ func (s *Sync) SetCurrentStage(id stages.SyncStage) error {
 	return fmt.Errorf("stage not found with id: %v", id)
 }

-func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger) *Sync {
+func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger, mode stages.Mode) *Sync {
 	unwindStages := make([]*Stage, len(stagesList))
 	for i, stageIndex := range unwindOrder {
 		for _, s := range stagesList {
@@ -243,6 +244,7 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune
 		logPrefixes:   logPrefixes,
 		logger:        logger,
 		stagesIdsList: stagesIdsList,
+		mode:          mode,
 	}
 }
diff --git a/eth/stagedsync/sync_test.go b/eth/stagedsync/sync_test.go
index ed9c8db253a..f86c4c3f223 100644
--- a/eth/stagedsync/sync_test.go
+++ b/eth/stagedsync/sync_test.go
@@ -59,7 +59,7 @@ func TestStagesSuccess(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -99,7 +99,7 @@ func TestDisabledStages(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -139,7 +139,7 @@ func TestErroredStage(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err)
@@ -207,7 +207,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -285,7 +285,7 @@ func TestUnwind(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -374,7 +374,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -430,12 +430,12 @@ func TestSyncDoTwice(t *testing.T) {
 		},
 	}

-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)

-	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
+	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
 	_, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -488,14 +488,14 @@ func TestStateSyncInterruptRestart(t *testing.T) {
 		},
 	}

-	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err)

 	expectedErr = nil

-	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New())
+	state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks)
 	_, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.NoError(t, err)
@@ -567,7 +567,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) {
 			},
 		},
 	}
-	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New())
+	state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks)
 	db, tx := memdb.NewTestTx(t)
 	_, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false)
 	assert.Error(t, errInterrupted, err)
diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go
index fa759d2cfae..0389b54d13a 100644
--- a/turbo/stages/mock/mock_sentry.go
+++ b/turbo/stages/mock/mock_sentry.go
@@ -507,7 +507,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 			stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, dirs.Tmp, nil, 0, mock.TxPool, nil, mock.BlockReader),
 			stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, miner, miningCancel, mock.BlockReader, latestBlockBuiltStore),
 		), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
-		logger)
+		logger, stages.BlockProduction)
 	// We start the mining step
 	if err := stages2.MiningStep(ctx, mock.DB, proposingSync, tmpdir, logger); err != nil {
 		return nil, err
@@ -543,12 +543,13 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 		stagedsync.DefaultUnwindOrder,
 		stagedsync.DefaultPruneOrder,
 		logger,
+		stages.ApplyingBlocks,
 	)

 	cfg.Genesis = gspec
 	pipelineStages := stages2.NewPipelineStages(mock.Ctx, db, &cfg, p2p.Config{}, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, mock.agg, nil, forkValidator, logger, checkStateRoot)
-	mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
+	mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks)

 	mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx)
@@ -583,6 +584,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 		stagedsync.MiningUnwindOrder,
 		stagedsync.MiningPruneOrder,
 		logger,
+		stages.BlockProduction,
 	)

 	cfg.Genesis = gspec
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 314c6c98179..332b5bd49f2 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -679,6 +679,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
 		stagedsync.StateUnwindOrder,
 		nil, /* pruneOrder */
 		logger,
+		stages.ForkValidation,
 	)
 }
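[Note] The pivot of this series is the Mode enum that patch 1 adds to
eth/stagedsync/stages/stages.go: every call site that used to take an
isMining bool now takes one of three named modes, which also lets fork
validation (previously inferred via inMemExec := txc.Doms != nil) be
distinguished explicitly. A standalone sketch — the type and constants
are copied from the patch; the String method and main are illustrative
additions:

	package main

	import "fmt"

	// Mode: copied from the patch. A staged-sync loop runs in exactly
	// one of these three modes for its whole lifetime.
	type Mode int

	const (
		BlockProduction = Mode(iota) // assembling a new block (old isMining=true)
		ForkValidation               // in-memory execution to validate a fork
		ApplyingBlocks               // normal sync of canonical blocks (old isMining=false)
	)

	// String is an illustrative helper, not part of the patch.
	func (m Mode) String() string {
		switch m {
		case BlockProduction:
			return "BlockProduction"
		case ForkValidation:
			return "ForkValidation"
		case ApplyingBlocks:
			return "ApplyingBlocks"
		}
		return fmt.Sprintf("Mode(%d)", int(m))
	}

	func main() {
		for _, m := range []Mode{BlockProduction, ForkValidation, ApplyingBlocks} {
			fmt.Println(m)
		}
	}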
From 9aef7610ad1f68d360928ebebc02157e6078fe5f Mon Sep 17 00:00:00 2001
From: JkLondon
Date: Wed, 18 Sep 2024 14:28:17 +0200
Subject: [PATCH 2/7] gate recent-log notifications and receipt checks on the
 execution mode

---
 eth/stagedsync/exec3.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index 766d1c0dd4e..d1179f605b7 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -874,10 +874,10 @@ Loop:
 				blobGasUsed += txTask.Tx.GetBlobGas()
 			}
 			if txTask.Final {
-				if !isMining && execStage.state.mode != stages.ForkValidation && !execStage.CurrentSyncCycle.IsInitialCycle {
+				if execStage.state.mode == stages.ApplyingBlocks && !execStage.CurrentSyncCycle.IsInitialCycle {
 					cfg.notifications.RecentLogs.Add(receipts)
 				}
-				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && !isMining
+				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && execStage.state.mode != stages.BlockProduction
 				if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
 					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, execStage.state.mode); err != nil {
 						return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
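[Note] Patch 2 rewrites two conditions in exec3.go in terms of the new
mode. Restated as standalone predicates (function names and parameter
lists are illustrative, reusing the Mode type from the sketch above):

	// Receipts go to the tx-pool notification buffer only while
	// applying canonical blocks on a non-initial sync cycle.
	func notifyRecentLogs(mode Mode, isInitialCycle bool) bool {
		return mode == ApplyingBlocks && !isInitialCycle
	}

	// The receipt-hash check stays on for normal execution and fork
	// validation but is skipped during block production, where patch 1
	// makes BlockPostValidation adopt the computed ReceiptHash instead
	// of failing.
	func checkReceipts(statelessExec, isByzantium, noReceipts bool, mode Mode) bool {
		return !statelessExec && isByzantium && !noReceipts && mode != BlockProduction
	}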
From 6b42bf82379fb6adca5622acdc80ad86fe199cb4 Mon Sep 17 00:00:00 2001
From: JkLondon
Date: Wed, 18 Sep 2024 15:59:45 +0200
Subject: [PATCH 3/7] read the execution mode from CurrentSyncCycleInfo
 instead of Sync internals

---
 eth/stagedsync/exec3.go | 32 ++++++++++++++++----------------
 eth/stagedsync/stage.go |  1 +
 eth/stagedsync/sync.go  |  6 +++---
 3 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index d1179f605b7..70fbd72f0cd 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -226,7 +226,7 @@ func ExecV3(ctx context.Context,

 	var err error
 	var doms *state2.SharedDomains
-	if execStage.state.mode == stages.ForkValidation {
+	if execStage.CurrentSyncCycle.Mode == stages.ForkValidation {
 		doms = txc.Doms
 	} else {
 		var err error
@@ -378,10 +378,10 @@ func ExecV3(ctx context.Context,
 	rwsConsumed := make(chan struct{}, 1)
 	defer close(rwsConsumed)

-	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, execStage.state.mode)
+	execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, execStage.CurrentSyncCycle.Mode)
 	defer stopWorkers()
 	applyWorker := cfg.applyWorker
-	if execStage.state.mode == stages.BlockProduction {
+	if execStage.CurrentSyncCycle.Mode == stages.BlockProduction {
 		applyWorker = cfg.applyWorkerMining
 	}
 	applyWorker.ResetState(rs, accumulator)
@@ -422,7 +422,7 @@ func ExecV3(ctx context.Context,
 					return err
 				}

-				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, execStage.state.mode)
+				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, execStage.CurrentSyncCycle.Mode)
 				if err != nil {
 					return err
 				}
@@ -500,14 +500,14 @@ func ExecV3(ctx context.Context,
 						return err
 					}
 					ac.Close()
-					if execStage.state.mode != stages.ForkValidation {
+					if execStage.CurrentSyncCycle.Mode != stages.ForkValidation {
 						if err = doms.Flush(ctx, tx); err != nil {
 							return err
 						}
 					}
 					break
 				}
-				if execStage.state.mode == stages.ForkValidation {
+				if execStage.CurrentSyncCycle.Mode == stages.ForkValidation {
 					break
 				}
@@ -523,7 +523,7 @@ func ExecV3(ctx context.Context,
 				rws.DrainNonBlocking()
 				applyWorker.ResetTx(tx)

-				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, execStage.state.mode)
+				processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, execStage.CurrentSyncCycle.Mode)
 				if err != nil {
 					return err
 				}
@@ -855,7 +855,7 @@ Loop:
 			if txTask.Error != nil {
 				break Loop
 			}
-			applyWorker.RunTxTaskNoLock(txTask, execStage.state.mode)
+			applyWorker.RunTxTaskNoLock(txTask, execStage.CurrentSyncCycle.Mode)
 			if err := func() error {
 				if errors.Is(txTask.Error, context.Canceled) {
 					return err
@@ -874,12 +874,12 @@ Loop:
 				blobGasUsed += txTask.Tx.GetBlobGas()
 			}
 			if txTask.Final {
-				if execStage.state.mode == stages.ApplyingBlocks && !execStage.CurrentSyncCycle.IsInitialCycle {
+				if execStage.CurrentSyncCycle.Mode == stages.ApplyingBlocks && !execStage.CurrentSyncCycle.IsInitialCycle {
 					cfg.notifications.RecentLogs.Add(receipts)
 				}
-				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && execStage.state.mode != stages.BlockProduction
+				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && execStage.CurrentSyncCycle.Mode != stages.BlockProduction
 				if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
-					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, execStage.state.mode); err != nil {
+					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, execStage.CurrentSyncCycle.Mode); err != nil {
 						return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
 					}
 				}
@@ -936,7 +936,7 @@ Loop:
 				ts += time.Since(start)
 				aggTx.RestrictSubsetFileDeletions(false)
 				doms.SavePastChangesetAccumulator(b.Hash(), blockNum, changeset)
-				if execStage.state.mode != stages.ForkValidation {
+				if execStage.CurrentSyncCycle.Mode != stages.ForkValidation {
 					if err := state2.WriteDiffSet(applyTx, blockNum, b.Hash(), changeset); err != nil {
 						return err
 					}
@@ -951,7 +951,7 @@ Loop:

 			// MA commitTx
 			if !parallel {
-				if execStage.state.mode == stages.ApplyingBlocks {
+				if execStage.CurrentSyncCycle.Mode == stages.ApplyingBlocks {
 					metrics2.UpdateBlockConsumerPostExecutionDelay(b.Time(), blockNum, logger)
 				}
@@ -959,7 +959,7 @@ Loop:

 				select {
 				case <-logEvery.C:
-					if execStage.state.mode == stages.ForkValidation || execStage.state.mode == stages.BlockProduction {
+					if execStage.CurrentSyncCycle.Mode == stages.ForkValidation || execStage.CurrentSyncCycle.Mode == stages.BlockProduction {
 						break
 					}
@@ -990,7 +990,7 @@ Loop:
 				t1, t2, t3 time.Duration
 			)

-			if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.state.mode); err != nil {
+			if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.CurrentSyncCycle.Mode); err != nil {
 				return err
 			} else if !ok {
 				break Loop
@@ -1076,7 +1076,7 @@ Loop:

 	if u != nil && !u.HasUnwindPoint() {
 		if b != nil {
-			_, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.state.mode)
+			_, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.CurrentSyncCycle.Mode)
 			if err != nil {
 				return err
 			}
diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go
index ce651474132..7f88ec867ab 100644
--- a/eth/stagedsync/stage.go
+++ b/eth/stagedsync/stage.go
@@ -59,6 +59,7 @@ type Stage struct {
 type CurrentSyncCycleInfo struct {
 	IsInitialCycle bool // means: not-on-chain-tip. can be several sync cycle in this mode.
 	IsFirstCycle   bool // means: first cycle
+	Mode           stages.Mode
 }

 // StageState is the state of the stage.
diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go
index 0bec6521ea0..aa9985145f6 100644
--- a/eth/stagedsync/sync.go
+++ b/eth/stagedsync/sync.go
@@ -78,7 +78,7 @@ func (s *Sync) PrevUnwindPoint() *uint64 {
 }

 func (s *Sync) NewUnwindState(id stages.SyncStage, unwindPoint, currentProgress uint64, initialCycle, firstCycle bool) *UnwindState {
-	return &UnwindState{id, unwindPoint, currentProgress, UnwindReason{nil, nil}, s, CurrentSyncCycleInfo{initialCycle, firstCycle}}
+	return &UnwindState{id, unwindPoint, currentProgress, UnwindReason{nil, nil}, s, CurrentSyncCycleInfo{initialCycle, firstCycle, s.mode}}
 }

 // Get the current prune status from the DB
@@ -103,7 +103,7 @@ func (s *Sync) PruneStageState(id stages.SyncStage, forwardProgress uint64, tx k
 		}
 	}

-	return &PruneState{id, forwardProgress, pruneProgress, s, CurrentSyncCycleInfo{initialCycle, false}}, nil
+	return &PruneState{id, forwardProgress, pruneProgress, s, CurrentSyncCycleInfo{initialCycle, false, s.mode}}, nil
 }

 func (s *Sync) NextStage() {
@@ -269,7 +269,7 @@ func (s *Sync) StageState(stage stages.SyncStage, tx kv.Tx, db kv.RoDB, initialC
 		}
 	}

-	return &StageState{s, stage, blockNum, CurrentSyncCycleInfo{initialCycle, firstCycle}}, nil
+	return &StageState{s, stage, blockNum, CurrentSyncCycleInfo{initialCycle, firstCycle, s.mode}}, nil
 }

 func (s *Sync) RunUnwind(db kv.RwDB, txc wrap.TxContainer) error {
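[Note] Patch 3 stops stage code from reaching into the unexported
execStage.state.mode: the mode is stored once on Sync and stamped into
the CurrentSyncCycleInfo that every StageState, UnwindState and
PruneState already carries. A trimmed sketch of the flow (struct shapes
reduced to the fields the patch touches; the real types carry many
more):

	type CurrentSyncCycleInfo struct {
		IsInitialCycle bool
		IsFirstCycle   bool
		Mode           Mode // added by this patch
	}

	type StageState struct {
		CurrentSyncCycle CurrentSyncCycleInfo
	}

	type Sync struct {
		mode Mode // set once in stagedsync.New
	}

	// Mirrors Sync.StageState after the patch: the sync-wide mode is
	// copied into the per-stage cycle info, so stages read
	// s.CurrentSyncCycle.Mode rather than s.state.mode.
	func (s *Sync) stageState(initialCycle, firstCycle bool) *StageState {
		return &StageState{CurrentSyncCycle: CurrentSyncCycleInfo{initialCycle, firstCycle, s.mode}}
	}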
From 355ba598e17e6a84a7ec9776f0dd24a3bd056af3 Mon Sep 17 00:00:00 2001
From: JkLondon
Date: Wed, 18 Sep 2024 16:50:09 +0200
Subject: [PATCH 4/7] drop the leftover "not sure" TODOs

---
 cmd/integration/commands/stages.go | 2 +-
 core/blockchain.go                 | 2 +-
 eth/backend.go                     | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 559386aa99e..21fa690abc9 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -1485,7 +1485,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
 	}
 	stagesDefault := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
 		heimdallClient, recents, signatures, logger)
-	sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks) //TODO: not completely sure
+	sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks)

 	miner := stagedsync.NewMiningState(&cfg.Miner)
 	miningCancel := make(chan struct{})
diff --git a/core/blockchain.go b/core/blockchain.go
index 36783bda98f..4d996670fab 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -171,7 +171,7 @@ func ExecuteBlockEphemerally(

 	if !vmConfig.ReadOnly {
 		txs := block.Transactions()
-		if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil { //TODO: not sure
+		if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil {
 			return nil, err
 		}
 	}
diff --git a/eth/backend.go b/eth/backend.go
index c97298aebe5..a8154b09de1 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -944,7 +944,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 	checkStateRoot := true
 	pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
-	backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks) //TODO: not sure
+	backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks)
 	backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)

 	executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)

From b8262bde568962f1da001976c59b727aaf3be7b7 Mon Sep 17 00:00:00 2001
From: JkLondon
Date: Fri, 27 Sep 2024 15:41:37 +0200
Subject: [PATCH 5/7] report to tx-pool by mode; nil-check the last-seen block
 number

---
 eth/stagedsync/exec3.go            | 2 +-
 turbo/execution/eth1/forkchoice.go | 3 +++
 turbo/execution/eth1/inserters.go  | 2 +-
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index 59f6a814d60..5746883cdd6 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -361,7 +361,7 @@ func ExecV3(ctx context.Context,
 	var count uint64
 	var lock sync.RWMutex

-	shouldReportToTxPool := cfg.notifications != nil && !isMining && maxBlockNum <= blockNum+64
+	shouldReportToTxPool := cfg.notifications != nil && execStage.CurrentSyncCycle.Mode != stages.BlockProduction && maxBlockNum <= blockNum+64
 	var accumulator *shards.Accumulator
 	if shouldReportToTxPool {
 		accumulator = cfg.notifications.Accumulator
diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go
index d24203e57f9..af63f1aee27 100644
--- a/turbo/execution/eth1/forkchoice.go
+++ b/turbo/execution/eth1/forkchoice.go
@@ -187,6 +187,9 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
 		if err != nil {
 			return err
 		}
+		if num == nil {
+			return errors.New("block number is nil")
+		}
 		return rawdb.WriteLastNewBlockSeen(tx, *num)
 	}); err != nil {
 		sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, false)
diff --git a/turbo/execution/eth1/inserters.go b/turbo/execution/eth1/inserters.go
index c2900a9e475..680bd587418 100644
--- a/turbo/execution/eth1/inserters.go
+++ b/turbo/execution/eth1/inserters.go
@@ -86,7 +86,7 @@ func (e *EthereumExecutionModule) InsertBlocks(ctx context.Context, req *executi
 			// Parent's total difficulty
 			parentTd, err = rawdb.ReadTd(tx, header.ParentHash, height-1)
 			if err != nil || parentTd == nil {
-				return nil, fmt.Errorf("parent's total difficulty not found with hash %x and height %d: %v", header.ParentHash, height-1, err)
+				return nil, fmt.Errorf("ethereumExecutionModule.InsertBlocks: parent's total difficulty not found with hash %x and height %d: %v", header.ParentHash, height-1, err)
 			}
 		} else {
 			parentTd = big.NewInt(0)
From 409a9a4344a4d0518a51dad80f1d0f6471b754fb Mon Sep 17 00:00:00 2001
From: JkLondon
Date: Sat, 28 Sep 2024 13:30:39 +0200
Subject: [PATCH 6/7] add temporary println tracing for the gas-used mismatch
 (WIP)

---
 core/state_transition.go | 2 ++
 eth/stagedsync/exec3.go  | 3 +++
 2 files changed, 5 insertions(+)

diff --git a/core/state_transition.go b/core/state_transition.go
index 0ebfc799d30..2b7740d95bd 100644
--- a/core/state_transition.go
+++ b/core/state_transition.go
@@ -428,6 +428,7 @@ func (st *StateTransition) TransitionDb(refunds bool, gasBailout bool) (*evmtype
 	if err != nil {
 		return nil, err
 	}
+	println("gas used intr", gas)
 	if st.gasRemaining < gas {
 		return nil, fmt.Errorf("%w: have %d, want %d", ErrIntrinsicGas, st.gasRemaining, gas)
 	}
@@ -464,6 +465,7 @@ func (st *StateTransition) TransitionDb(refunds bool, gasBailout bool) (*evmtype
 	} else {
 		ret, st.gasRemaining, vmerr = st.evm.Call(sender, st.to(), st.data, st.gasRemaining, st.value, bailout)
 	}
+	println("after call", st.gasUsed())
 	if refunds && !gasBailout {
 		if rules.IsLondon {
 			// After EIP-3529: refunds are capped to gasUsed / 5
diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index 5746883cdd6..c35a0096cd6 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -807,6 +807,7 @@ Loop:
 				Config: chainConfig,
 			}
 			if txTask.HistoryExecution && usedGas == 0 {
+				println("assigned 810")
 				usedGas, blobGasUsed, _, err = rawtemporaldb.ReceiptAsOf(applyTx.(kv.TemporalTx), txTask.TxNum)
 				if err != nil {
 					return err
@@ -872,6 +873,7 @@ Loop:

 			txCount++
 			usedGas += txTask.UsedGas
+			println("assigned 877", txTask.UsedGas, txIndex, blockNum, usedGas, txTask.Header.GasUsed)
 			logGas += txTask.UsedGas
 			mxExecGas.Add(float64(txTask.UsedGas))
 			mxExecTransactions.Add(1)
@@ -889,6 +891,7 @@ Loop:
 				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && execStage.CurrentSyncCycle.Mode != stages.BlockProduction
 				if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
 					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, txTask.BlockReceipts, txTask.Header, execStage.CurrentSyncCycle.Mode); err != nil {
+						println("error 893", "mode:", execStage.CurrentSyncCycle.Mode, "txIndex", txIndex, "gas", usedGas)
 						return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
 					}
 				}

From 208f09b95aa09d91538d0d8c230993ebc65543af Mon Sep 17 00:00:00 2001
From: JkLondon
Date: Sat, 5 Oct 2024 16:36:06 +0200
Subject: [PATCH 7/7] extend WIP debug tracing; drop the state-transition
 printlns

---
 cmd/integration/commands/stages.go | 3 +++
 cmd/state/exec3/state.go           | 1 -
 core/genesis_write.go              | 2 ++
 core/state_transition.go           | 2 --
 erigon-lib/kv/mdbx/kv_mdbx.go      | 4 ++--
 erigon-lib/state/domain.go         | 2 ++
 erigon-lib/state/domain_shared.go  | 4 ++++
 eth/stagedsync/exec3.go            | 8 ++++++--
 8 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index eb9a6f333c5..d0b22472744 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -1008,6 +1008,7 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
 	defer sn.Close()
 	defer borSn.Close()
 	defer agg.Close()
+	println("stageExec", agg.OpenFolder())
 	if warmup {
 		return reset2.WarmupExec(ctx, db)
 	}
@@ -1091,6 +1092,8 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
 		return nil
 	}

+	println("stageExec1", agg.OpenFolder())
+
 	err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, block, ctx, cfg, logger)
 	if err != nil {
 		return err
diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go
index 7dddb470449..3f0f020c30a 100644
--- a/cmd/state/exec3/state.go
+++ b/cmd/state/exec3/state.go
@@ -232,7 +232,6 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
 		syscall := func(contract libcommon.Address, data []byte) ([]byte, error) {
 			return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
 		}
-
 		if mode == stages.BlockProduction {
 			_, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger)
 		} else {
 			_, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
diff --git a/core/genesis_write.go b/core/genesis_write.go
index b28acaace86..377443229bc 100644
--- a/core/genesis_write.go
+++ b/core/genesis_write.go
@@ -26,6 +26,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/erigontech/erigon-lib/common/dbg"
 	"math/big"
 	"slices"

@@ -426,6 +427,7 @@ func DeveloperGenesisBlock(period uint64, faucet libcommon.Address) *types.Genes
 // ToBlock creates the genesis block and writes state of a genesis specification
 // to the given database (or discards it if nil).
 func GenesisToBlock(g *types.Genesis, dirs datadir.Dirs, logger log.Logger) (*types.Block, *state.IntraBlockState, error) {
+	println("in genesis", dbg.Stack())
 	if dirs.SnapDomain == "" {
 		panic("empty `dirs` variable")
 	}
diff --git a/core/state_transition.go b/core/state_transition.go
index 2b7740d95bd..0ebfc799d30 100644
--- a/core/state_transition.go
+++ b/core/state_transition.go
@@ -428,7 +428,6 @@ func (st *StateTransition) TransitionDb(refunds bool, gasBailout bool) (*evmtype
 	if err != nil {
 		return nil, err
 	}
-	println("gas used intr", gas)
 	if st.gasRemaining < gas {
 		return nil, fmt.Errorf("%w: have %d, want %d", ErrIntrinsicGas, st.gasRemaining, gas)
 	}
@@ -465,7 +464,6 @@ func (st *StateTransition) TransitionDb(refunds bool, gasBailout bool) (*evmtype
 	} else {
 		ret, st.gasRemaining, vmerr = st.evm.Call(sender, st.to(), st.data, st.gasRemaining, st.value, bailout)
 	}
-	println("after call", st.gasUsed())
 	if refunds && !gasBailout {
 		if rules.IsLondon {
 			// After EIP-3529: refunds are capped to gasUsed / 5
diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go
index 4bdfcd87c4d..f247e9e862f 100644
--- a/erigon-lib/kv/mdbx/kv_mdbx.go
+++ b/erigon-lib/kv/mdbx/kv_mdbx.go
@@ -375,9 +375,9 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) {
 		opts.pageSize = uint64(in.PageSize)
 		opts.mapSize = datasize.ByteSize(in.MapSize)
 		if opts.label == kv.ChainDB {
-			opts.log.Info("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize)
+			opts.log.Info("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize, "stack", dbg.Stack())
 		} else {
-			opts.log.Debug("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize)
+			opts.log.Debug("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize, "stack", dbg.Stack())
 		}

 		dirtyPagesLimit, err := env.GetOption(mdbx.OptTxnDpLimit)
diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go
index 7357753f27f..bb24194c41d 100644
--- a/erigon-lib/state/domain.go
+++ b/erigon-lib/state/domain.go
@@ -1417,6 +1417,7 @@ var (
 )

 func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileStartTxNum uint64, fileEndTxNum uint64, err error) {
+	//println("start gff", dt.name, len(dt.files), len(dt.visible.files))
 	if len(dt.files) == 0 {
 		return
 	}
@@ -1448,6 +1449,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt
 				if traceGetLatest == dt.name {
 					fmt.Printf("GetLatest(%s, %x) -> existence index %s -> false\n", dt.d.filenameBase, filekey, dt.files[i].src.existence.FileName)
 				}
+				//println("gFF cont", dt.name, string(v), found, i, dt.visible.files[i].src.decompressor.FileName())
 				continue
 			} else {
 				if traceGetLatest == dt.name {
diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go
index 1aad6334bd6..3f6806183d0 100644
--- a/erigon-lib/state/domain_shared.go
+++ b/erigon-lib/state/domain_shared.go
@@ -244,6 +244,7 @@ func (sd *SharedDomains) rebuildCommitment(ctx context.Context, roTx kv.Tx, bloc
 // SeekCommitment lookups latest available commitment and sets it as current
 func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromBlockBeginning uint64, err error) {
 	bn, txn, ok, err := sd.sdCtx.SeekCommitment(tx, sd.aggTx.d[kv.CommitmentDomain], 0, math.MaxUint64)
+	println("seek", bn, txn, ok, fmt.Sprintf("%+v %+v", sd.aggTx.d[kv.CommitmentDomain].d.dirtyFiles.Items(), sd.aggTx.d[kv.CommitmentDomain].visible.files))
 	if err != nil {
 		return 0, err
 	}
@@ -269,11 +270,13 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB
 		if len(bnBytes) == 8 {
 			bn = binary.BigEndian.Uint64(bnBytes)
 			txn, err = rawdbv3.TxNums.Max(tx, bn)
+			println("seek here")
 			if err != nil {
 				return 0, err
 			}
 		}
 		if bn == 0 && txn == 0 {
+			println("did we get here?")
 			sd.SetBlockNum(0)
 			sd.SetTxNum(0)
 			return 0, nil
@@ -285,6 +288,7 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB
 		return 0, err
 	}
 	if bytes.Equal(newRh, commitment.EmptyRootHash) {
+		println("here 291")
 		sd.SetBlockNum(0)
 		sd.SetTxNum(0)
 		return 0, nil
diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index c35a0096cd6..bc93aafd316 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -224,6 +224,7 @@ func ExecV3(ctx context.Context,

 	pruneNonEssentials := cfg.prune.History.Enabled() && cfg.prune.History.PruneTo(execStage.BlockNumber) == execStage.BlockNumber

+	println("stageExec21 ", agg.OpenFolder())
 	var err error
 	var doms *state2.SharedDomains
 	if execStage.CurrentSyncCycle.Mode == stages.ForkValidation {
@@ -242,6 +243,7 @@ func ExecV3(ctx context.Context,
 		defer doms.Close()
 	}
 	txNumInDB := doms.TxNum()
+	println("txnumind", doms.TxNum())

 	txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, cfg.blockReader))

@@ -273,6 +275,7 @@ func ExecV3(ctx context.Context,
 			return err
 		}
 		ok, _blockNum, err := txNumsReader.FindBlockNum(applyTx, doms.TxNum())
+		println("blocknum", _blockNum, "domsTxNum", doms.TxNum())
 		if err != nil {
 			return err
 		}
@@ -339,6 +342,7 @@ func ExecV3(ctx context.Context,
 	ts := time.Duration(0)
 	blockNum = doms.BlockNum()
 	outputTxNum.Store(doms.TxNum())
+	println("stageExec22 ", agg.OpenFolder())

 	if maxBlockNum < blockNum {
 		return nil
@@ -678,6 +682,8 @@ func ExecV3(ctx context.Context,

 	var b *types.Block

+	println("stageExec3", agg.OpenFolder())
+
 	// Only needed by bor chains
 	shouldGenerateChangesetsForLastBlocks := cfg.chainConfig.Bor != nil
@@ -873,7 +879,6 @@ Loop:

 			txCount++
 			usedGas += txTask.UsedGas
-			println("assigned 877", txTask.UsedGas, txIndex, blockNum, usedGas, txTask.Header.GasUsed)
 			logGas += txTask.UsedGas
 			mxExecGas.Add(float64(txTask.UsedGas))
 			mxExecTransactions.Add(1)
@@ -891,7 +896,6 @@ Loop:
 				checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && execStage.CurrentSyncCycle.Mode != stages.BlockProduction
 				if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
 					if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, txTask.BlockReceipts, txTask.Header, execStage.CurrentSyncCycle.Mode); err != nil {
-						println("error 893", "mode:", execStage.CurrentSyncCycle.Mode, "txIndex", txIndex, "gas", usedGas)
 						return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
 					}
 				}
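[Note] Patches 6 and 7 are throwaway instrumentation for chasing the
gas-used mismatch reported by BlockPostValidation; patch 7 already
removes the two noisiest printlns again. If any of this tracing were
meant to stay, routing it through the structured logger would be the
idiomatic form — a hedged sketch (the log/v3 import path is assumed
from the erigon-lib imports visible above; field names are
illustrative):

	package main

	import log "github.com/erigontech/erigon-lib/log/v3"

	// logGasAccounting carries the same values as the temporary
	// "assigned 877" println, as key/value pairs on a debug line.
	func logGasAccounting(logger log.Logger, blockNum uint64, txIndex int, txUsedGas, cumUsedGas, headerGasUsed uint64) {
		logger.Debug("exec gas accounting",
			"blockNum", blockNum,
			"txIndex", txIndex,
			"txUsedGas", txUsedGas,
			"cumUsedGas", cumUsedGas,
			"headerGasUsed", headerGasUsed)
	}

	func main() {
		logGasAccounting(log.New(), 20_000_000, 3, 21_000, 63_000, 63_000)
	}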