Skip to content

Commit

Permalink
e3: bind ii/domain lru to _visibleFiles object (#11652)
Browse files Browse the repository at this point in the history
10% better throughput and latency of: small eth_getLogs, big eth_getLogs
increased limit to 4096 - because can re-use lru objects - without worry
on "alloc speed"
  • Loading branch information
AskAlexSharov authored Aug 18, 2024
1 parent c78450c commit 12ebd14
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 56 deletions.
5 changes: 4 additions & 1 deletion erigon-lib/state/bps_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"time"
"unsafe"

"github.com/c2h5oh/datasize"
Expand Down Expand Up @@ -134,6 +135,7 @@ type Node struct {
}

func (b *BpsTree) WarmUp(kv ArchiveGetter) error {
t := time.Now()
N := b.offt.Count()
if N == 0 {
return nil
Expand Down Expand Up @@ -163,7 +165,8 @@ func (b *BpsTree) WarmUp(kv ArchiveGetter) error {

log.Root().Debug("WarmUp finished", "file", kv.FileName(), "M", b.M, "N", N,
"cached", fmt.Sprintf("%d %%%.5f", len(b.mx), float64(len(b.mx))/float64(N)*100),
"cacheSize", datasize.ByteSize(cachedBytes).HR(), "fileSize", datasize.ByteSize(kv.Size()).HR())
"cacheSize", datasize.ByteSize(cachedBytes).HR(), "fileSize", datasize.ByteSize(kv.Size()).HR(),
"took", time.Since(t))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/state/btree_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const BtreeLogPrefix = "btree"

// DefaultBtreeM - amount of keys on leaf of BTree
// It will do log2(M) co-located-reads from data file - for binary-search inside leaf
var DefaultBtreeM = uint64(256)
var DefaultBtreeM = uint64(dbg.EnvInt("BT_M", 256))

const DefaultBtreeStartSkip = uint64(4) // defines smallest shard available for scan instead of binsearch

Expand Down
82 changes: 75 additions & 7 deletions erigon-lib/state/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"fmt"
"sync"

"github.com/elastic/go-freelru"
"github.com/erigontech/erigon-lib/common/dbg"
Expand All @@ -28,29 +29,64 @@ type domainGetFromFileCacheItem struct {
}

var (
domainGetFromFileCacheLimit = uint32(dbg.EnvInt("D_LRU", 64))
domainGetFromFileCacheLimit = uint32(dbg.EnvInt("D_LRU", 4096))
domainGetFromFileCacheTrace = dbg.EnvBool("D_LRU_TRACE", false)
)

func NewDomainGetFromFileCache(trace bool) *DomainGetFromFileCache {
func NewDomainGetFromFileCache() *DomainGetFromFileCache {
c, err := freelru.New[u128, domainGetFromFileCacheItem](domainGetFromFileCacheLimit, u128noHash)
if err != nil {
panic(err)
}
return &DomainGetFromFileCache{LRU: c, trace: trace || domainGetFromFileCacheTrace}
return &DomainGetFromFileCache{LRU: c, trace: domainGetFromFileCacheTrace}
}

func (c *DomainGetFromFileCache) SetTrace(v bool) { c.trace = v }
func (c *DomainGetFromFileCache) LogStats(dt kv.Domain) {
if c == nil || !c.trace {
return
}

m := c.Metrics()
log.Warn("[dbg] DomainGetFromFileCache", "a", dt.String(), "hit", m.Hits, "total", m.Hits+m.Misses, "Collisions", m.Collisions, "Evictions", m.Evictions, "Inserts", m.Inserts, "limit", domainGetFromFileCacheLimit, "ratio", fmt.Sprintf("%.2f", float64(m.Hits)/float64(m.Hits+m.Misses)))
}

func NewDomainGetFromFileCacheAny() any { return NewDomainGetFromFileCache() }
func newDomainVisible(name kv.Domain, files []visibleFile) *domainVisible {
d := &domainVisible{
name: name,
files: files,
caches: &sync.Pool{New: NewDomainGetFromFileCacheAny},
}
// Not on hot-path: better pre-alloc here
d.preAlloc()
return d
}
func (v *domainVisible) preAlloc() {
var preAlloc [10]any
for i := 0; i < len(preAlloc); i++ {
preAlloc[i] = v.caches.Get()
}
for i := 0; i < len(preAlloc); i++ {
v.caches.Put(preAlloc[i])
}
}

func (v *domainVisible) newGetFromFileCache() *DomainGetFromFileCache {
if v.name == kv.CommitmentDomain {
return nil
}
return v.caches.Get().(*DomainGetFromFileCache)
}
func (v *domainVisible) returnGetFromFileCache(c *DomainGetFromFileCache) {
if c == nil {
return
}
c.LogStats(v.name)
v.caches.Put(c)
}

var (
iiGetFromFileCacheLimit = uint32(dbg.EnvInt("II_LRU", 128))
iiGetFromFileCacheLimit = uint32(dbg.EnvInt("II_LRU", 4096))
iiGetFromFileCacheTrace = dbg.EnvBool("II_LRU_TRACE", false)
)

Expand All @@ -63,17 +99,49 @@ type iiSeekInFilesCacheItem struct {
requested, found uint64
}

func NewIISeekInFilesCache(trace bool) *IISeekInFilesCache {
func NewIISeekInFilesCache() *IISeekInFilesCache {
c, err := freelru.New[u128, iiSeekInFilesCacheItem](iiGetFromFileCacheLimit, u128noHash)
if err != nil {
panic(err)
}
return &IISeekInFilesCache{LRU: c, trace: trace || iiGetFromFileCacheTrace}
return &IISeekInFilesCache{LRU: c, trace: iiGetFromFileCacheTrace}
}
func (c *IISeekInFilesCache) SetTrace(v bool) { c.trace = v }
func (c *IISeekInFilesCache) LogStats(fileBaseName string) {
if c == nil || !c.trace {
return
}
m := c.Metrics()
log.Warn("[dbg] IISeekInFilesCache", "a", fileBaseName, "hit", c.hit, "total", c.total, "Collisions", m.Collisions, "Evictions", m.Evictions, "Inserts", m.Inserts, "limit", iiGetFromFileCacheLimit, "ratio", fmt.Sprintf("%.2f", float64(c.hit)/float64(c.total)))
}

func NewIISeekInFilesCacheAny() any { return NewIISeekInFilesCache() }
func newIIVisible(name string, files []visibleFile) *iiVisible {
ii := &iiVisible{
name: name,
files: files,
caches: &sync.Pool{New: NewIISeekInFilesCacheAny},
}
// Not on hot-path: better pre-alloc here
ii.preAlloc()
return ii
}
func (v *iiVisible) preAlloc() {
var preAlloc [10]any
for i := 0; i < len(preAlloc); i++ {
preAlloc[i] = v.caches.Get()
}
for i := 0; i < len(preAlloc); i++ {
v.caches.Put(preAlloc[i])
}
}
func (v *iiVisible) newSeekInFilesCache() *IISeekInFilesCache {
return v.caches.Get().(*IISeekInFilesCache)
}
func (v *iiVisible) returnSeekInFilesCache(c *IISeekInFilesCache) {
if c == nil {
return
}
c.LogStats(v.name)
v.caches.Put(c)
}
29 changes: 18 additions & 11 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Domain struct {

// _visibleFiles - underscore in name means: don't use this field directly, use BeginFilesRo()
// underlying array is immutable - means it's ready for zero-copy use
_visibleFiles []visibleFile
_visibleFiles *domainVisible

integrityCheck func(name kv.Domain, fromStep, toStep uint64) bool

Expand All @@ -114,6 +114,12 @@ type domainCfg struct {
restrictSubsetFileDeletions bool
}

type domainVisible struct {
files []visibleFile
name kv.Domain
caches *sync.Pool
}

func NewDomain(cfg domainCfg, aggregationStep uint64, name kv.Domain, valsTable, indexKeysTable, historyValsTable, indexTable string, integrityCheck func(name kv.Domain, fromStep, toStep uint64) bool, logger log.Logger) (*Domain, error) {
if cfg.hist.iiCfg.dirs.SnapDomain == "" {
panic("empty `dirs` variable")
Expand All @@ -132,7 +138,7 @@ func NewDomain(cfg domainCfg, aggregationStep uint64, name kv.Domain, valsTable,
integrityCheck: integrityCheck,
}

d._visibleFiles = []visibleFile{}
d._visibleFiles = newDomainVisible(d.name, []visibleFile{})

var err error
if d.History, err = NewHistory(cfg.hist, aggregationStep, name.String(), indexKeysTable, indexTable, historyValsTable, nil, logger); err != nil {
Expand Down Expand Up @@ -439,7 +445,7 @@ func (d *Domain) closeWhatNotInList(fNames []string) {
}

func (d *Domain) reCalcVisibleFiles() {
d._visibleFiles = calcVisibleFiles(d.dirtyFiles, d.indexList, false)
d._visibleFiles = newDomainVisible(d.name, calcVisibleFiles(d.dirtyFiles, d.indexList, false))
d.History.reCalcVisibleFiles()
}

Expand Down Expand Up @@ -852,7 +858,7 @@ func (d *Domain) collectFilesStats() (datsz, idxsz, files uint64) {
}

func (d *Domain) BeginFilesRo() *DomainRoTx {
files := d._visibleFiles
files := d._visibleFiles.files
for i := 0; i < len(files); i++ {
if !files[i].src.frozen {
files[i].src.refcount.Add(1)
Expand Down Expand Up @@ -1392,10 +1398,11 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt
}

hi, lo := dt.ht.iit.hashKey(filekey)
if dt.name != kv.CommitmentDomain {
if dt.getFromFileCache == nil {
dt.getFromFileCache = NewDomainGetFromFileCache(false)
}

if dt.getFromFileCache == nil {
dt.getFromFileCache = dt.d._visibleFiles.newGetFromFileCache()
}
if dt.getFromFileCache != nil {
cv, ok := dt.getFromFileCache.Get(u128{hi: hi, lo: lo})
if ok {
return cv.v, true, dt.files[cv.lvl].startTxNum, dt.files[cv.lvl].endTxNum, nil
Expand Down Expand Up @@ -1436,7 +1443,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt
fmt.Printf("GetLatest(%s, %x) -> found in file %s\n", dt.name.String(), filekey, dt.files[i].src.decompressor.FileName())
}

if dt.name != kv.CommitmentDomain {
if dt.getFromFileCache != nil {
dt.getFromFileCache.Add(u128{hi: hi, lo: lo}, domainGetFromFileCacheItem{lvl: uint8(i), v: v})
}
return v, true, dt.files[i].startTxNum, dt.files[i].endTxNum, nil
Expand All @@ -1445,7 +1452,7 @@ func (dt *DomainRoTx) getFromFiles(filekey []byte) (v []byte, found bool, fileSt
fmt.Printf("GetLatest(%s, %x) -> not found in %d files\n", dt.name.String(), filekey, len(dt.files))
}

if dt.name != kv.CommitmentDomain {
if dt.getFromFileCache != nil {
dt.getFromFileCache.Add(u128{hi: hi, lo: lo}, domainGetFromFileCacheItem{lvl: 0, v: nil})
}
return nil, false, 0, 0, nil
Expand Down Expand Up @@ -1501,7 +1508,7 @@ func (dt *DomainRoTx) Close() {
}
dt.ht.Close()

dt.getFromFileCache.LogStats(dt.name)
dt.d._visibleFiles.returnGetFromFileCache(dt.getFromFileCache)
}

func (dt *DomainRoTx) statelessGetter(i int) ArchiveGetter {
Expand Down
14 changes: 7 additions & 7 deletions erigon-lib/state/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestDomain_OpenFolder(t *testing.T) {

collateAndMerge(t, db, nil, d, txs)

list := d._visibleFiles
list := d._visibleFiles.files
require.NotEmpty(t, list)
ff := list[len(list)-1]
fn := ff.src.decompressor.FilePath()
Expand Down Expand Up @@ -858,8 +858,8 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) {
require.NoError(t, err)

run1Doms, run1Hist := make([]string, 0), make([]string, 0)
for i := 0; i < len(dom._visibleFiles); i++ {
run1Doms = append(run1Doms, dom._visibleFiles[i].src.decompressor.FileName())
for i := 0; i < len(dom._visibleFiles.files); i++ {
run1Doms = append(run1Doms, dom._visibleFiles.files[i].src.decompressor.FileName())
// should be equal length
run1Hist = append(run1Hist, dom.History._visibleFiles[i].src.decompressor.FileName())
}
Expand All @@ -878,12 +878,12 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) {
require.NoError(t, err)

// domain files for same range should not be available so lengths should match
require.Len(t, dom._visibleFiles, len(run1Doms)-len(removedHist))
require.Len(t, dom.History._visibleFiles, len(dom._visibleFiles))
require.Len(t, dom._visibleFiles.files, len(run1Doms)-len(removedHist))
require.Len(t, dom.History._visibleFiles, len(dom._visibleFiles.files))
require.Len(t, dom.History._visibleFiles, len(run1Hist)-len(removedHist))

for i := 0; i < len(dom._visibleFiles); i++ {
require.EqualValuesf(t, run1Doms[i], dom._visibleFiles[i].src.decompressor.FileName(), "kv i=%d", i)
for i := 0; i < len(dom._visibleFiles.files); i++ {
require.EqualValuesf(t, run1Doms[i], dom._visibleFiles.files[i].src.decompressor.FileName(), "kv i=%d", i)
require.EqualValuesf(t, run1Hist[i], dom.History._visibleFiles[i].src.decompressor.FileName(), " v i=%d", i)
}

Expand Down
Loading

0 comments on commit 12ebd14

Please sign in to comment.