From 12ebd14c562cdef777ba106fc218594ffdc37e0d Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sun, 18 Aug 2024 18:33:45 +0700 Subject: [PATCH] e3: bind ii/domain lru to `_visibleFiles` object (#11652) 10% better throughput and latency of: small eth_getLogs, big eth_getLogs increased limit to 4096 - because can re-use lru objects - without worry on "alloc speed" --- erigon-lib/state/bps_tree.go | 5 +- erigon-lib/state/btree_index.go | 2 +- erigon-lib/state/cache.go | 82 ++++++++++++++++++++++--- erigon-lib/state/domain.go | 29 +++++---- erigon-lib/state/domain_test.go | 14 ++--- erigon-lib/state/inverted_index.go | 49 +++++++++------ erigon-lib/state/inverted_index_test.go | 2 +- rpc/handler.go | 18 +++--- 8 files changed, 145 insertions(+), 56 deletions(-) diff --git a/erigon-lib/state/bps_tree.go b/erigon-lib/state/bps_tree.go index bd9fd0b9836..b3c9f5bbfea 100644 --- a/erigon-lib/state/bps_tree.go +++ b/erigon-lib/state/bps_tree.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" "fmt" + "time" "unsafe" "github.com/c2h5oh/datasize" @@ -134,6 +135,7 @@ type Node struct { } func (b *BpsTree) WarmUp(kv ArchiveGetter) error { + t := time.Now() N := b.offt.Count() if N == 0 { return nil @@ -163,7 +165,8 @@ func (b *BpsTree) WarmUp(kv ArchiveGetter) error { log.Root().Debug("WarmUp finished", "file", kv.FileName(), "M", b.M, "N", N, "cached", fmt.Sprintf("%d %%%.5f", len(b.mx), float64(len(b.mx))/float64(N)*100), - "cacheSize", datasize.ByteSize(cachedBytes).HR(), "fileSize", datasize.ByteSize(kv.Size()).HR()) + "cacheSize", datasize.ByteSize(cachedBytes).HR(), "fileSize", datasize.ByteSize(kv.Size()).HR(), + "took", time.Since(t)) return nil } diff --git a/erigon-lib/state/btree_index.go b/erigon-lib/state/btree_index.go index e3e5e889a97..abb7c0f6290 100644 --- a/erigon-lib/state/btree_index.go +++ b/erigon-lib/state/btree_index.go @@ -50,7 +50,7 @@ const BtreeLogPrefix = "btree" // DefaultBtreeM - amount of keys on leaf of BTree // It will do log2(M) co-located-reads from data file - for binary-search inside leaf -var DefaultBtreeM = uint64(256) +var DefaultBtreeM = uint64(dbg.EnvInt("BT_M", 256)) const DefaultBtreeStartSkip = uint64(4) // defines smallest shard available for scan instead of binsearch diff --git a/erigon-lib/state/cache.go b/erigon-lib/state/cache.go index a3586445e0b..b948346bc8d 100644 --- a/erigon-lib/state/cache.go +++ b/erigon-lib/state/cache.go @@ -2,6 +2,7 @@ package state import ( "fmt" + "sync" "github.com/elastic/go-freelru" "github.com/erigontech/erigon-lib/common/dbg" @@ -28,29 +29,64 @@ type domainGetFromFileCacheItem struct { } var ( - domainGetFromFileCacheLimit = uint32(dbg.EnvInt("D_LRU", 64)) + domainGetFromFileCacheLimit = uint32(dbg.EnvInt("D_LRU", 4096)) domainGetFromFileCacheTrace = dbg.EnvBool("D_LRU_TRACE", false) ) -func NewDomainGetFromFileCache(trace bool) *DomainGetFromFileCache { +func NewDomainGetFromFileCache() *DomainGetFromFileCache { c, err := freelru.New[u128, domainGetFromFileCacheItem](domainGetFromFileCacheLimit, u128noHash) if err != nil { panic(err) } - return &DomainGetFromFileCache{LRU: c, trace: trace || domainGetFromFileCacheTrace} + return &DomainGetFromFileCache{LRU: c, trace: domainGetFromFileCacheTrace} } +func (c *DomainGetFromFileCache) SetTrace(v bool) { c.trace = v } func (c *DomainGetFromFileCache) LogStats(dt kv.Domain) { if c == nil || !c.trace { return } - m := c.Metrics() log.Warn("[dbg] DomainGetFromFileCache", "a", dt.String(), "hit", m.Hits, "total", m.Hits+m.Misses, "Collisions", m.Collisions, "Evictions", m.Evictions, "Inserts", m.Inserts, "limit", domainGetFromFileCacheLimit, "ratio", fmt.Sprintf("%.2f", float64(m.Hits)/float64(m.Hits+m.Misses))) } +func NewDomainGetFromFileCacheAny() any { return NewDomainGetFromFileCache() } +func newDomainVisible(name kv.Domain, files []visibleFile) *domainVisible { + d := &domainVisible{ + name: name, + files: files, + caches: &sync.Pool{New: NewDomainGetFromFileCacheAny}, + } + // Not on hot-path: better pre-alloc here + d.preAlloc() + return d +} +func (v *domainVisible) preAlloc() { + var preAlloc [10]any + for i := 0; i < len(preAlloc); i++ { + preAlloc[i] = v.caches.Get() + } + for i := 0; i < len(preAlloc); i++ { + v.caches.Put(preAlloc[i]) + } +} + +func (v *domainVisible) newGetFromFileCache() *DomainGetFromFileCache { + if v.name == kv.CommitmentDomain { + return nil + } + return v.caches.Get().(*DomainGetFromFileCache) +} +func (v *domainVisible) returnGetFromFileCache(c *DomainGetFromFileCache) { + if c == nil { + return + } + c.LogStats(v.name) + v.caches.Put(c) +} + var ( - iiGetFromFileCacheLimit = uint32(dbg.EnvInt("II_LRU", 128)) + iiGetFromFileCacheLimit = uint32(dbg.EnvInt("II_LRU", 4096)) iiGetFromFileCacheTrace = dbg.EnvBool("II_LRU_TRACE", false) ) @@ -63,13 +99,14 @@ type iiSeekInFilesCacheItem struct { requested, found uint64 } -func NewIISeekInFilesCache(trace bool) *IISeekInFilesCache { +func NewIISeekInFilesCache() *IISeekInFilesCache { c, err := freelru.New[u128, iiSeekInFilesCacheItem](iiGetFromFileCacheLimit, u128noHash) if err != nil { panic(err) } - return &IISeekInFilesCache{LRU: c, trace: trace || iiGetFromFileCacheTrace} + return &IISeekInFilesCache{LRU: c, trace: iiGetFromFileCacheTrace} } +func (c *IISeekInFilesCache) SetTrace(v bool) { c.trace = v } func (c *IISeekInFilesCache) LogStats(fileBaseName string) { if c == nil || !c.trace { return @@ -77,3 +114,34 @@ func (c *IISeekInFilesCache) LogStats(fileBaseName string) { m := c.Metrics() log.Warn("[dbg] IISeekInFilesCache", "a", fileBaseName, "hit", c.hit, "total", c.total, "Collisions", m.Collisions, "Evictions", m.Evictions, "Inserts", m.Inserts, "limit", iiGetFromFileCacheLimit, "ratio", fmt.Sprintf("%.2f", float64(c.hit)/float64(c.total))) } + +func NewIISeekInFilesCacheAny() any { return NewIISeekInFilesCache() } +func newIIVisible(name string, files []visibleFile) *iiVisible { + ii := &iiVisible{ + name: name, + files: files, + caches: &sync.Pool{New: NewIISeekInFilesCacheAny}, + } + // Not on hot-path: better pre-alloc here + ii.preAlloc() + return ii +} +func (v *iiVisible) preAlloc() { + var preAlloc [10]any + for i := 0; i < len(preAlloc); i++ { + preAlloc[i] = v.caches.Get() + } + for i := 0; i < len(preAlloc); i++ { + v.caches.Put(preAlloc[i]) + } +} +func (v *iiVisible) newSeekInFilesCache() *IISeekInFilesCache { + return v.caches.Get().(*IISeekInFilesCache) +} +func (v *iiVisible) returnSeekInFilesCache(c *IISeekInFilesCache) { + if c == nil { + return + } + c.LogStats(v.name) + v.caches.Put(c) +} diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index a3c8e93fe19..ddf23444893 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -88,7 +88,7 @@ type Domain struct { // _visibleFiles - underscore in name means: don't use this field directly, use BeginFilesRo() // underlying array is immutable - means it's ready for zero-copy use - _visibleFiles []visibleFile + _visibleFiles *domainVisible integrityCheck func(name kv.Domain, fromStep, toStep uint64) bool @@ -114,6 +114,12 @@ type domainCfg struct { restrictSubsetFileDeletions bool } +type domainVisible struct { + files []visibleFile + name kv.Domain + caches *sync.Pool +} + func NewDomain(cfg domainCfg, aggregationStep uint64, name kv.Domain, valsTable, indexKeysTable, historyValsTable, indexTable string, integrityCheck func(name kv.Domain, fromStep, toStep uint64) bool, logger log.Logger) (*Domain, error) { if cfg.hist.iiCfg.dirs.SnapDomain == "" { panic("empty `dirs` variable") @@ -132,7 +138,7 @@ func NewDomain(cfg domainCfg, aggregationStep uint64, name kv.Domain, valsTable, integrityCheck: integrityCheck, } - d._visibleFiles = []visibleFile{} + d._visibleFiles = newDomainVisible(d.name, []visibleFile{}) var err error if d.History, err = NewHistory(cfg.hist, aggregationStep, name.String(), indexKeysTable, indexTable, historyValsTable, nil, logger); err != nil { @@ -439,7 +445,7 @@ func (d *Domain) closeWhatNotInList(fNames []string) { } func (d *Domain) reCalcVisibleFiles() { - d._visibleFiles = calcVisibleFiles(d.dirtyFiles, d.indexList, false) + d._visibleFiles = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.indexList, false)) d.History.reCalcVisibleFiles() } @@ -852,7 +858,7 @@ func (d *Domain) collectFilesStats() (datsz, idxsz, files uint64) { } func (d *Domain) BeginFilesRo() *DomainRoTx { - files := d._visibleFiles + files := d._visibleFiles.files for i := 0; i < len(files); i++ { if !files[i].src.frozen { files[i].src.refcount.Add(1) @@ -1392,10 +1398,11 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt } hi, lo := dt.ht.iit.hashKey(filekey) - if dt.name != kv.CommitmentDomain { - if dt.getFromFileCache == nil { - dt.getFromFileCache = NewDomainGetFromFileCache(false) - } + + if dt.getFromFileCache == nil { + dt.getFromFileCache = dt.d._visibleFiles.newGetFromFileCache() + } + if dt.getFromFileCache != nil { cv, ok := dt.getFromFileCache.Get(u128{hi: hi, lo: lo}) if ok { return cv.v, true, dt.files[cv.lvl].startTxNum, dt.files[cv.lvl].endTxNum, nil @@ -1436,7 +1443,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt fmt.Printf("GetLatest(%s, %x) -> found in file %s\n", dt.name.String(), filekey, dt.files[i].src.decompressor.FileName()) } - if dt.name != kv.CommitmentDomain { + if dt.getFromFileCache != nil { dt.getFromFileCache.Add(u128{hi: hi, lo: lo}, domainGetFromFileCacheItem{lvl: uint8(i), v: v}) } return v, true, dt.files[i].startTxNum, dt.files[i].endTxNum, nil @@ -1445,7 +1452,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt fmt.Printf("GetLatest(%s, %x) -> not found in %d files\n", dt.name.String(), filekey, len(dt.files)) } - if dt.name != kv.CommitmentDomain { + if dt.getFromFileCache != nil { dt.getFromFileCache.Add(u128{hi: hi, lo: lo}, domainGetFromFileCacheItem{lvl: 0, v: nil}) } return nil, false, 0, 0, nil @@ -1501,7 +1508,7 @@ func (dt *DomainRoTx) Close() { } dt.ht.Close() - dt.getFromFileCache.LogStats(dt.name) + dt.d._visibleFiles.returnGetFromFileCache(dt.getFromFileCache) } func (dt *DomainRoTx) statelessGetter(i int) ArchiveGetter { diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go index 9fc398c2114..7f252c66308 100644 --- a/erigon-lib/state/domain_test.go +++ b/erigon-lib/state/domain_test.go @@ -106,7 +106,7 @@ func TestDomain_OpenFolder(t *testing.T) { collateAndMerge(t, db, nil, d, txs) - list := d._visibleFiles + list := d._visibleFiles.files require.NotEmpty(t, list) ff := list[len(list)-1] fn := ff.src.decompressor.FilePath() @@ -858,8 +858,8 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) { require.NoError(t, err) run1Doms, run1Hist := make([]string, 0), make([]string, 0) - for i := 0; i < len(dom._visibleFiles); i++ { - run1Doms = append(run1Doms, dom._visibleFiles[i].src.decompressor.FileName()) + for i := 0; i < len(dom._visibleFiles.files); i++ { + run1Doms = append(run1Doms, dom._visibleFiles.files[i].src.decompressor.FileName()) // should be equal length run1Hist = append(run1Hist, dom.History._visibleFiles[i].src.decompressor.FileName()) } @@ -878,12 +878,12 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) { require.NoError(t, err) // domain files for same range should not be available so lengths should match - require.Len(t, dom._visibleFiles, len(run1Doms)-len(removedHist)) - require.Len(t, dom.History._visibleFiles, len(dom._visibleFiles)) + require.Len(t, dom._visibleFiles.files, len(run1Doms)-len(removedHist)) + require.Len(t, dom.History._visibleFiles, len(dom._visibleFiles.files)) require.Len(t, dom.History._visibleFiles, len(run1Hist)-len(removedHist)) - for i := 0; i < len(dom._visibleFiles); i++ { - require.EqualValuesf(t, run1Doms[i], dom._visibleFiles[i].src.decompressor.FileName(), "kv i=%d", i) + for i := 0; i < len(dom._visibleFiles.files); i++ { + require.EqualValuesf(t, run1Doms[i], dom._visibleFiles.files[i].src.decompressor.FileName(), "kv i=%d", i) require.EqualValuesf(t, run1Hist[i], dom.History._visibleFiles[i].src.decompressor.FileName(), " v i=%d", i) } diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go index 1248f842e07..d4501195a71 100644 --- a/erigon-lib/state/inverted_index.go +++ b/erigon-lib/state/inverted_index.go @@ -71,7 +71,7 @@ type InvertedIndex struct { // _visibleFiles - underscore in name means: don't use this field directly, use BeginFilesRo() // underlying array is immutable - means it's ready for zero-copy use - _visibleFiles []visibleFile + _visibleFiles *iiVisible indexKeysTable string // txnNum_u64 -> key (k+auto_increment) indexTable string // k -> txnNum_u64 , Needs to be table with DupSort @@ -96,6 +96,11 @@ type iiCfg struct { dirs datadir.Dirs db kv.RoDB // global db pointer. mostly for background warmup. } +type iiVisible struct { + files []visibleFile + name string + caches *sync.Pool +} func NewInvertedIndex(cfg iiCfg, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, integrityCheck func(fromStep uint64, toStep uint64) bool, logger log.Logger) (*InvertedIndex, error) { if cfg.dirs.SnapDomain == "" { @@ -115,7 +120,7 @@ func NewInvertedIndex(cfg iiCfg, aggregationStep uint64, filenameBase, indexKeys } ii.indexList = withHashMap - ii._visibleFiles = []visibleFile{} + ii._visibleFiles = newIIVisible(ii.filenameBase, []visibleFile{}) return &ii, nil } @@ -225,7 +230,7 @@ var ( ) func (ii *InvertedIndex) reCalcVisibleFiles() { - ii._visibleFiles = calcVisibleFiles(ii.dirtyFiles, ii.indexList, false) + ii._visibleFiles = newIIVisible(ii.filenameBase, calcVisibleFiles(ii.dirtyFiles, ii.indexList, false)) } func (ii *InvertedIndex) missedAccessors() (l []*filesItem) { @@ -470,7 +475,7 @@ func (w *invertedIndexBufferedWriter) add(key, indexKey []byte) error { } func (ii *InvertedIndex) BeginFilesRo() *InvertedIndexRoTx { - files := ii._visibleFiles + files := ii._visibleFiles.files for i := 0; i < len(files); i++ { if !files[i].src.frozen { files[i].src.refcount.Add(1) @@ -506,7 +511,7 @@ func (iit *InvertedIndexRoTx) Close() { r.Close() } - iit.seekInFilesCache.LogStats(iit.ii.filenameBase) + iit.ii._visibleFiles.returnSeekInFilesCache(iit.seekInFilesCache) } type MergeRange struct { @@ -577,18 +582,20 @@ func (iit *InvertedIndexRoTx) seekInFiles(key []byte, txNum uint64) (found bool, hi, lo := iit.hashKey(key) if iit.seekInFilesCache == nil { - iit.seekInFilesCache = NewIISeekInFilesCache(false) - } - - iit.seekInFilesCache.total++ - fromCache, ok := iit.seekInFilesCache.Get(u128{hi: hi, lo: lo}) - if ok && fromCache.requested <= txNum { - if txNum <= fromCache.found { - iit.seekInFilesCache.hit++ - return true, fromCache.found - } else if fromCache.found == 0 { - iit.seekInFilesCache.hit++ - return false, 0 + iit.seekInFilesCache = iit.ii._visibleFiles.newSeekInFilesCache() + } + + if iit.seekInFilesCache != nil { + iit.seekInFilesCache.total++ + fromCache, ok := iit.seekInFilesCache.Get(u128{hi: hi, lo: lo}) + if ok && fromCache.requested <= txNum { + if txNum <= fromCache.found { + iit.seekInFilesCache.hit++ + return true, fromCache.found + } else if fromCache.found == 0 { + iit.seekInFilesCache.hit++ + return false, 0 + } } } @@ -611,12 +618,16 @@ func (iit *InvertedIndexRoTx) seekInFiles(key []byte, txNum uint64) (found bool, equalOrHigherTxNum, found = eliasfano32.Seek(eliasVal, txNum) if found { - iit.seekInFilesCache.Add(u128{hi: hi, lo: lo}, iiSeekInFilesCacheItem{requested: txNum, found: equalOrHigherTxNum}) + if iit.seekInFilesCache != nil { + iit.seekInFilesCache.Add(u128{hi: hi, lo: lo}, iiSeekInFilesCacheItem{requested: txNum, found: equalOrHigherTxNum}) + } return true, equalOrHigherTxNum } } - iit.seekInFilesCache.Add(u128{hi: hi, lo: lo}, iiSeekInFilesCacheItem{requested: txNum, found: 0}) + if iit.seekInFilesCache != nil { + iit.seekInFilesCache.Add(u128{hi: hi, lo: lo}, iiSeekInFilesCacheItem{requested: txNum, found: 0}) + } return false, 0 } diff --git a/erigon-lib/state/inverted_index_test.go b/erigon-lib/state/inverted_index_test.go index ebfe67319bb..b629c38bd03 100644 --- a/erigon-lib/state/inverted_index_test.go +++ b/erigon-lib/state/inverted_index_test.go @@ -713,7 +713,7 @@ func TestInvIndex_OpenFolder(t *testing.T) { mergeInverted(t, db, ii, txs) - list := ii._visibleFiles + list := ii._visibleFiles.files require.NotEmpty(t, list) ff := list[len(list)-1] fn := ff.src.decompressor.FilePath() diff --git a/rpc/handler.go b/rpc/handler.go index 0ae3dbc1b24..4cadcb39281 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -398,14 +398,11 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) { // handleCallMsg executes a call message and returns the answer. func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, stream *jsoniter.Stream) *jsonrpcMessage { - start := time.Now() switch { case msg.isNotification(): h.handleCall(ctx, msg, stream) if h.traceRequests { - h.logger.Info("[rpc] served", "t", time.Since(start), "method", msg.Method, "params", string(msg.Params)) - } else { - h.logger.Trace("[rpc] served", "t", time.Since(start), "method", msg.Method, "params", string(msg.Params)) + h.logger.Info("[rpc] served", "method", msg.Method, "params", string(msg.Params)) } return nil case msg.isCall(): @@ -420,6 +417,11 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, stream *json } } + var start time.Time + if doSlowLog { + start = time.Now() + } + resp := h.handleCall(ctx, msg, stream) if doSlowLog { @@ -431,17 +433,15 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, stream *json if resp != nil && resp.Error != nil { if resp.Error.Data != nil { - h.logger.Warn("[rpc] served", "method", msg.Method, "reqid", idForLog(msg.ID), "t", time.Since(start), + h.logger.Warn("[rpc] served", "method", msg.Method, "reqid", idForLog(msg.ID), "err", resp.Error.Message, "errdata", resp.Error.Data) } else { - h.logger.Warn("[rpc] served", "method", msg.Method, "reqid", idForLog(msg.ID), "t", time.Since(start), + h.logger.Warn("[rpc] served", "method", msg.Method, "reqid", idForLog(msg.ID), "err", resp.Error.Message) } } if h.traceRequests { - h.logger.Info("Served", "t", time.Since(start), "method", msg.Method, "reqid", idForLog(msg.ID), "params", string(msg.Params)) - } else { - h.logger.Trace("Served", "t", time.Since(start), "method", msg.Method, "reqid", idForLog(msg.ID), "params", string(msg.Params)) + h.logger.Info("Served", "method", msg.Method, "reqid", idForLog(msg.ID), "params", string(msg.Params)) } return resp