Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] Metrics #12803

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions erigon-lib/commitment/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math/bits"
"sort"
"strings"
"time"

"github.com/holiman/uint256"

Expand Down Expand Up @@ -113,6 +114,10 @@ type PatriciaContext interface {
Account(plainKey []byte) (*Update, error)
// fetch storage with given plain key
Storage(plainKey []byte) (*Update, error)
// Reset performance counters, but not cumulative counters
ResetPerfCounters()
// Obtain performance counters
PerfCounters() map[string]time.Duration
}

type TrieVariant string
Expand Down
48 changes: 45 additions & 3 deletions erigon-lib/commitment/hex_patricia_hashed.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ import (

"golang.org/x/crypto/sha3"

"github.com/erigontech/erigon-lib/common/hexutility"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/common/length"
rlp "github.com/erigontech/erigon-lib/rlp2"
)
Expand Down Expand Up @@ -1504,6 +1503,13 @@ func (hph *HexPatriciaHashed) RootHash() ([]byte, error) {
return rootHash[1:], nil // first byte is 128+hash_len=160
}

// Ad-hoc performance accumulators for commitment Process instrumentation.
// NOTE(review): this is mutable package-level state with no synchronization —
// assumes Process is never invoked concurrently; confirm before keeping.
var (
	// timeTotalCum accumulates the total wall time of Process across calls.
	timeTotalCum time.Duration
	// timeSpentFold accumulates time spent in fold() within the current round;
	// reset to zero after each Process report.
	timeSpentFold time.Duration
	// timeSpentUnf accumulates time spent in unfold() within the current round;
	// reset to zero after each Process report.
	timeSpentUnf time.Duration
	// perfCountersCum accumulates the PatriciaContext perf counters across
	// rounds, keyed by the original counter name plus a "_cum" suffix;
	// lazily initialized on first use in Process.
	perfCountersCum map[string]time.Duration
)

func (hph *HexPatriciaHashed) Process(ctx context.Context, updates *Updates, logPrefix string) (rootHash []byte, err error) {
var (
m runtime.MemStats
Expand Down Expand Up @@ -1533,15 +1539,21 @@ func (hph *HexPatriciaHashed) Process(ctx context.Context, updates *Updates, log
}
// Keep folding until the currentKey is the prefix of the key we modify
for hph.needFolding(hashedKey) {
start := time.Now()
if err := hph.fold(); err != nil {
return fmt.Errorf("fold: %w", err)
}
t := time.Since(start)
timeSpentFold += t
}
// Now unfold until we step on an empty cell
for unfolding := hph.needUnfolding(hashedKey); unfolding > 0; unfolding = hph.needUnfolding(hashedKey) {
start := time.Now()
if err := hph.unfold(hashedKey, unfolding); err != nil {
return fmt.Errorf("unfold: %w", err)
}
t := time.Since(start)
timeSpentUnf += t
}

if stateUpdate == nil {
Expand Down Expand Up @@ -1581,6 +1593,37 @@ func (hph *HexPatriciaHashed) Process(ctx context.Context, updates *Updates, log
return nil, fmt.Errorf("final fold: %w", err)
}
}
total := time.Since(start)
timeTotalCum += total
perfCounters := hph.ctx.PerfCounters()
if perfCountersCum == nil {
perfCountersCum = map[string]time.Duration{}
}
for k, v := range perfCounters {
perfCountersCum[k+"_cum"] += v
}
perfCountersCum["fold_cum"] += timeSpentFold
perfCountersCum["unfold_cum"] += timeSpentUnf
fmt.Println(
"total", total, "total_cum", timeTotalCum,
"\nfold", timeSpentFold,
"unfold", timeSpentUnf,
"\n", perfCounters,
"\n", perfCountersCum)
timeSpentFold, timeSpentUnf = 0, 0
hph.ctx.ResetPerfCounters()
cum_keys := make([]string, 0, len(perfCountersCum))
for k := range perfCountersCum {
cum_keys = append(cum_keys, k)
}
sort.Strings(cum_keys)
perfCountersPercents := map[string]float64{}
for k, v := range perfCountersCum {
perfCountersPercents[k] = 100.0 * float64(v) / float64(timeTotalCum)
}
for _, k := range cum_keys {
fmt.Printf("%6.2f%% %s\n", perfCountersPercents[k], k)
}

rootHash, err = hph.RootHash()
if err != nil {
Expand Down Expand Up @@ -1948,7 +1991,6 @@ func (hph *HexPatriciaHashed) SetState(buf []byte) error {
if hph.ctx == nil {
panic("nil ctx")
}

update, err := hph.ctx.Account(hph.root.accountAddr[:hph.root.accountAddrLen])
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions erigon-lib/commitment/patricia_state_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"slices"
"testing"
"time"

"github.com/holiman/uint256"
"golang.org/x/crypto/sha3"
Expand Down Expand Up @@ -129,6 +130,11 @@ func (ms *MockState) Storage(plainKey []byte) (*Update, error) {
return &ex, nil
}

// ResetPerfCounters is a no-op: MockState does not collect performance counters.
func (ms *MockState) ResetPerfCounters() {}
// PerfCounters satisfies the perf-counter accessor for tests; MockState
// records no timings, so the result is always an empty (non-nil) map.
func (ms *MockState) PerfCounters() map[string]time.Duration {
	return make(map[string]time.Duration)
}

func (ms *MockState) applyPlainUpdates(plainKeys [][]byte, updates []Update) error {
for i, key := range plainKeys {
update := updates[i]
Expand Down
50 changes: 50 additions & 0 deletions erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,24 @@ func (sd *SharedDomains) SizeEstimate() uint64 {
}

func (sd *SharedDomains) LatestCommitment(prefix []byte) ([]byte, uint64, error) {
a := time.Now()
var b time.Time
var db_time, files_time time.Duration
defer func() {
sd.sdCtx.commReadTime += time.Since(a)
sd.sdCtx.commDbTime += db_time
sd.sdCtx.commFilesTime += files_time
}()
if v, prevStep, ok := sd.get(kv.CommitmentDomain, prefix); ok {
// sd cache values as is (without transformation) so safe to return
return v, prevStep, nil
}
v, step, found, err := sd.aggTx.d[kv.CommitmentDomain].getLatestFromDb(prefix, sd.roTx)
db_time = time.Since(a)
if err != nil {
return nil, 0, fmt.Errorf("commitment prefix %x read error: %w", prefix, err)
}
b = time.Now()
if found {
// db store values as is (without transformation) so safe to return
return v, step, nil
Expand All @@ -448,6 +458,7 @@ func (sd *SharedDomains) LatestCommitment(prefix []byte) ([]byte, uint64, error)
// getFromFiles doesn't provide same semantics as getLatestFromDB - it returns start/end tx
// of file where the value is stored (not exact step when kv has been set)
v, _, startTx, endTx, err := sd.aggTx.d[kv.CommitmentDomain].getFromFiles(prefix, 0)
files_time = time.Since(b)
if err != nil {
return nil, 0, fmt.Errorf("commitment prefix %x read error: %w", prefix, err)
}
Expand All @@ -461,11 +472,16 @@ func (sd *SharedDomains) LatestCommitment(prefix []byte) ([]byte, uint64, error)
if err != nil {
return nil, 0, err
}

return rv, endTx / sd.aggTx.a.StepSize(), nil
}

// replaceShortenedKeysInBranch replaces shortened keys in the branch with full keys
func (sd *SharedDomains) replaceShortenedKeysInBranch(prefix []byte, branch commitment.BranchData, fStartTxNum uint64, fEndTxNum uint64) (commitment.BranchData, error) {
a := time.Now()
defer func() {
sd.sdCtx.replaceTime += time.Since(a)
}()
if !sd.aggTx.d[kv.CommitmentDomain].d.replaceKeysInValues && sd.aggTx.a.commitmentValuesTransform {
panic("domain.replaceKeysInValues is disabled, but agg.commitmentValuesTransform is enabled")
}
Expand Down Expand Up @@ -1118,6 +1134,12 @@ type SharedDomainsCommitmentContext struct {
justRestored atomic.Bool

limitReadAsOfTxNum uint64
accountReadTime time.Duration
storageReadTime time.Duration
commReadTime time.Duration
replaceTime time.Duration
commDbTime time.Duration
commFilesTime time.Duration
}

func (sdc *SharedDomainsCommitmentContext) SetLimitReadAsOfTxNum(txNum uint64) {
Expand Down Expand Up @@ -1188,6 +1210,10 @@ func (sdc *SharedDomainsCommitmentContext) PutBranch(prefix []byte, data []byte,
}

func (sdc *SharedDomainsCommitmentContext) Account(plainKey []byte) (u *commitment.Update, err error) {
a := time.Now()
defer func() {
sdc.accountReadTime += time.Since(a)
}()
var encAccount []byte
if sdc.limitReadAsOfTxNum == 0 {
encAccount, _, err = sdc.sharedDomains.GetLatest(kv.AccountsDomain, plainKey, nil)
Expand Down Expand Up @@ -1249,6 +1275,10 @@ func (sdc *SharedDomainsCommitmentContext) Account(plainKey []byte) (u *commitme
}

func (sdc *SharedDomainsCommitmentContext) Storage(plainKey []byte) (u *commitment.Update, err error) {
a := time.Now()
defer func() {
sdc.storageReadTime += time.Since(a)
}()
// Look in the summary table first
var enc []byte
if sdc.limitReadAsOfTxNum == 0 {
Expand Down Expand Up @@ -1345,6 +1375,26 @@ func (sdc *SharedDomainsCommitmentContext) ComputeCommitment(ctx context.Context
return rootHash, err
}

// ResetPerfCounters zeroes the per-round timing counters reported by
// PerfCounters so the next commitment round starts from a clean slate.
func (sdc *SharedDomainsCommitmentContext) ResetPerfCounters() {
	sdc.accountReadTime, sdc.storageReadTime, sdc.commReadTime = 0, 0, 0
	sdc.replaceTime, sdc.commDbTime, sdc.commFilesTime = 0, 0, 0
}

// PerfCounters returns the timing counters accumulated since the last
// ResetPerfCounters call, keyed by counter name.
func (sdc *SharedDomainsCommitmentContext) PerfCounters() map[string]time.Duration {
	return map[string]time.Duration{
		"read_account":      sdc.accountReadTime,
		"read_storage":      sdc.storageReadTime,
		"read_comm":         sdc.commReadTime,
		"read_comm_db":      sdc.commDbTime,
		"read_comm_files":   sdc.commFilesTime,
		"read_comm_replace": sdc.replaceTime,
	}
}

func (sdc *SharedDomainsCommitmentContext) storeCommitmentState(blockNum uint64, rootHash []byte) error {
if sdc.sharedDomains.aggTx == nil {
return fmt.Errorf("store commitment state: AggregatorContext is not initialized")
Expand Down
4 changes: 3 additions & 1 deletion eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ Loop:
skipPostEvaluation := false
var usedGas uint64
var txTasks []*state.TxTask
start := time.Now()
for txIndex := -1; txIndex <= len(txs); txIndex++ {
// Do not oversend, wait for the result heap to go under certain size
txTask := &state.TxTask{
Expand Down Expand Up @@ -605,14 +606,15 @@ Loop:
}

mxExecBlocks.Add(1)

fmt.Println("block exec", time.Since(start))
if shouldGenerateChangesets {
aggTx := executor.tx().(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx)
aggTx.RestrictSubsetFileDeletions(true)
start := time.Now()
if _, err := executor.domains().ComputeCommitment(ctx, true, blockNum, execStage.LogPrefix()); err != nil {
return err
}
fmt.Println("commitment", time.Since(start))
ts += time.Since(start)
aggTx.RestrictSubsetFileDeletions(false)
executor.domains().SavePastChangesetAccumulator(b.Hash(), blockNum, changeset)
Expand Down
Loading