From 7df0561130cc507cbfa71188a38e853b6459fa94 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 31 Aug 2024 11:23:28 +0700 Subject: [PATCH] compression of storage.kv (#11712) Co-authored-by: Mark Holt --- cmd/integration/commands/flags.go | 5 +- cmd/integration/commands/stages.go | 1 + erigon-lib/state/aggregator.go | 107 +++++--- erigon-lib/state/aggregator_test.go | 1 + erigon-lib/state/archive.go | 15 + erigon-lib/state/bps_tree.go | 28 +- erigon-lib/state/btree_index.go | 9 +- erigon-lib/state/btree_index_test.go | 7 +- erigon-lib/state/domain.go | 30 +- erigon-lib/state/domain_committed.go | 42 ++- erigon-lib/state/files_item.go | 47 ++++ erigon-lib/state/merge.go | 4 +- eth/stagedsync/bor_heimdall_shared.go | 4 +- migrations/commitment_recompress.go | 258 ++++++++++++++++++ .../{commitment.go => commitment_squeeze.go} | 26 +- migrations/migrations.go | 1 + node/node.go | 4 +- 17 files changed, 507 insertions(+), 82 deletions(-) create mode 100644 migrations/commitment_recompress.go rename migrations/{commitment.go => commitment_squeeze.go} (74%) diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index 1a70a4280b1..7237b17078d 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -37,7 +37,6 @@ var ( bucket string datadirCli, toChaindata string migration string - squeezeCommitmentFiles bool integrityFast, integritySlow bool file string HeimdallURL string @@ -57,9 +56,10 @@ var ( startTxNum uint64 traceFromTx uint64 - _forceSetHistoryV3 bool workers, reconWorkers uint64 dbWriteMap bool + + squeezeCommitmentFiles, recompressCommitmentFiles bool ) func must(err error) { @@ -133,6 +133,7 @@ func withBucket(cmd *cobra.Command) { func withSqueezeCommitmentFiles(cmd *cobra.Command) { cmd.Flags().BoolVar(&squeezeCommitmentFiles, "squeeze", false, "allow squeezing commitment files on start") + cmd.Flags().BoolVar(&recompressCommitmentFiles, "recompress", false, "allow recompressing existing .kv files") } func withDataDir2(cmd *cobra.Command) { diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index ec9455b49cf..a305825ab82 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -412,6 +412,7 @@ var cmdRunMigrations = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { logger := debug.SetupCobra(cmd, "integration") migrations.EnableSqueezeCommitmentFiles = squeezeCommitmentFiles + migrations.EnableRecompressCommitmentFiles = recompressCommitmentFiles
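For orientation before the hunks below: FileCompression in erigon-lib/state acts as a bit mask (the String() method this patch adds to archive.go prints it as none/k/v/kv), and the aggregator change that follows switches storage .kv files from CompressNone to CompressKeys. A self-contained sketch of how the flags combine; the concrete 1 and 2 bit values are an assumption here, only the flag semantics come from the patch:

package main

import "fmt"

// Illustrative mirror of erigon-lib/state's FileCompression flags. The real
// constants live in archive.go; the exact numeric values are assumed.
type FileCompression uint8

const (
	CompressNone FileCompression = 0
	CompressKeys FileCompression = 1 // compress keys in file
	CompressVals FileCompression = 2 // compress values in file
)

func (c FileCompression) String() string {
	switch c {
	case CompressNone:
		return "none"
	case CompressKeys:
		return "k"
	case CompressVals:
		return "v"
	case CompressKeys | CompressVals:
		return "kv"
	default:
		return ""
	}
}

func main() {
	c := CompressKeys | CompressVals
	fmt.Println(c, c&CompressKeys != 0, c&CompressVals != 0) // kv true true
}

//non-accede and exclusive mode - to create new tables if needed.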
cfg := dbCfg(kv.ChainDB, chaindata).Flags(func(u uint) uint { return u &^ mdbx.Accede }).Exclusive() db, err := openDB(cfg, true, logger) diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 58075d0b01a..a0cd838e84c 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -173,7 +173,7 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6 withLocalityIndex: false, withExistenceIndex: false, compression: CompressNone, historyLargeValues: false, }, restrictSubsetFileDeletions: a.commitmentValuesTransform, - compress: CompressNone, + compress: CompressKeys, } if a.d[kv.StorageDomain], err = NewDomain(cfg, aggregationStep, kv.StorageDomain, kv.TblStorageVals, kv.TblStorageHistoryKeys, kv.TblStorageHistoryVals, kv.TblStorageIdx, integrityCheck, logger); err != nil { return nil, err @@ -1490,7 +1490,7 @@ func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) RangesV3 { // SqueezeCommitmentFiles should be called only when NO EXECUTION is running. // Removes commitment files and assumes a subsequent aggregator shutdown and restart (to integrate new files and rebuild indexes) -func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { +func (ac *AggregatorRoTx) SqueezeCommitmentFiles(mergedAgg *AggregatorRoTx) error { if !ac.a.commitmentValuesTransform { return nil } @@ -1500,9 +1500,50 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { storage := ac.d[kv.StorageDomain] // oh, again accessing domain.files directly, again and again.. - accountFiles := accounts.d.dirtyFiles.Items() - storageFiles := storage.d.dirtyFiles.Items() - commitFiles := commitment.d.dirtyFiles.Items() + mergedAccountFiles := mergedAgg.d[kv.AccountsDomain].d.dirtyFiles.Items() + mergedStorageFiles := mergedAgg.d[kv.StorageDomain].d.dirtyFiles.Items() + mergedCommitFiles := mergedAgg.d[kv.CommitmentDomain].d.dirtyFiles.Items() + + for _, f := range accounts.files { + f.src.decompressor.EnableMadvNormal() + } + for _, f := range mergedAccountFiles { + f.decompressor.EnableMadvNormal() + } + for _, f := range storage.files { + f.src.decompressor.EnableMadvNormal() + } + for _, f := range mergedStorageFiles { + f.decompressor.EnableMadvNormal() + } + for _, f := range commitment.files { + f.src.decompressor.EnableMadvNormal() + } + for _, f := range mergedCommitFiles { + f.decompressor.EnableMadvNormal() + } + defer func() { + for _, f := range accounts.files { + f.src.decompressor.DisableReadAhead() + } + for _, f := range mergedAccountFiles { + f.decompressor.DisableReadAhead() + } + for _, f := range storage.files { + f.src.decompressor.DisableReadAhead() + } + for _, f := range mergedStorageFiles { + f.decompressor.DisableReadAhead() + } + for _, f := range commitment.files { + f.src.decompressor.DisableReadAhead() + } + for _, f := range mergedCommitFiles { + f.decompressor.DisableReadAhead() + } + }() + + log.Info("[squeeze_migration] see target files", "acc", len(mergedAccountFiles), "st", len(mergedStorageFiles), "com", len(mergedCommitFiles)) getSizeDelta := func(a, b string) (datasize.ByteSize, float32, error) { ai, err := os.Stat(a) @@ -1527,42 +1568,45 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { logEvery := time.NewTicker(30 * time.Second) defer logEvery.Stop() - for ci := 0; ci < len(commitFiles); ci++ { - cf := commitFiles[ci] - for ai = 0; ai < len(accountFiles); ai++ { - if accountFiles[ai].startTxNum == cf.startTxNum && accountFiles[ai].endTxNum == cf.endTxNum { + for ci := 0; ci < len(mergedCommitFiles); ci++ { + cf := mergedCommitFiles[ci] + for ai = 0; ai < len(mergedAccountFiles); ai++ { + if mergedAccountFiles[ai].startTxNum == cf.startTxNum && mergedAccountFiles[ai].endTxNum == cf.endTxNum { break } } - for si = 0; si < len(storageFiles); si++ { - if storageFiles[si].startTxNum == cf.startTxNum && storageFiles[si].endTxNum == cf.endTxNum { + for si = 0; si < len(mergedStorageFiles); si++ { + if mergedStorageFiles[si].startTxNum == cf.startTxNum && mergedStorageFiles[si].endTxNum == cf.endTxNum { break } } - if ai == len(accountFiles) || si == len(storageFiles) { - ac.a.logger.Info("SqueezeCommitmentFiles: commitment file has no corresponding account or storage file", "commitment", cf.decompressor.FileName()) + if ai == len(mergedAccountFiles) || si == len(mergedStorageFiles) { + ac.a.logger.Info("[squeeze_migration] commitment file has no corresponding account or storage file", "commitment", cf.decompressor.FileName()) continue } - af, sf := accountFiles[ai], storageFiles[si] err := func() error { - ac.a.logger.Info("SqueezeCommitmentFiles: file start", "original", cf.decompressor.FileName(), - "progress", fmt.Sprintf("%d/%d", ci+1, len(accountFiles))) + af, sf := mergedAccountFiles[ai], mergedStorageFiles[si] + + steps := cf.endTxNum/ac.a.aggregationStep - cf.startTxNum/ac.a.aggregationStep + compression := commitment.d.compression + if steps < DomainMinStepsToCompress { + compression = CompressNone + } + ac.a.logger.Info("[squeeze_migration] file start", "original", cf.decompressor.FileName(), + "progress", fmt.Sprintf("%d/%d", ci+1, len(mergedCommitFiles)), "compress_cfg", commitment.d.compressCfg, "compress", compression) originalPath := cf.decompressor.FilePath() squeezedTmpPath := originalPath + sqExt + ".tmp" squeezedCompr, err := seg.NewCompressor(context.Background(), "squeeze", squeezedTmpPath, ac.a.dirs.Tmp, - commitment.d.compressCfg, log.LvlTrace, commitment.d.logger) - + commitment.d.compressCfg, log.LvlInfo, commitment.d.logger) if err != nil { return err } defer squeezedCompr.Close() - cf.decompressor.EnableReadAhead() - defer cf.decompressor.DisableReadAhead() - reader := NewArchiveGetter(cf.decompressor.MakeGetter(), commitment.d.compression) + reader := NewArchiveGetter(cf.decompressor.MakeGetter(), compression) reader.Reset(0) writer := NewArchiveWriter(squeezedCompr, commitment.d.compression) @@ -1573,9 +1617,10 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { } i := 0 + var k, v []byte for reader.HasNext() { - k, _ := reader.Next(nil) - v, _ := reader.Next(nil) + k, _ = reader.Next(k[:0]) + v, _ = reader.Next(v[:0]) i += 2 if k == nil { @@ -1598,7 +1643,7 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { select { case <-logEvery.C: - ac.a.logger.Info("SqueezeCommitmentFiles", "file", cf.decompressor.FileName(), "k", fmt.Sprintf("%x", k), + ac.a.logger.Info("[squeeze_migration]", "file", cf.decompressor.FileName(), "k", fmt.Sprintf("%x", k), "progress", fmt.Sprintf("%d/%d", i, cf.decompressor.Count())) default: } @@ -1621,7 +1666,7 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { } sizeDelta += delta - ac.a.logger.Info("SqueezeCommitmentFiles: file done", "original", filepath.Base(originalPath), + ac.a.logger.Info("[squeeze_migration] file done", "original", filepath.Base(originalPath), "sizeDelta", fmt.Sprintf("%s (%.1f%%)", delta.HR(), deltaP)) fromStep, toStep := af.startTxNum/ac.a.StepSize(), af.endTxNum/ac.a.StepSize() @@ -1641,7 +1686,23 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() } } - ac.a.logger.Info("SqueezeCommitmentFiles: squeezed files has been produced, removing obsolete files", - "toRemove", len(obsoleteFiles), "processed", fmt.Sprintf("%d/%d", processedFiles, len(commitFiles))) + ac.a.logger.Info("[squeeze_migration] squeezed files have been produced, removing obsolete files", + "toRemove", len(obsoleteFiles), "processed", fmt.Sprintf("%d/%d", processedFiles, len(mergedCommitFiles))) for _, path := range obsoleteFiles { if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { return err } - ac.a.logger.Debug("SqueezeCommitmentFiles: obsolete file removal", "path", path) + ac.a.logger.Debug("[squeeze_migration] obsolete file removal", "path", path) } - ac.a.logger.Info("SqueezeCommitmentFiles: indices removed, renaming temporal files ") + ac.a.logger.Info("[squeeze_migration] indices removed, renaming temporary files") for _, path := range temporalFiles { if err := os.Rename(path, strings.TrimSuffix(path, sqExt)); err != nil { return err } - ac.a.logger.Debug("SqueezeCommitmentFiles: temporal file renaming", "path", path) + ac.a.logger.Debug("[squeeze_migration] temporary file renaming", "path", path) } - ac.a.logger.Info("SqueezeCommitmentFiles: done", "sizeDelta", sizeDelta.HR(), "files", len(accountFiles)) + ac.a.logger.Info("[squeeze_migration] done", "sizeDelta", sizeDelta.HR(), "files", len(mergedCommitFiles)) return nil } diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go index b77bf239179..e74c1a119a3 100644 --- a/erigon-lib/state/aggregator_test.go +++ b/erigon-lib/state/aggregator_test.go @@ -61,6 +61,7 @@ func TestAggregatorV3_Merge(t *testing.T) { rwTx.Rollback() } }() + ac := agg.BeginFilesRo() defer ac.Close() domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) diff --git a/erigon-lib/state/archive.go b/erigon-lib/state/archive.go index 73487dca47e..a2e76e4530c 100644 --- a/erigon-lib/state/archive.go +++ b/erigon-lib/state/archive.go @@ -47,6 +47,21 @@ func ParseFileCompression(s string) (FileCompression, error) { } } +func (c FileCompression) String() string { + switch c { + case CompressNone: + return "none" + case CompressKeys: + return "k" + case CompressVals: + return "v" + case CompressKeys | CompressVals: + return "kv" + default: + return "" + } +} + type getter struct { *seg.Getter nextValue bool // if nextValue is true then getter.Next() is expected to return value diff --git a/erigon-lib/state/bps_tree.go b/erigon-lib/state/bps_tree.go index 0f662bef682..bb29154afab 100644 --- a/erigon-lib/state/bps_tree.go +++ b/erigon-lib/state/bps_tree.go @@ -21,7 +21,6 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/erigontech/erigon-lib/common/dbg" "io" "time" "unsafe" @@ -29,6 +28,7 @@ import ( "github.com/c2h5oh/datasize" "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/common/dbg" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/recsplit/eliasfano32" ) @@ -49,7 +49,7 @@ type indexSeekerIterator interface { } type dataLookupFunc func(di uint64, g ArchiveGetter) ([]byte, []byte, error) -type keyCmpFunc func(k []byte, di uint64, g ArchiveGetter) (int, []byte, error) +type keyCmpFunc func(k []byte, di uint64, g ArchiveGetter, copyBuf []byte) (int, []byte, error)
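The keyCmpFunc signature above now threads a caller-owned copyBuf through every comparison, so Seek/Get/WarmUp can reuse one buffer (passed back as buf[:0]) instead of allocating on each probe. A minimal sketch of that append-into-buf[:0] reuse pattern; next here is a hypothetical stand-in for ArchiveGetter.Next, not the real API:

package main

import "fmt"

// next mimics seg-style getters: it appends a word into the caller's buffer
// and returns the (possibly re-grown) slice.
func next(buf []byte, word string) []byte {
	return append(buf, word...)
}

func main() {
	var key []byte
	for _, w := range []string{"alpha", "beta", "gamma"} {
		key = next(key[:0], w) // truncate but keep capacity: one backing array serves all iterations
		fmt.Printf("%s cap=%d\n", key, cap(key))
	}
}

// M limits the number of children per tree node.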
func NewBpsTree(kv ArchiveGetter, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc) *BpsTree { @@ -66,12 +66,11 @@ var envAssertBTKeys = dbg.EnvBool("BT_ASSERT_OFFSETS", false) func NewBpsTreeWithNodes(kv ArchiveGetter, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc, nodes []Node) *BpsTree { bt := &BpsTree{M: M, offt: offt, dataLookupFunc: dataLookup, keyCmpFunc: keyCmp, mx: nodes} - t := time.Now() nsz := uint64(unsafe.Sizeof(Node{})) var cachedBytes uint64 for i := 0; i < len(nodes); i++ { if envAssertBTKeys { - eq, r, err := keyCmp(nodes[i].key, nodes[i].di, kv) + eq, r, err := keyCmp(nodes[i].key, nodes[i].di, kv, nil) if err != nil { panic(err) } @@ -83,11 +82,6 @@ func NewBpsTreeWithNodes(kv ArchiveGetter, offt *eliasfano32.EliasFano, M uint64 nodes[i].off = offt.Get(nodes[i].di) } - N := offt.Count() - log.Root().Debug("BtIndex opened", "file", kv.FileName(), "M", bt.M, "N", common.PrettyCounter(N), - "cached", fmt.Sprintf("%s %.2f%%", common.PrettyCounter(len(bt.mx)), 100*(float64(len(bt.mx))/float64(N))), - "cacheSize", datasize.ByteSize(cachedBytes).HR(), "fileSize", datasize.ByteSize(kv.Size()).HR(), - "took", time.Since(t)) return bt } @@ -219,7 +213,7 @@ func (n *Node) Decode(buf []byte) (uint64, error) { return uint64(10 + l), nil } -func (b *BpsTree) WarmUp(kv ArchiveGetter) error { +func (b *BpsTree) WarmUp(kv ArchiveGetter) (err error) { t := time.Now() N := b.offt.Count() if N == 0 { @@ -238,9 +232,10 @@ func (b *BpsTree) WarmUp(kv ArchiveGetter) error { // extremely stupid picking of needed nodes: cachedBytes := uint64(0) nsz := uint64(unsafe.Sizeof(Node{})) + var key []byte for i := step; i < N; i += step { di := i - 1 - _, key, err := b.keyCmpFunc(nil, di, kv) + _, key, err = b.keyCmpFunc(nil, di, kv, key[:0]) if err != nil { return err } @@ -308,7 +303,7 @@ func (b *BpsTree) Seek(g ArchiveGetter, seekKey []byte) (key, value []byte, di u var cmp int for l < r { if r-l <= DefaultBtreeStartSkip { // found small range, faster to scan now - cmp, key, err = b.keyCmpFunc(seekKey, l, g) + cmp, key, err = b.keyCmpFunc(seekKey, l, g, key[:0]) if err != nil { return nil, nil, 0, false, err } @@ -330,7 +325,7 @@ func (b *BpsTree) Seek(g ArchiveGetter, seekKey []byte) (key, value []byte, di u } m = (l + r) >> 1 - cmp, key, err = b.keyCmpFunc(seekKey, m, g) + cmp, key, err = b.keyCmpFunc(seekKey, m, g, key[:0]) if err != nil { return nil, nil, 0, false, err } @@ -362,7 +357,7 @@ func (b *BpsTree) Seek(g ArchiveGetter, seekKey []byte) (key, value []byte, di u // returns first key which is >= key. 
// If key is nil, returns first key // if key is greater than all keys, returns nil -func (b *BpsTree) Get(g ArchiveGetter, key []byte) ([]byte, bool, uint64, error) { +func (b *BpsTree) Get(g ArchiveGetter, key []byte) (k []byte, ok bool, i uint64, err error) { if b.trace { fmt.Printf("get %x\n", key) } @@ -379,10 +374,11 @@ func (b *BpsTree) Get(g ArchiveGetter, key []byte) ([]byte, bool, uint64, error) defer func() { fmt.Printf("found %x [%d %d]\n", key, l, r) }() } + var cmp int var m uint64 for l < r { m = (l + r) >> 1 - cmp, k, err := b.keyCmpFunc(key, m, g) + cmp, k, err = b.keyCmpFunc(key, m, g, k[:0]) if err != nil { return nil, false, 0, err } @@ -400,7 +396,7 @@ func (b *BpsTree) Get(g ArchiveGetter, key []byte) ([]byte, bool, uint64, error) } } - cmp, k, err := b.keyCmpFunc(key, l, g) + cmp, k, err = b.keyCmpFunc(key, l, g, k[:0]) if err != nil || cmp != 0 { return nil, false, 0, err } diff --git a/erigon-lib/state/btree_index.go b/erigon-lib/state/btree_index.go index e8f3c7c0119..5d7b639a2d3 100644 --- a/erigon-lib/state/btree_index.go +++ b/erigon-lib/state/btree_index.go @@ -378,7 +378,7 @@ func (a *btAlloc) bsKey(x []byte, l, r uint64, g ArchiveGetter) (k []byte, di ui for l <= r { di = (l + r) >> 1 - cmp, k, err = a.keyCmp(x, di, g) + cmp, k, err = a.keyCmp(x, di, g, k[:0]) a.naccess++ switch { @@ -918,7 +918,7 @@ func (b *BtIndex) dataLookup(di uint64, g ArchiveGetter) ([]byte, []byte, error) } // comparing `k` with item of index `di`. using buffer `resBuf` to avoid allocations -func (b *BtIndex) keyCmp(k []byte, di uint64, g ArchiveGetter) (int, []byte, error) { +func (b *BtIndex) keyCmp(k []byte, di uint64, g ArchiveGetter, resBuf []byte) (int, []byte, error) { if di >= b.ef.Count() { return 0, nil, fmt.Errorf("%w: keyCount=%d, but key %d requested. file: %s", ErrBtIndexLookupBounds, b.ef.Count(), di+1, b.FileName()) } @@ -929,11 +929,10 @@ func (b *BtIndex) keyCmp(k []byte, di uint64, g ArchiveGetter) (int, []byte, err return 0, nil, fmt.Errorf("key at %d/%d not found, file: %s", di, b.ef.Count(), b.FileName()) } - var res []byte - res, _ = g.Next(res[:0]) + resBuf, _ = g.Next(resBuf) //TODO: use `b.getter.Match` after https://github.com/erigontech/erigon/issues/7855 - return bytes.Compare(res, k), res, nil + return bytes.Compare(resBuf, k), resBuf, nil //return b.getter.Match(k), result, nil } diff --git a/erigon-lib/state/btree_index_test.go b/erigon-lib/state/btree_index_test.go index 6025ccf20cf..5cb938efd60 100644 --- a/erigon-lib/state/btree_index_test.go +++ b/erigon-lib/state/btree_index_test.go @@ -357,7 +357,7 @@ func (b *mockIndexReader) dataLookup(di uint64, g ArchiveGetter) ([]byte, []byte } // comparing `k` with item of index `di`. using buffer `resBuf` to avoid allocations -func (b *mockIndexReader) keyCmp(k []byte, di uint64, g ArchiveGetter) (int, []byte, error) { +func (b *mockIndexReader) keyCmp(k []byte, di uint64, g ArchiveGetter, resBuf []byte) (int, []byte, error) { if di >= b.ef.Count() { return 0, nil, fmt.Errorf("%w: keyCount=%d, but key %d requested. file: %s", ErrBtIndexLookupBounds, b.ef.Count(), di+1, g.FileName()) } @@ -368,10 +368,9 @@ func (b *mockIndexReader) keyCmp(k []byte, di uint64, g ArchiveGetter) (int, []b return 0, nil, fmt.Errorf("key at %d/%d not found, file: %s", di, b.ef.Count(), g.FileName()) } - var res []byte - res, _ = g.Next(res[:0]) + resBuf, _ = g.Next(resBuf) //TODO: use `b.getter.Match` after https://github.com/erigontech/erigon/issues/7855 - return bytes.Compare(res, k), res, nil + return bytes.Compare(resBuf, k), resBuf, nil //return b.getter.Match(k), result, nil } diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index f14153dec14..51dbc4c1fea 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -28,6 +28,7 @@ import ( "regexp" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -122,18 +123,26 @@ type domainVisible struct { caches *sync.Pool } +var DomainCompressCfg = seg.Cfg{ + MinPatternScore: 1000, + DictReducerSoftLimit: 2000000, + MinPatternLen: 20, + MaxPatternLen: 32, + SamplingFactor: 4, + MaxDictPatterns: 64 * 1024 * 2, + Workers: 1, +} + func NewDomain(cfg domainCfg, aggregationStep uint64, name kv.Domain, valsTable, indexKeysTable, historyValsTable, indexTable string, integrityCheck func(name kv.Domain, fromStep, toStep uint64) bool, logger log.Logger) (*Domain, error) { if cfg.hist.iiCfg.dirs.SnapDomain == "" { panic("empty `dirs` variable") } - compressCfg := seg.DefaultCfg - compressCfg.Workers = 1 d := &Domain{ name: name, valsTable: valsTable, - compressCfg: compressCfg, + compressCfg: DomainCompressCfg, compression: cfg.compress, dirtyFiles: btree2.NewBTreeGOptions[*filesItem](filesItemLess, btree2.Options{Degree: 128, NoLocks: false}), @@ -2177,3 +2186,18 @@ func (ds *DomainStats) Accumulate(other DomainStats) { ds.DataSize += other.DataSize ds.FilesCount += other.FilesCount } + +func ParseStepsFromFileName(fileName string) (from, to uint64, err error) { + rangeString := strings.Split(fileName, ".")[1] + rangeNums := strings.Split(rangeString, "-") + // convert the range to uint64 + from, err = strconv.ParseUint(rangeNums[0], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse from %s: %w", rangeNums[0], err) + } + to, err = strconv.ParseUint(rangeNums[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse to %s: %w", rangeNums[1], err) + } + return from, to, nil +} diff --git a/erigon-lib/state/domain_committed.go b/erigon-lib/state/domain_committed.go index 9e89b062ba5..cdfe13b3491 100644 --- a/erigon-lib/state/domain_committed.go +++ b/erigon-lib/state/domain_committed.go @@ -130,9 +130,12 @@ func (dt *DomainRoTx) findShortenedKey(fullKey []byte, itemGetter ArchiveGetter, return encodeShorterKey(nil, offset), true } if dt.d.indexList&withBTree != 0 { + if item.bindex == nil { + dt.d.logger.Warn("[agg] commitment branch key replacement: file doesn't have index", "name", item.decompressor.FileName()) + } cur, err := item.bindex.Seek(itemGetter, fullKey) if err != nil { - dt.d.logger.Warn("commitment branch key replacement seek failed", + dt.d.logger.Warn("[agg] commitment branch key replacement seek failed", "key", fmt.Sprintf("%x", fullKey), "idx", "bt", "err", err, "file", item.decompressor.FileName()) } @@ -171,7 +174,7 @@ func (dt *DomainRoTx) lookupFileByItsRange(txFrom uint64, txTo uint64) *filesIte }) } - if item == nil { + if item == nil || item.bindex == nil { fileStepsss := "" for _, item := range dt.d.dirtyFiles.Items() { fileStepsss += fmt.Sprintf("%d-%d;",
item.startTxNum/dt.d.aggregationStep, item.endTxNum/dt.d.aggregationStep) @@ -180,10 +183,15 @@ func (dt *DomainRoTx) lookupFileByItsRange(txFrom uint64, txTo uint64) *filesIte for _, f := range dt.files { visibleFiles += fmt.Sprintf("%d-%d;", f.startTxNum/dt.d.aggregationStep, f.endTxNum/dt.d.aggregationStep) } - dt.d.logger.Warn("lookupFileByItsRange: file not found", + dt.d.logger.Warn("[agg] lookupFileByItsRange: file not found", "stepFrom", txFrom/dt.d.aggregationStep, "stepTo", txTo/dt.d.aggregationStep, "files", fileStepsss, "_visible", visibleFiles, "visibleFilesCount", len(dt.files), "filesCount", dt.d.dirtyFiles.Len()) + + if item != nil && item.bindex == nil { + dt.d.logger.Warn("[agg] lookupFileByItsRange: file found but not indexed", "f", item.decompressor.FileName()) + } + return nil } return item @@ -238,16 +246,22 @@ func (dt *DomainRoTx) commitmentValTransformDomain(rng MergeRange, accounts, sto } dr := DomainRanges{values: rng} - accountFileMap := make(map[string]ArchiveGetter) + accountFileMap := make(map[uint64]map[uint64]ArchiveGetter) if accountList, _, _ := accounts.staticFilesInRange(dr); accountList != nil { for _, f := range accountList { - accountFileMap[fmt.Sprintf("%d-%d", f.startTxNum, f.endTxNum)] = NewArchiveGetter(f.decompressor.MakeGetter(), accounts.d.compression) + if _, ok := accountFileMap[f.startTxNum]; !ok { + accountFileMap[f.startTxNum] = make(map[uint64]ArchiveGetter) + } + accountFileMap[f.startTxNum][f.endTxNum] = NewArchiveGetter(f.decompressor.MakeGetter(), accounts.d.compression) } } - storageFileMap := make(map[string]ArchiveGetter) + storageFileMap := make(map[uint64]map[uint64]ArchiveGetter) if storageList, _, _ := storage.staticFilesInRange(dr); storageList != nil { for _, f := range storageList { - storageFileMap[fmt.Sprintf("%d-%d", f.startTxNum, f.endTxNum)] = NewArchiveGetter(f.decompressor.MakeGetter(), storage.d.compression) + if _, ok := storageFileMap[f.startTxNum]; !ok { + storageFileMap[f.startTxNum] = make(map[uint64]ArchiveGetter) + } + storageFileMap[f.startTxNum][f.endTxNum] = NewArchiveGetter(f.decompressor.MakeGetter(), storage.d.compression) } } @@ -259,24 +273,30 @@ func (dt *DomainRoTx) commitmentValTransformDomain(rng MergeRange, accounts, sto if !dt.d.replaceKeysInValues || len(valBuf) == 0 || ((keyEndTxNum-keyFromTxNum)/dt.d.aggregationStep)%2 != 0 { return valBuf, nil } - sig, ok := storageFileMap[fmt.Sprintf("%d-%d", keyFromTxNum, keyEndTxNum)] + if _, ok := storageFileMap[keyFromTxNum]; !ok { + storageFileMap[keyFromTxNum] = make(map[uint64]ArchiveGetter) + } + sig, ok := storageFileMap[keyFromTxNum][keyEndTxNum] if !ok { dirty := storage.lookupFileByItsRange(keyFromTxNum, keyEndTxNum) if dirty == nil { return nil, fmt.Errorf("dirty storage file not found %d-%d", keyFromTxNum/dt.d.aggregationStep, keyEndTxNum/dt.d.aggregationStep) } sig = NewArchiveGetter(dirty.decompressor.MakeGetter(), storage.d.compression) - storageFileMap[fmt.Sprintf("%d-%d", keyFromTxNum, keyEndTxNum)] = sig + storageFileMap[keyFromTxNum][keyEndTxNum] = sig } - aig, ok := accountFileMap[fmt.Sprintf("%d-%d", keyFromTxNum, keyEndTxNum)] + if _, ok := accountFileMap[keyFromTxNum]; !ok { + accountFileMap[keyFromTxNum] = make(map[uint64]ArchiveGetter) + } + aig, ok := accountFileMap[keyFromTxNum][keyEndTxNum] if !ok { dirty := accounts.lookupFileByItsRange(keyFromTxNum, keyEndTxNum) if dirty == nil { return nil, fmt.Errorf("dirty account file not found %d-%d", keyFromTxNum/dt.d.aggregationStep, keyEndTxNum/dt.d.aggregationStep) } aig = 
NewArchiveGetter(dirty.decompressor.MakeGetter(), accounts.d.compression) - accountFileMap[fmt.Sprintf("%d-%d", keyFromTxNum, keyEndTxNum)] = aig + accountFileMap[keyFromTxNum][keyEndTxNum] = aig } replacer := func(key []byte, isStorage bool) ([]byte, error) { diff --git a/erigon-lib/state/files_item.go b/erigon-lib/state/files_item.go index 451c54d49d4..d56d871322c 100644 --- a/erigon-lib/state/files_item.go +++ b/erigon-lib/state/files_item.go @@ -298,3 +298,50 @@ func (files visibleFiles) LatestMergedRange() MergeRange { } return MergeRange{} } + +func DetectCompressType(getter *seg.Getter) (compressed FileCompression) { + keyCompressed := func() (compressed bool) { + defer func() { + if rec := recover(); rec != nil { + compressed = true + } + }() + getter.Reset(0) + for i := 0; i < 100; i++ { + if getter.HasNext() { + _, _ = getter.NextUncompressed() + } + if getter.HasNext() { + _, _ = getter.Skip() + } + } + return compressed + }() + + valCompressed := func() (compressed bool) { + defer func() { + if rec := recover(); rec != nil { + compressed = true + } + }() + getter.Reset(0) + for i := 0; i < 100; i++ { + if getter.HasNext() { + _, _ = getter.Skip() + } + if getter.HasNext() { + _, _ = getter.NextUncompressed() + } + } + return compressed + }() + getter.Reset(0) + + if keyCompressed { + compressed |= CompressKeys + } + if valCompressed { + compressed |= CompressVals + } + return compressed +} diff --git a/erigon-lib/state/merge.go b/erigon-lib/state/merge.go index a7fa723509b..d217780804e 100644 --- a/erigon-lib/state/merge.go +++ b/erigon-lib/state/merge.go @@ -425,6 +425,8 @@ func mergeEfs(preval, val, buf []byte) ([]byte, error) { type valueTransformer func(val []byte, startTxNum, endTxNum uint64) ([]byte, error) +const DomainMinStepsToCompress = 16 + func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, historyFiles []*filesItem, r DomainRanges, vt valueTransformer, ps *background.ProgressSet) (valuesIn, indexIn, historyIn *filesItem, err error) { if !r.any() { return @@ -471,7 +473,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h } compression := dt.d.compression - if toStep-fromStep < 64 { + if toStep-fromStep < DomainMinStepsToCompress { compression = CompressNone } kvWriter = NewArchiveWriter(kvFile, compression) diff --git a/eth/stagedsync/bor_heimdall_shared.go b/eth/stagedsync/bor_heimdall_shared.go index 12bb5fe976a..1ce41a93439 100644 --- a/eth/stagedsync/bor_heimdall_shared.go +++ b/eth/stagedsync/bor_heimdall_shared.go @@ -467,7 +467,9 @@ func fetchAndWriteHeimdallStateSyncEvents( if config.OverrideStateSyncRecords != nil { if val, ok := config.OverrideStateSyncRecords[strconv.FormatUint(blockNum, 10)]; ok { - overrideCount = len(eventRecords) - val + overrideCount = len(eventRecords) - val //nolint + eventRecords = eventRecords[0:val] + overrideCount = len(eventRecords) - val //nolint eventRecords = eventRecords[0:val] } } diff --git a/migrations/commitment_recompress.go b/migrations/commitment_recompress.go new file mode 100644 index 00000000000..8e9974c4aef --- /dev/null +++ b/migrations/commitment_recompress.go @@ -0,0 +1,258 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see <http://www.gnu.org/licenses/>. + +package migrations + +import ( + "bufio" + "context" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/common/dir" + "github.com/erigontech/erigon-lib/config3" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon-lib/seg" + "github.com/erigontech/erigon-lib/state" + "github.com/erigontech/erigon/eth/ethconfig/estimate" +) + +var EnableRecompressCommitmentFiles = false + +var RecompressCommitmentFiles = Migration{ + Name: "recompress_commit_files", + Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { + ctx := context.Background() + + if !EnableRecompressCommitmentFiles { + log.Info("[recompress_migration] disabled") + return db.Update(ctx, func(tx kv.RwTx) error { + return BeforeCommit(tx, nil, true) + }) + } + + logEvery := time.NewTicker(10 * time.Second) + defer logEvery.Stop() + t := time.Now() + defer func() { + log.Info("[recompress_migration] done", "took", time.Since(t)) + }() + + log.Info("[recompress_migration] start") + dirsOld := dirs + dirsOld.SnapDomain += "_old" + dir.MustExist(dirsOld.SnapDomain, dirs.SnapDomain+"_backup") + //TODO: `rclone` func doesn't work for big files. need to debug + //if err := rclone(logger, dirs.SnapDomain, dirsOld.SnapDomain); err != nil { + // return err + //} + //if err := rclone(logger, dirs.SnapDomain, dirs.SnapDomain+"_backup"); err != nil { + // return err + //} + files, err := storageFiles(dirsOld) + if err != nil { + return err + } + for _, from := range files { + _, fromFileName := filepath.Split(from) + fromStep, toStep, err := state.ParseStepsFromFileName(fromFileName) + if err != nil { + return err + } + if toStep-fromStep < state.DomainMinStepsToCompress { + continue + } + + to := filepath.Join(dirs.SnapDomain, fromFileName) + if err := recompressDomain(ctx, dirs, from, to, logger); err != nil { + return err + } + _ = os.Remove(strings.ReplaceAll(to, ".kv", ".bt")) + _ = os.Remove(strings.ReplaceAll(to, ".kv", ".kvei")) + _ = os.Remove(strings.ReplaceAll(to, ".kv", ".bt.torrent")) + _ = os.Remove(strings.ReplaceAll(to, ".kv", ".kv.torrent")) + } + + agg, err := state.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger) + if err != nil { + return err + } + defer agg.Close() + agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) + if err = agg.OpenFolder(); err != nil { + return err + } + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + ac := agg.BeginFilesRo() + defer ac.Close() + + aggOld, err := state.NewAggregator(ctx, dirsOld, config3.HistoryV3AggregationStep, db, nil, logger) + if err != nil { + panic(err) + } + defer aggOld.Close() + if err = aggOld.OpenFolder(); err != nil { + panic(err) + } + aggOld.SetCompressWorkers(estimate.CompressSnapshot.Workers()) + if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + if err := agg.BuildMissedIndices(ctx,
estimate.IndexSnapshot.Workers()); err != nil { + return err + } + + acOld := aggOld.BeginFilesRo() + defer acOld.Close() + + if err = acOld.SqueezeCommitmentFiles(ac); err != nil { + return err + } + acOld.Close() + ac.Close() + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + agg.Close() + aggOld.Close() + + log.Info("[recompress] removing", "dir", dirsOld.SnapDomain) + _ = os.RemoveAll(dirsOld.SnapDomain) + log.Info("[recompress] success", "please_remove", dirs.SnapDomain+"_backup") + return db.Update(ctx, func(tx kv.RwTx) error { + return BeforeCommit(tx, nil, true) + }) + }, +} + +func recompressDomain(ctx context.Context, dirs datadir.Dirs, from, to string, logger log.Logger) error { + logger.Info("[recompress] file", "f", to) + decompressor, err := seg.NewDecompressor(from) + if err != nil { + return err + } + defer decompressor.Close() + defer decompressor.EnableReadAhead().DisableReadAhead() + r := state.NewArchiveGetter(decompressor.MakeGetter(), state.DetectCompressType(decompressor.MakeGetter())) + + compressCfg := state.DomainCompressCfg + compressCfg.Workers = estimate.CompressSnapshot.Workers() + c, err := seg.NewCompressor(ctx, "recompress", to, dirs.Tmp, compressCfg, log.LvlInfo, logger) + if err != nil { + return err + } + defer c.Close() + w := state.NewArchiveWriter(c, state.CompressKeys) + var k, v []byte + var i int + for r.HasNext() { + i++ + k, _ = r.Next(k[:0]) + v, _ = r.Next(v[:0]) + if err = w.AddWord(k); err != nil { + return err + } + if err = w.AddWord(v); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } + if err := c.Compress(); err != nil { + return err + } + + return nil +} + +func storageFiles(dirs datadir.Dirs) ([]string, error) { + files, err := dir.ListFiles(dirs.SnapDomain, ".kv") + if err != nil { + return nil, err + } + res := make([]string, 0, len(files)) + for _, f := range files { + if !strings.Contains(f, kv.StorageDomain.String()) { + continue + } + res = append(res, f) + } + return res, nil +} + +// nolint +func rclone(logger log.Logger, from, to string) error { + cmd := exec.Command("rclone", "sync", "--progress", "--stats-one-line", from, to) + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return err + } + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return err + } + + // Start the command + if err := cmd.Start(); err != nil { + return err + } + + // WaitGroup to wait for both goroutines to finish + var wg sync.WaitGroup + wg.Add(2) + + // Stream stdout + go func() { + defer wg.Done() + streamToLogger(stdoutPipe, logger, "STDOUT") + }() + + // Stream stderr + go func() { + defer wg.Done() + streamToLogger(stderrPipe, logger, "STDERR") + }() + + // Wait for all streams to finish + wg.Wait() + return nil +} + +// streamToLogger reads from the provided reader and logs each line +func streamToLogger(reader io.Reader, logger log.Logger, prefix string) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + logger.Info("[recompress] rclone", "out", prefix, "text", scanner.Text()) + } + if err := scanner.Err(); err != nil { + logger.Info("[recompress] rclone", "out", prefix, "err", err) + } +} diff --git a/migrations/commitment.go b/migrations/commitment_squeeze.go similarity index 74% rename from migrations/commitment.go rename to migrations/commitment_squeeze.go index 5133abbac76..03cf272ec95 100644 --- 
a/migrations/commitment.go +++ b/migrations/commitment_squeeze.go @@ -20,12 +20,12 @@ import ( "context" "time" - "github.com/erigontech/erigon-lib/config3" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/config3" "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/log/v3" libstate "github.com/erigontech/erigon-lib/state" + "github.com/erigontech/erigon/eth/ethconfig/estimate" ) var EnableSqueezeCommitmentFiles = false @@ -35,28 +35,40 @@ var SqueezeCommitmentFiles = Migration{ Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { ctx := context.Background() - if !EnableSqueezeCommitmentFiles || !libstate.AggregatorSqueezeCommitmentValues { //nolint:staticcheck + if !EnableSqueezeCommitmentFiles { + log.Info("[squeeze_migration] disabled") return db.Update(ctx, func(tx kv.RwTx) error { return BeforeCommit(tx, nil, true) }) } - logger.Info("File migration is disabled", "name", "squeeze_commit_files") logEvery := time.NewTicker(10 * time.Second) defer logEvery.Stop() + t := time.Now() + defer func() { + log.Info("[squeeze_migration] done", "took", time.Since(t)) + }() + log.Info("[squeeze_migration] 'squeeze' mode start") agg, err := libstate.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger) if err != nil { return err } defer agg.Close() + agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) if err = agg.OpenFolder(); err != nil { return err } - + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } ac := agg.BeginFilesRo() defer ac.Close() - if err = ac.SqueezeCommitmentFiles(); err != nil { + if err = ac.SqueezeCommitmentFiles(ac); err != nil { + return err + } + ac.Close() + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { return err } return db.Update(ctx, func(tx kv.RwTx) error { diff --git a/migrations/migrations.go b/migrations/migrations.go index bb70084933f..1cb941261a8 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -53,6 +53,7 @@ var migrations = map[kv.Label][]Migration{ dbSchemaVersion5, ProhibitNewDownloadsLock, SqueezeCommitmentFiles, + RecompressCommitmentFiles, ProhibitNewDownloadsLock2, ClearBorTables, }, diff --git a/node/node.go b/node/node.go index a6c03378760..2116c91bec5 100644 --- a/node/node.go +++ b/node/node.go @@ -30,6 +30,7 @@ import ( "time" "github.com/c2h5oh/datasize" + "github.com/erigontech/erigon-lib/common/dbg" "golang.org/x/sync/semaphore" "github.com/erigontech/erigon-lib/common/datadir" @@ -375,6 +376,7 @@ func OpenDatabase(ctx context.Context, config *nodecfg.Config, label kv.Label, n if err != nil { return nil, err } + migrator := migrations.NewMigrator(label) if err := migrator.VerifyVersion(db, dbPath); err != nil { return nil, err } @@ -384,7 +386,7 @@ func OpenDatabase(ctx context.Context, config *nodecfg.Config, label kv.Label, n if err != nil { return nil, err } - if has { + if has && !dbg.OnlyCreateDB { logger.Info("Re-Opening DB in exclusive mode to apply migrations") db.Close() db, err = openFunc(true)
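To close, a self-contained sketch of the skip rule the recompress migration applies above: .kv files spanning fewer than DomainMinStepsToCompress steps stay uncompressed and are not rewritten. parseSteps is a simplified stand-in for state.ParseStepsFromFileName (with bounds checks added for the sketch), and the file names are made up:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSteps extracts the step range from a "name.from-to.kv" file name,
// mirroring state.ParseStepsFromFileName in domain.go.
func parseSteps(fileName string) (from, to uint64, err error) {
	parts := strings.Split(fileName, ".")
	if len(parts) < 2 {
		return 0, 0, fmt.Errorf("unexpected file name %q", fileName)
	}
	rangeNums := strings.SplitN(parts[1], "-", 2)
	if len(rangeNums) != 2 {
		return 0, 0, fmt.Errorf("unexpected step range in %q", fileName)
	}
	if from, err = strconv.ParseUint(rangeNums[0], 10, 64); err != nil {
		return 0, 0, fmt.Errorf("failed to parse from %s: %w", rangeNums[0], err)
	}
	if to, err = strconv.ParseUint(rangeNums[1], 10, 64); err != nil {
		return 0, 0, fmt.Errorf("failed to parse to %s: %w", rangeNums[1], err)
	}
	return from, to, nil
}

const domainMinStepsToCompress = 16 // mirrors state.DomainMinStepsToCompress in merge.go

func main() {
	for _, f := range []string{"v1-storage.0-8.kv", "v1-storage.0-1024.kv"} {
		from, to, err := parseSteps(f)
		if err != nil {
			panic(err)
		}
		fmt.Println(f, "recompress:", to-from >= domainMinStepsToCompress)
	}
}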