diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index adddee467e0..d0b22472744 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1008,6 +1008,7 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error { defer sn.Close() defer borSn.Close() defer agg.Close() + println("stageExec", agg.OpenFolder()) if warmup { return reset2.WarmupExec(ctx, db) } @@ -1091,6 +1092,8 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error { return nil } + println("stageExec1", agg.OpenFolder()) + err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, block, ctx, cfg, logger) if err != nil { return err @@ -1430,9 +1433,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, recents = bor.Recents signatures = bor.Signatures } - stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, + stagesDefault := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, heimdallClient, recents, signatures, logger) - sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger) + sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks) miner := stagedsync.NewMiningState(&cfg.Miner) miningCancel := make(chan struct{}) @@ -1471,6 +1474,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig, stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, + stages.BlockProduction, ) return engine, vmConfig, sync, miningSync, miner diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go index db7a8272a6d..3f0f020c30a 100644 --- a/cmd/state/exec3/state.go +++ b/cmd/state/exec3/state.go @@ -18,6 +18,7 @@ package exec3 import ( "context" + "github.com/erigontech/erigon/eth/stagedsync/stages" "sync" "golang.org/x/sync/errgroup" @@ -70,10 +71,10 @@ type Worker struct { dirs datadir.Dirs - isMining bool + mode stages.Mode } -func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, isMining bool) *Worker { +func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, mode stages.Mode) *Worker { w := &Worker{ lock: lock, logger: logger, @@ -94,7 +95,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro dirs: dirs, - isMining: isMining, + mode: mode, } w.taskGasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock()) w.vmCfg = vm.Config{Debug: true, Tracer: w.callTracer} @@ -130,7 +131,7 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) { func (rw *Worker) Run() error { for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) { - rw.RunTxTask(txTask, rw.isMining) + rw.RunTxTask(txTask, rw.mode) if err := rw.resultCh.Add(rw.ctx, txTask); err != nil { return err } @@ -138,10 +139,10 @@ func (rw *Worker) Run() error { return nil } -func (rw *Worker) RunTxTask(txTask *state.TxTask, isMining bool) { +func (rw *Worker) RunTxTask(txTask *state.TxTask, mode stages.Mode) { rw.lock.Lock() defer rw.lock.Unlock() - rw.RunTxTaskNoLock(txTask, isMining) + rw.RunTxTaskNoLock(txTask, mode) } // Needed to set history reader when need to offset few txs from block beginning and does not break processing, @@ -163,7 +164,7 @@ func (rw *Worker) SetReader(reader state.ResettableStateReader) { } } -func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) { +func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) { if txTask.HistoryExecution && !rw.historyMode { // in case if we cancelled execution and commitment happened in the middle of the block, we have to process block // from the beginning until committed txNum and only then disable history mode. @@ -231,8 +232,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) { syscall := func(contract libcommon.Address, data []byte) ([]byte, error) { return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */) } - - if isMining { + if mode == stages.BlockProduction { _, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger) } else { _, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger) @@ -297,7 +297,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) { } } -func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, isMining bool) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) { +func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, mode stages.Mode) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) { reconWorkers = make([]*Worker, workerCount) resultChSize := workerCount * 8 @@ -308,7 +308,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) for i := 0; i < workerCount; i++ { - reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining) + reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode) reconWorkers[i].ResetState(rs, accumulator) } if background { @@ -335,7 +335,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo //applyWorker.ResetTx(nil) } } - applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining) + applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode) return reconWorkers, applyWorker, rws, clear, wait } diff --git a/core/blockchain.go b/core/blockchain.go index 068de0b2c0e..4d996670fab 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -24,6 +24,7 @@ import ( "cmp" "encoding/json" "fmt" + "github.com/erigontech/erigon/eth/stagedsync/stages" "reflect" "slices" "time" @@ -170,7 +171,7 @@ func ExecuteBlockEphemerally( if !vmConfig.ReadOnly { txs := block.Transactions() - if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, false, logger); err != nil { + if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil { return nil, err } } @@ -323,14 +324,14 @@ func FinalizeBlockExecution( stateWriter state.StateWriter, cc *chain.Config, ibs *state.IntraBlockState, receipts types.Receipts, withdrawals []*types.Withdrawal, requests types.Requests, chainReader consensus.ChainReader, - isMining bool, + mode stages.Mode, logger log.Logger, ) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) { syscall := func(contract libcommon.Address, data []byte) ([]byte, error) { return SysCallContract(contract, data, cc, ibs, header, engine, false /* constCall */) } - if isMining { + if mode == stages.BlockProduction { newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, nil, logger) } else { var rss types.Requests @@ -367,7 +368,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead return nil } -func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, isMining bool) error { +func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, mode stages.Mode) error { if gasUsed != h.GasUsed { return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x", gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash()) @@ -383,7 +384,7 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receip } receiptHash := types.DeriveSha(receipts) if receiptHash != h.ReceiptHash { - if isMining { + if mode == stages.BlockProduction { h.ReceiptHash = receiptHash return nil } diff --git a/core/genesis_write.go b/core/genesis_write.go index b28acaace86..377443229bc 100644 --- a/core/genesis_write.go +++ b/core/genesis_write.go @@ -26,6 +26,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/erigontech/erigon-lib/common/dbg" "math/big" "slices" @@ -426,6 +427,7 @@ func DeveloperGenesisBlock(period uint64, faucet libcommon.Address) *types.Genes // ToBlock creates the genesis block and writes state of a genesis specification // to the given database (or discards it if nil). func GenesisToBlock(g *types.Genesis, dirs datadir.Dirs, logger log.Logger) (*types.Block, *state.IntraBlockState, error) { + println("in genesis", dbg.Stack()) if dirs.SnapDomain == "" { panic("empty `dirs` variable") } diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go index 4bdfcd87c4d..f247e9e862f 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx.go +++ b/erigon-lib/kv/mdbx/kv_mdbx.go @@ -375,9 +375,9 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) { opts.pageSize = uint64(in.PageSize) opts.mapSize = datasize.ByteSize(in.MapSize) if opts.label == kv.ChainDB { - opts.log.Info("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize) + opts.log.Info("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize, "stack", dbg.Stack()) } else { - opts.log.Debug("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize) + opts.log.Debug("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize, "stack", dbg.Stack()) } dirtyPagesLimit, err := env.GetOption(mdbx.OptTxnDpLimit) diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index 7357753f27f..bb24194c41d 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -1417,6 +1417,7 @@ var ( ) func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileStartTxNum uint64, fileEndTxNum uint64, err error) { + //println("start gff", dt.name, len(dt.files), len(dt.visible.files)) if len(dt.files) == 0 { return } @@ -1448,6 +1449,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt if traceGetLatest == dt.name { fmt.Printf("GetLatest(%s, %x) -> existence index %s -> false\n", dt.d.filenameBase, filekey, dt.files[i].src.existence.FileName) } + //println("gFF cont", dt.name, string(v), found, i, dt.visible.files[i].src.decompressor.FileName()) continue } else { if traceGetLatest == dt.name { diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index de026efc0eb..d52954b2a5c 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -244,6 +244,7 @@ func (sd *SharedDomains) rebuildCommitment(ctx context.Context, roTx kv.Tx, bloc // SeekCommitment lookups latest available commitment and sets it as current func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromBlockBeginning uint64, err error) { bn, txn, ok, err := sd.sdCtx.SeekCommitment(tx, sd.aggTx.d[kv.CommitmentDomain], 0, math.MaxUint64) + println("seek", bn, txn, ok, fmt.Sprintf("%+v %+v", sd.aggTx.d[kv.CommitmentDomain].d.dirtyFiles.Items(), sd.aggTx.d[kv.CommitmentDomain].visible.files)) if err != nil { return 0, err } @@ -269,11 +270,13 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB if len(bnBytes) == 8 { bn = binary.BigEndian.Uint64(bnBytes) txn, err = rawdbv3.TxNums.Max(tx, bn) + println("seek here") if err != nil { return 0, err } } if bn == 0 && txn == 0 { + println("zashel?") sd.SetBlockNum(0) sd.SetTxNum(0) return 0, nil @@ -285,6 +288,7 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB return 0, err } if bytes.Equal(newRh, commitment.EmptyRootHash) { + println("here 291") sd.SetBlockNum(0) sd.SetTxNum(0) return 0, nil diff --git a/eth/backend.go b/eth/backend.go index d9d478cf0d6..285e57cd7be 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -721,7 +721,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader), stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore), ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, - logger) + logger, stages.BlockProduction) var ethashApi *ethash.API if casted, ok := backend.engine.(*ethash.Ethash); ok { @@ -757,7 +757,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger ), stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd), stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader), - stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger) + stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.BlockProduction) // We start the mining step if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil { return nil, err @@ -910,7 +910,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.syncPruneOrder = stagedsync.DefaultPruneOrder } - backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger) + backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ApplyingBlocks) hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus) @@ -938,7 +938,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger checkStateRoot := true pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot) - backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) + backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks) backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx) executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 57a56b38c3f..62648c56da0 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -191,7 +191,6 @@ func ExecV3(ctx context.Context, maxBlockNum uint64, logger log.Logger, initialCycle bool, - isMining bool, ) error { // TODO: e35 doesn't support parallel-exec yet parallel = false //nolint @@ -234,10 +233,10 @@ func ExecV3(ctx context.Context, pruneNonEssentials := cfg.prune.History.Enabled() && cfg.prune.History.PruneTo(execStage.BlockNumber) == execStage.BlockNumber + println("stageExec21 ", agg.OpenFolder()) var err error - inMemExec := txc.Doms != nil var doms *state2.SharedDomains - if inMemExec { + if execStage.CurrentSyncCycle.Mode == stages.ForkValidation { doms = txc.Doms } else { var err error @@ -253,6 +252,7 @@ func ExecV3(ctx context.Context, defer doms.Close() } txNumInDB := doms.TxNum() + println("txnumind", doms.TxNum()) txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, cfg.blockReader)) @@ -284,6 +284,7 @@ func ExecV3(ctx context.Context, return err } ok, _blockNum, err := txNumsReader.FindBlockNum(applyTx, doms.TxNum()) + println("blocknum", _blockNum, "domsTxNum", doms.TxNum()) if err != nil { return err } @@ -350,6 +351,7 @@ func ExecV3(ctx context.Context, ts := time.Duration(0) blockNum = doms.BlockNum() outputTxNum.Store(doms.TxNum()) + println("stageExec22 ", agg.OpenFolder()) if maxBlockNum < blockNum { return nil @@ -372,7 +374,7 @@ func ExecV3(ctx context.Context, var count uint64 var lock sync.RWMutex - shouldReportToTxPool := cfg.notifications != nil && !isMining && maxBlockNum <= blockNum+64 + shouldReportToTxPool := cfg.notifications != nil && execStage.CurrentSyncCycle.Mode != stages.BlockProduction && maxBlockNum <= blockNum+64 var accumulator *shards.Accumulator if shouldReportToTxPool { accumulator = cfg.notifications.Accumulator @@ -393,10 +395,10 @@ func ExecV3(ctx context.Context, rwsConsumed := make(chan struct{}, 1) defer close(rwsConsumed) - execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, isMining) + execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs, execStage.CurrentSyncCycle.Mode) defer stopWorkers() applyWorker := cfg.applyWorker - if isMining { + if execStage.CurrentSyncCycle.Mode == stages.BlockProduction { applyWorker = cfg.applyWorkerMining } applyWorker.ResetState(rs, accumulator) @@ -437,7 +439,7 @@ func ExecV3(ctx context.Context, return err } - processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, isMining) + processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, rwsConsumed, applyWorker, true, false, execStage.CurrentSyncCycle.Mode) if err != nil { return err } @@ -515,14 +517,14 @@ func ExecV3(ctx context.Context, return err } ac.Close() - if !inMemExec { + if execStage.CurrentSyncCycle.Mode != stages.ForkValidation { if err = doms.Flush(ctx, tx); err != nil { return err } } break } - if inMemExec { + if execStage.CurrentSyncCycle.Mode == stages.ForkValidation { break } @@ -538,7 +540,7 @@ func ExecV3(ctx context.Context, rws.DrainNonBlocking(ctx) applyWorker.ResetTx(tx) - processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, isMining) + processedTxNum, conflicts, triggers, processedBlockNum, stoppedAtBlockEnd, err := processResultQueue(ctx, in, rws, outputTxNum.Load(), rs, agg, tx, nil, applyWorker, false, true, execStage.CurrentSyncCycle.Mode) if err != nil { return err } @@ -689,6 +691,8 @@ func ExecV3(ctx context.Context, var b *types.Block + println("stageExec3", agg.OpenFolder()) + // Only needed by bor chains shouldGenerateChangesetsForLastBlocks := cfg.chainConfig.Bor != nil @@ -819,6 +823,7 @@ Loop: Config: chainConfig, } if txTask.HistoryExecution && usedGas == 0 { + println("assigned 810") usedGas, blobGasUsed, _, err = rawtemporaldb.ReceiptAsOf(applyTx.(kv.TemporalTx), txTask.TxNum) if err != nil { return err @@ -873,7 +878,7 @@ Loop: if txTask.Error != nil { break Loop } - applyWorker.RunTxTaskNoLock(txTask, isMining) + applyWorker.RunTxTaskNoLock(txTask, execStage.CurrentSyncCycle.Mode) if err := func() error { if errors.Is(txTask.Error, context.Canceled) { return err @@ -895,12 +900,12 @@ Loop: txTask.CreateReceipt(applyTx) if txTask.Final { - if !isMining && !inMemExec && !skipPostEvaluation && !execStage.CurrentSyncCycle.IsInitialCycle { + if execStage.CurrentSyncCycle.Mode == stages.ApplyingBlocks && !skipPostEvaluation && !execStage.CurrentSyncCycle.IsInitialCycle { cfg.notifications.RecentLogs.Add(blockReceipts) } - checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && !isMining + checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && execStage.CurrentSyncCycle.Mode != stages.BlockProduction if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec - if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, txTask.BlockReceipts, txTask.Header, isMining); err != nil { + if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, txTask.BlockReceipts, txTask.Header, execStage.CurrentSyncCycle.Mode); err != nil { return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go } } @@ -965,7 +970,7 @@ Loop: ts += time.Since(start) aggTx.RestrictSubsetFileDeletions(false) doms.SavePastChangesetAccumulator(b.Hash(), blockNum, changeset) - if !inMemExec { + if execStage.CurrentSyncCycle.Mode != stages.ForkValidation { if err := state2.WriteDiffSet(applyTx, blockNum, b.Hash(), changeset); err != nil { return err } @@ -980,7 +985,7 @@ Loop: // MA commitTx if !parallel { - if !inMemExec && !isMining { + if execStage.CurrentSyncCycle.Mode == stages.ApplyingBlocks { metrics2.UpdateBlockConsumerPostExecutionDelay(b.Time(), blockNum, logger) } @@ -988,7 +993,7 @@ Loop: select { case <-logEvery.C: - if inMemExec || isMining { + if execStage.CurrentSyncCycle.Mode == stages.ForkValidation || execStage.CurrentSyncCycle.Mode == stages.BlockProduction { break } @@ -1019,7 +1024,7 @@ Loop: t1, t2, t3 time.Duration ) - if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, inMemExec); err != nil { + if ok, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.CurrentSyncCycle.Mode); err != nil { return err } else if !ok { break Loop @@ -1105,7 +1110,7 @@ Loop: if u != nil && !u.HasUnwindPoint() { if b != nil { - _, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, inMemExec) + _, err := flushAndCheckCommitmentV3(ctx, b.HeaderNoCopy(), applyTx, doms, cfg, execStage, stageProgress, parallel, logger, u, execStage.CurrentSyncCycle.Mode) if err != nil { return err } @@ -1179,7 +1184,7 @@ func dumpPlainStateDebug(tx kv.RwTx, doms *state2.SharedDomains) { } // flushAndCheckCommitmentV3 - does write state to db and then check commitment -func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyTx kv.RwTx, doms *state2.SharedDomains, cfg ExecuteBlockCfg, e *StageState, maxBlockNum uint64, parallel bool, logger log.Logger, u Unwinder, inMemExec bool) (bool, error) { +func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyTx kv.RwTx, doms *state2.SharedDomains, cfg ExecuteBlockCfg, e *StageState, maxBlockNum uint64, parallel bool, logger log.Logger, u Unwinder, mode stages.Mode) (bool, error) { // E2 state root check was in another stage - means we did flush state even if state root will not match // And Unwind expecting it @@ -1212,7 +1217,7 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT return true, nil } if bytes.Equal(rh, header.Root.Bytes()) { - if !inMemExec { + if mode != stages.ForkValidation { if err := doms.Flush(ctx, applyTx); err != nil { return false, err } @@ -1283,7 +1288,7 @@ func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader ser return b, err } -func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool, isMining bool) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) { +func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *state.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *state2.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool, mode stages.Mode) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) { rwsIt := rws.Iter() defer rwsIt.Close() @@ -1301,7 +1306,7 @@ func processResultQueue(ctx context.Context, in *state.QueueWithRetry, rws *stat } // resolve first conflict right here: it's faster and conflict-free - applyWorker.RunTxTask(txTask, isMining) + applyWorker.RunTxTask(txTask, mode) if txTask.Error != nil { return outputTxNum, conflicts, triggers, processedBlockNum, false, fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, txTask.Error) } diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index ce651474132..7f88ec867ab 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -59,6 +59,7 @@ type Stage struct { type CurrentSyncCycleInfo struct { IsInitialCycle bool // means: not-on-chain-tip. can be several sync cycle in this mode. IsFirstCycle bool // means: first cycle + Mode stages.Mode } // StageState is the state of the stage. diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 9a6bdda9dd9..1d0b1920826 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -131,15 +131,15 @@ func StageExecuteBlocksCfg( historyV3: true, syncCfg: syncCfg, silkworm: silkworm, - applyWorker: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, false), - applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, true), keepAllChangesets: keepAllChangesets, + applyWorker: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, stages.ApplyingBlocks), + applyWorkerMining: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs, stages.BlockProduction), } } // ================ Erigon3 ================ -func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger, isMining bool) (err error) { +func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger) (err error) { workersCount := cfg.syncCfg.ExecWorkerCount if !initialCycle { workersCount = 1 @@ -159,7 +159,7 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 } parallel := txc.Tx == nil - if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle, isMining); err != nil { + if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle); err != nil { return err } return nil @@ -252,7 +252,7 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, to if dbg.StagesOnlyBlocks { return nil } - if err = ExecBlockV3(s, u, txc, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger, false); err != nil { + if err = ExecBlockV3(s, u, txc, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger); err != nil { return err } return nil diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index a98387dac6d..160b25a36bd 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -200,7 +200,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg var err error var block *types.Block - block, current.Txs, current.Receipts, err = core.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txs, current.Uncles, &state.NoopWriter{}, &cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, current.Requests, chainReader, true, logger) + block, current.Txs, current.Receipts, err = core.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txs, current.Uncles, &state.NoopWriter{}, &cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, current.Requests, chainReader, s.state.mode, logger) if err != nil { return fmt.Errorf("cannot finalize block execution: %s", err) } @@ -238,7 +238,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg // This flag will skip checking the state root execCfg.blockProduction = true execS := &StageState{state: s.state, ID: stages.Execution, BlockNumber: blockHeight - 1} - if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger, true); err != nil { + if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger); err != nil { logger.Error("cannot execute block execution", "err", err) return err } diff --git a/eth/stagedsync/stagedsynctest/harness.go b/eth/stagedsync/stagedsynctest/harness.go index 71803cf0b32..6bcaec8a82d 100644 --- a/eth/stagedsync/stagedsynctest/harness.go +++ b/eth/stagedsync/stagedsynctest/harness.go @@ -86,6 +86,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness { stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, + stages.ApplyingBlocks, ) miningSyncStages := stagedsync.MiningStages( ctx, @@ -102,6 +103,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness { stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, + stages.BlockProduction, ) validatorKey, err := crypto.GenerateKey() require.NoError(t, err) diff --git a/eth/stagedsync/stages/stages.go b/eth/stagedsync/stages/stages.go index a6d70c5b8b2..4183fb19c04 100644 --- a/eth/stagedsync/stages/stages.go +++ b/eth/stagedsync/stages/stages.go @@ -115,3 +115,11 @@ func encodeBigEndian(n uint64) []byte { binary.BigEndian.PutUint64(v[:], n) return v[:] } + +type Mode int + +const ( + BlockProduction = Mode(iota) + ForkValidation + ApplyingBlocks +) diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go index 5f3b20ce3ed..aa9985145f6 100644 --- a/eth/stagedsync/sync.go +++ b/eth/stagedsync/sync.go @@ -49,6 +49,7 @@ type Sync struct { logPrefixes []string logger log.Logger stagesIdsList []string + mode stages.Mode } type Timing struct { @@ -77,7 +78,7 @@ func (s *Sync) PrevUnwindPoint() *uint64 { } func (s *Sync) NewUnwindState(id stages.SyncStage, unwindPoint, currentProgress uint64, initialCycle, firstCycle bool) *UnwindState { - return &UnwindState{id, unwindPoint, currentProgress, UnwindReason{nil, nil}, s, CurrentSyncCycleInfo{initialCycle, firstCycle}} + return &UnwindState{id, unwindPoint, currentProgress, UnwindReason{nil, nil}, s, CurrentSyncCycleInfo{initialCycle, firstCycle, s.mode}} } // Get the current prune status from the DB @@ -102,7 +103,7 @@ func (s *Sync) PruneStageState(id stages.SyncStage, forwardProgress uint64, tx k } } - return &PruneState{id, forwardProgress, pruneProgress, s, CurrentSyncCycleInfo{initialCycle, false}}, nil + return &PruneState{id, forwardProgress, pruneProgress, s, CurrentSyncCycleInfo{initialCycle, false, s.mode}}, nil } func (s *Sync) NextStage() { @@ -207,7 +208,7 @@ func (s *Sync) SetCurrentStage(id stages.SyncStage) error { return fmt.Errorf("stage not found with id: %v", id) } -func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger) *Sync { +func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, pruneOrder PruneOrder, logger log.Logger, mode stages.Mode) *Sync { unwindStages := make([]*Stage, len(stagesList)) for i, stageIndex := range unwindOrder { for _, s := range stagesList { @@ -243,6 +244,7 @@ func New(cfg ethconfig.Sync, stagesList []*Stage, unwindOrder UnwindOrder, prune logPrefixes: logPrefixes, logger: logger, stagesIdsList: stagesIdsList, + mode: mode, } } @@ -267,7 +269,7 @@ func (s *Sync) StageState(stage stages.SyncStage, tx kv.Tx, db kv.RoDB, initialC } } - return &StageState{s, stage, blockNum, CurrentSyncCycleInfo{initialCycle, firstCycle}}, nil + return &StageState{s, stage, blockNum, CurrentSyncCycleInfo{initialCycle, firstCycle, s.mode}}, nil } func (s *Sync) RunUnwind(db kv.RwDB, txc wrap.TxContainer) error { diff --git a/eth/stagedsync/sync_test.go b/eth/stagedsync/sync_test.go index ed9c8db253a..f86c4c3f223 100644 --- a/eth/stagedsync/sync_test.go +++ b/eth/stagedsync/sync_test.go @@ -59,7 +59,7 @@ func TestStagesSuccess(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -99,7 +99,7 @@ func TestDisabledStages(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -139,7 +139,7 @@ func TestErroredStage(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err) @@ -207,7 +207,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -285,7 +285,7 @@ func TestUnwind(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -374,7 +374,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -430,12 +430,12 @@ func TestSyncDoTwice(t *testing.T) { }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) - state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks) _, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -488,14 +488,14 @@ func TestStateSyncInterruptRestart(t *testing.T) { }, } - state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.Equal(t, fmt.Errorf("[2/3 Bodies] %w", expectedErr), err) expectedErr = nil - state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New()) + state = New(ethconfig.Defaults.Sync, s, nil, nil, log.New(), stages.ApplyingBlocks) _, err = state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.NoError(t, err) @@ -567,7 +567,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) { }, }, } - state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New()) + state := New(ethconfig.Defaults.Sync, s, []stages.SyncStage{s[2].ID, s[1].ID, s[0].ID}, nil, log.New(), stages.ApplyingBlocks) db, tx := memdb.NewTestTx(t) _, err := state.Run(db, wrap.TxContainer{Tx: tx}, true /* initialCycle */, false) assert.Error(t, errInterrupted, err) diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index d24203e57f9..af63f1aee27 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -187,6 +187,9 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original if err != nil { return err } + if num == nil { + return errors.New("block number is nil") + } return rawdb.WriteLastNewBlockSeen(tx, *num) }); err != nil { sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, false) diff --git a/turbo/execution/eth1/inserters.go b/turbo/execution/eth1/inserters.go index c2900a9e475..680bd587418 100644 --- a/turbo/execution/eth1/inserters.go +++ b/turbo/execution/eth1/inserters.go @@ -86,7 +86,7 @@ func (e *EthereumExecutionModule) InsertBlocks(ctx context.Context, req *executi // Parent's total difficulty parentTd, err = rawdb.ReadTd(tx, header.ParentHash, height-1) if err != nil || parentTd == nil { - return nil, fmt.Errorf("parent's total difficulty not found with hash %x and height %d: %v", header.ParentHash, height-1, err) + return nil, fmt.Errorf("ethereumExecutionModule.InsertBlocks: parent's total difficulty not found with hash %x and height %d: %v", header.ParentHash, height-1, err) } } else { parentTd = big.NewInt(0) diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index d31ee10e72c..00f8857ae18 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -504,7 +504,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, dirs.Tmp, nil, 0, mock.TxPool, nil, mock.BlockReader), stagedsync.StageMiningFinishCfg(mock.DB, *mock.ChainConfig, mock.Engine, miner, miningCancel, mock.BlockReader, latestBlockBuiltStore), ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, - logger) + logger, stages.BlockProduction) // We start the mining step if err := stages2.MiningStep(ctx, mock.DB, proposingSync, tmpdir, logger); err != nil { return nil, err @@ -541,12 +541,13 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, + stages.ApplyingBlocks, ) cfg.Genesis = gspec pipelineStages := stages2.NewPipelineStages(mock.Ctx, db, &cfg, p2p.Config{}, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, mock.agg, nil, forkValidator, logger, checkStateRoot) - mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) + mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks) mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx) @@ -582,6 +583,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, + stages.BlockProduction, ) cfg.Genesis = gspec diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 52bc1f1c571..d965fa3af55 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -714,6 +714,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config stagedsync.StateUnwindOrder, nil, /* pruneOrder */ logger, + stages.ForkValidation, ) }