[wip] Sync enum #12104

Open · wants to merge 12 commits into base: main
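This WIP PR replaces the `isMining bool` flag threaded through block execution with a `stages.Mode` enum, so call sites say why a pipeline is running instead of passing a bare boolean. The enum's definition is not included in the diff; a minimal sketch consistent with the call sites below (`stages.ApplyingBlocks`, `stages.BlockProduction`) might look like:

```go
// Hypothetical reconstruction; the real type lives in
// eth/stagedsync/stages and is not shown in this diff.
package stages

// Mode says why a staged-sync pipeline is running.
type Mode string

const (
	ApplyingBlocks  Mode = "ApplyingBlocks"  // normal sync: executing blocks received from the network (was isMining == false)
	BlockProduction Mode = "BlockProduction" // mining / payload building (was isMining == true)
)
```

The remaining hunks are the mechanical parameter change plus temporary WIP debug output (`println`, `dbg.Stack()`).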
8 changes: 6 additions & 2 deletions cmd/integration/commands/stages.go
@@ -1008,6 +1008,7 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
defer sn.Close()
defer borSn.Close()
defer agg.Close()
+println("stageExec", agg.OpenFolder())
if warmup {
return reset2.WarmupExec(ctx, db)
}
@@ -1091,6 +1092,8 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return nil
}

+println("stageExec1", agg.OpenFolder())

err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, block, ctx, cfg, logger)
if err != nil {
return err
@@ -1430,9 +1433,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
recents = bor.Recents
signatures = bor.Signatures
}
-stages := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
+stagesDefault := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
heimdallClient, recents, signatures, logger)
-sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)
+sync := stagedsync.New(cfg.Sync, stagesDefault, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ApplyingBlocks)

miner := stagedsync.NewMiningState(&cfg.Miner)
miningCancel := make(chan struct{})
@@ -1471,6 +1474,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
stagedsync.MiningUnwindOrder,
stagedsync.MiningPruneOrder,
logger,
+stages.BlockProduction,
)

return engine, vmConfig, sync, miningSync, miner
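Note the rename `stages` → `stagesDefault` in the hunk above: the local variable would otherwise shadow the `stages` package referenced on the same line (`stages.ApplyingBlocks`). A self-contained illustration of that shadowing hazard, with the standard `strings` package standing in for `eth/stagedsync/stages`:

```go
package main

import (
	"fmt"
	"strings" // stands in for the stages package
)

func main() {
	// If this slice were named `strings`, the package would be shadowed and
	// strings.Join below would fail to compile; this is the collision that
	// newSync avoids by renaming its local `stages` slice to `stagesDefault`.
	stagesDefault := []string{"Senders", "Execution", "TxLookup"}
	fmt.Println(strings.Join(stagesDefault, " -> "))
}
```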
24 changes: 12 additions & 12 deletions cmd/state/exec3/state.go
@@ -18,6 +18,7 @@ package exec3

import (
"context"
+"github.com/erigontech/erigon/eth/stagedsync/stages"
"sync"

"golang.org/x/sync/errgroup"
@@ -70,10 +71,10 @@ type Worker struct {

dirs datadir.Dirs

-isMining bool
+mode stages.Mode
}

-func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, isMining bool) *Worker {
+func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs, mode stages.Mode) *Worker {
w := &Worker{
lock: lock,
logger: logger,
@@ -94,7 +95,7 @@ func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, backgro

dirs: dirs,

-isMining: isMining,
+mode: mode,
}
w.taskGasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock())
w.vmCfg = vm.Config{Debug: true, Tracer: w.callTracer}
@@ -130,18 +131,18 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {

func (rw *Worker) Run() error {
for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) {
-rw.RunTxTask(txTask, rw.isMining)
+rw.RunTxTask(txTask, rw.mode)
if err := rw.resultCh.Add(rw.ctx, txTask); err != nil {
return err
}
}
return nil
}

-func (rw *Worker) RunTxTask(txTask *state.TxTask, isMining bool) {
+func (rw *Worker) RunTxTask(txTask *state.TxTask, mode stages.Mode) {
rw.lock.Lock()
defer rw.lock.Unlock()
-rw.RunTxTaskNoLock(txTask, isMining)
+rw.RunTxTaskNoLock(txTask, mode)
}

// Needed to set history reader when need to offset few txs from block beginning and does not break processing,
@@ -163,7 +164,7 @@ func (rw *Worker) SetReader(reader state.ResettableStateReader) {
}
}

-func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
+func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, mode stages.Mode) {
if txTask.HistoryExecution && !rw.historyMode {
// in case if we cancelled execution and commitment happened in the middle of the block, we have to process block
// from the beginning until committed txNum and only then disable history mode.
@@ -231,8 +232,7 @@ func (rw *Worker) RunTxTaskNoLock(txTask *state.TxTask, isMining bool) {
syscall := func(contract libcommon.Address, data []byte) ([]byte, error) {
return core.SysCallContract(contract, data, rw.chainConfig, ibs, header, rw.engine, false /* constCall */)
}

-if isMining {
+if mode == stages.BlockProduction {
_, txTask.Txs, txTask.BlockReceipts, err = rw.engine.FinalizeAndAssemble(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, nil, rw.logger)
} else {
_, _, _, err = rw.engine.Finalize(rw.chainConfig, types.CopyHeader(header), ibs, txTask.Txs, txTask.Uncles, txTask.BlockReceipts, txTask.Withdrawals, txTask.Requests, rw.chain, syscall, rw.logger)
@@ -297,7 +297,7 @@ }
}
}

-func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, isMining bool) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
+func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int, dirs datadir.Dirs, mode stages.Mode) (reconWorkers []*Worker, applyWorker *Worker, rws *state.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)

resultChSize := workerCount * 8
@@ -308,7 +308,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
-reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
+reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)
reconWorkers[i].ResetState(rs, accumulator)
}
if background {
@@ -335,7 +335,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo
//applyWorker.ResetTx(nil)
}
}
-applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, isMining)
+applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs, mode)

return reconWorkers, applyWorker, rws, clear, wait
}
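The worker changes are mechanical: `RunTxTask`, `RunTxTaskNoLock`, `NewWorker` and `NewWorkersPool` now take a `stages.Mode` instead of a bool, and finalization branches on it. A self-contained model of that dispatch (simplified names, not erigon's real types):

```go
package main

import "fmt"

type Mode uint8

const (
	ApplyingBlocks  Mode = iota // was isMining == false
	BlockProduction             // was isMining == true
)

// finalize models the branch in RunTxTaskNoLock: a producer assembles and
// seals a fresh block, while a syncing node finalizes a received one.
func finalize(mode Mode) string {
	if mode == BlockProduction {
		return "FinalizeAndAssemble: build and seal a new block"
	}
	return "Finalize: validate and apply a block received from peers"
}

func main() {
	fmt.Println(finalize(ApplyingBlocks))
	fmt.Println(finalize(BlockProduction))
}
```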
11 changes: 6 additions & 5 deletions core/blockchain.go
@@ -24,6 +24,7 @@ import (
"cmp"
"encoding/json"
"fmt"
+"github.com/erigontech/erigon/eth/stagedsync/stages"
"reflect"
"slices"
"time"
@@ -170,7 +171,7 @@ func ExecuteBlockEphemerally(

if !vmConfig.ReadOnly {
txs := block.Transactions()
-if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, false, logger); err != nil {
+if _, _, _, err := FinalizeBlockExecution(engine, stateReader, block.Header(), txs, block.Uncles(), stateWriter, chainConfig, ibs, receipts, block.Withdrawals(), block.Requests(), chainReader, stages.ApplyingBlocks, logger); err != nil {
return nil, err
}
}
@@ -323,14 +324,14 @@ func FinalizeBlockExecution(
stateWriter state.StateWriter, cc *chain.Config,
ibs *state.IntraBlockState, receipts types.Receipts,
withdrawals []*types.Withdrawal, requests types.Requests, chainReader consensus.ChainReader,
-isMining bool,
+mode stages.Mode,
logger log.Logger,
) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) {
syscall := func(contract libcommon.Address, data []byte) ([]byte, error) {
return SysCallContract(contract, data, cc, ibs, header, engine, false /* constCall */)
}

-if isMining {
+if mode == stages.BlockProduction {
newBlock, newTxs, newReceipt, err = engine.FinalizeAndAssemble(cc, header, ibs, txs, uncles, receipts, withdrawals, requests, chainReader, syscall, nil, logger)
} else {
var rss types.Requests
@@ -367,7 +368,7 @@ func InitializeBlockExecution(engine consensus.Engine, chain consensus.ChainHead
return nil
}

-func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, isMining bool) error {
+func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receipts types.Receipts, h *types.Header, mode stages.Mode) error {
if gasUsed != h.GasUsed {
return fmt.Errorf("gas used by execution: %d, in header: %d, headerNum=%d, %x",
gasUsed, h.GasUsed, h.Number.Uint64(), h.Hash())
@@ -383,7 +384,7 @@ func BlockPostValidation(gasUsed, blobGasUsed uint64, checkReceipts bool, receip
}
receiptHash := types.DeriveSha(receipts)
if receiptHash != h.ReceiptHash {
-if isMining {
+if mode == stages.BlockProduction {
h.ReceiptHash = receiptHash
return nil
}
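The `BlockPostValidation` hunk is the one behavioral change in this file: in `BlockProduction` mode a receipt-hash mismatch now overwrites `h.ReceiptHash` instead of returning an error, since a producer owns its in-progress header. A simplified, self-contained model of that branch (placeholder types, not erigon's):

```go
package main

import "fmt"

type Mode uint8

const (
	ApplyingBlocks Mode = iota
	BlockProduction
)

type header struct{ ReceiptHash string }

// postValidate mirrors the new branch: a producer repairs its own header,
// while a node applying someone else's block must surface the mismatch.
func postValidate(computed string, h *header, mode Mode) error {
	if computed == h.ReceiptHash {
		return nil
	}
	if mode == BlockProduction {
		h.ReceiptHash = computed // fix up our own in-progress header
		return nil
	}
	return fmt.Errorf("receipt hash mismatch: computed %s, header has %s", computed, h.ReceiptHash)
}

func main() {
	h := &header{ReceiptHash: "stale"}
	fmt.Println(postValidate("fresh", h, BlockProduction), h.ReceiptHash) // <nil> fresh
	fmt.Println(postValidate("fresh", &header{ReceiptHash: "stale"}, ApplyingBlocks))
}
```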
2 changes: 2 additions & 0 deletions core/genesis_write.go
@@ -26,6 +26,7 @@ import (
"encoding/json"
"errors"
"fmt"
+"github.com/erigontech/erigon-lib/common/dbg"
"math/big"
"slices"

@@ -426,6 +427,7 @@ func DeveloperGenesisBlock(period uint64, faucet libcommon.Address) *types.Genes
// ToBlock creates the genesis block and writes state of a genesis specification
// to the given database (or discards it if nil).
func GenesisToBlock(g *types.Genesis, dirs datadir.Dirs, logger log.Logger) (*types.Block, *state.IntraBlockState, error) {
+println("in genesis", dbg.Stack())
if dirs.SnapDomain == "" {
panic("empty `dirs` variable")
}
4 changes: 2 additions & 2 deletions erigon-lib/kv/mdbx/kv_mdbx.go
@@ -375,9 +375,9 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) {
opts.pageSize = uint64(in.PageSize)
opts.mapSize = datasize.ByteSize(in.MapSize)
if opts.label == kv.ChainDB {
-opts.log.Info("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize)
+opts.log.Info("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize, "stack", dbg.Stack())
} else {
-opts.log.Debug("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize)
+opts.log.Debug("[db] open", "label", opts.label, "sizeLimit", opts.mapSize, "pageSize", opts.pageSize, "stack", dbg.Stack())
}

dirtyPagesLimit, err := env.GetOption(mdbx.OptTxnDpLimit)
Expand Down
2 changes: 2 additions & 0 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,7 @@ var (
)

func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileStartTxNum uint64, fileEndTxNum uint64, err error) {
+//println("start gff", dt.name, len(dt.files), len(dt.visible.files))
if len(dt.files) == 0 {
return
}
@@ -1448,6 +1449,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt
if traceGetLatest == dt.name {
fmt.Printf("GetLatest(%s, %x) -> existence index %s -> false\n", dt.d.filenameBase, filekey, dt.files[i].src.existence.FileName)
}
+//println("gFF cont", dt.name, string(v), found, i, dt.visible.files[i].src.decompressor.FileName())
continue
} else {
if traceGetLatest == dt.name {
4 changes: 4 additions & 0 deletions erigon-lib/state/domain_shared.go
@@ -244,6 +244,7 @@ func (sd *SharedDomains) rebuildCommitment(ctx context.Context, roTx kv.Tx, bloc
// SeekCommitment lookups latest available commitment and sets it as current
func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromBlockBeginning uint64, err error) {
bn, txn, ok, err := sd.sdCtx.SeekCommitment(tx, sd.aggTx.d[kv.CommitmentDomain], 0, math.MaxUint64)
+println("seek", bn, txn, ok, fmt.Sprintf("%+v %+v", sd.aggTx.d[kv.CommitmentDomain].d.dirtyFiles.Items(), sd.aggTx.d[kv.CommitmentDomain].visible.files))
if err != nil {
return 0, err
}
@@ -269,11 +270,13 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB
if len(bnBytes) == 8 {
bn = binary.BigEndian.Uint64(bnBytes)
txn, err = rawdbv3.TxNums.Max(tx, bn)
+println("seek here")
if err != nil {
return 0, err
}
}
if bn == 0 && txn == 0 {
+println("zashel?")
sd.SetBlockNum(0)
sd.SetTxNum(0)
return 0, nil
@@ -285,6 +288,7 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB
return 0, err
}
if bytes.Equal(newRh, commitment.EmptyRootHash) {
+println("here 291")
sd.SetBlockNum(0)
sd.SetTxNum(0)
return 0, nil
8 changes: 4 additions & 4 deletions eth/backend.go
@@ -721,7 +721,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
-logger)
+logger, stages.BlockProduction)

var ethashApi *ethash.API
if casted, ok := backend.engine.(*ethash.Ethash); ok {
@@ -757,7 +757,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
),
stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader),
-stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger)
+stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.BlockProduction)
// We start the mining step
if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil {
return nil, err
@@ -910,7 +910,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.syncPruneOrder = stagedsync.DefaultPruneOrder
}

-backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
+backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger, stages.ApplyingBlocks)

hook := stages2.NewHook(backend.sentryCtx, backend.chainDB, backend.notifications, backend.stagedSync, backend.blockReader, backend.chainConfig, backend.logger, backend.sentriesClient.SetStatus)

@@ -938,7 +938,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

checkStateRoot := true
pipelineStages := stages2.NewPipelineStages(ctx, backend.chainDB, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
-backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
+backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ApplyingBlocks)
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)

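Net effect in the backend: every `stagedsync.New` call site now declares its purpose. Condensed from the hunks above (surrounding wiring omitted, and `miningStages` is a placeholder for the inline stage list, so this fragment is illustrative rather than compilable):

```go
// Mining and proposing loops produce blocks...
stagedsync.New(cfg.Sync, miningStages, stagedsync.MiningUnwindOrder,
	stagedsync.MiningPruneOrder, logger, stages.BlockProduction)

// ...while the main and pipeline sync loops apply them.
stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder,
	backend.syncPruneOrder, logger, stages.ApplyingBlocks)
```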