From 292697e80fe1990f2ddfc575bb00a49baf70f4ec Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 11 Sep 2024 14:54:53 +0700 Subject: [PATCH] `erigon seg sqeeze` command (#11933) - moved some migrations to `erigon seg squeeze` sub-command - because they are not migrations - added `erigon seg squeeze --type=blocks` - added `Compressor.ReadFrom` --- cmd/integration/commands/flags.go | 8 - cmd/integration/commands/stages.go | 4 - erigon-lib/common/datadir/dirs.go | 18 -- erigon-lib/kv/rawdbv3/txnum.go | 1 - erigon-lib/seg/compress.go | 11 + erigon-lib/seg/seg_auto_rw.go | 11 + erigon-lib/state/sqeeze.go | 27 +- eth/stagedsync/exec3.go | 4 +- migrations/migrations.go | 3 - migrations/sqeeze_code.go | 74 ------ migrations/sqeeze_storage.go | 118 --------- migrations/squeeze_commitment.go | 78 ------ turbo/app/README.md | 44 ++-- turbo/app/snapshots_cmd.go | 206 ++++++++------- turbo/app/sqeeze_cmd.go | 240 ++++++++++++++++++ turbo/jsonrpc/erigon_receipts.go | 5 +- turbo/jsonrpc/eth_receipts.go | 14 +- .../freezeblocks/block_snapshots.go | 13 +- .../snapshotsync/freezeblocks/block_sqeeze.go | 37 +++ 19 files changed, 462 insertions(+), 454 deletions(-) delete mode 100644 migrations/sqeeze_code.go delete mode 100644 migrations/sqeeze_storage.go delete mode 100644 migrations/squeeze_commitment.go create mode 100644 turbo/app/sqeeze_cmd.go create mode 100644 turbo/snapshotsync/freezeblocks/block_sqeeze.go diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index edb328fa9db..d46170c21b5 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -58,8 +58,6 @@ var ( workers, reconWorkers uint64 dbWriteMap bool - - squeezeCommitment, squeezeStorage, squeezeCode bool ) func must(err error) { @@ -131,12 +129,6 @@ func withBucket(cmd *cobra.Command) { cmd.Flags().StringVar(&bucket, "bucket", "", "reset given stage") } -func withSqueezeCommitmentFiles(cmd *cobra.Command) { - cmd.Flags().BoolVar(&squeezeCommitment, "squeeze.commitment", false, "allow to squeeze commitment files on start") - cmd.Flags().BoolVar(&squeezeStorage, "sqeeze.storage", false, "allow to recompress existing .kv files") - cmd.Flags().BoolVar(&squeezeCode, "sqeeze.code", false, "allow to recompress existing .kv files") -} - func withDataDir2(cmd *cobra.Command) { // --datadir is required, but no --chain flag: read chainConfig from db instead cmd.Flags().StringVar(&datadirCli, utils.DataDirFlag.Name, "", utils.DataDirFlag.Usage) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 51f4c688d04..74c5e246da0 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -432,9 +432,6 @@ var cmdRunMigrations = &cobra.Command{ Short: "", Run: func(cmd *cobra.Command, args []string) { logger := debug.SetupCobra(cmd, "integration") - migrations.EnableSqueezeCommitment = squeezeCommitment - migrations.EnableSqeezeStorage = squeezeStorage - migrations.EnableSqeezeCode = squeezeCode //non-accede and exclusive mode - to apply create new tables if need. 
cfg := dbCfg(kv.ChainDB, chaindata).Flags(func(u uint) uint { return u &^ mdbx.Accede }).Exclusive() db, err := openDB(cfg, true, logger) @@ -588,7 +585,6 @@ func init() { withConfig(cmdRunMigrations) withDataDir(cmdRunMigrations) - withSqueezeCommitmentFiles(cmdRunMigrations) withChain(cmdRunMigrations) withHeimdall(cmdRunMigrations) rootCmd.AddCommand(cmdRunMigrations) diff --git a/erigon-lib/common/datadir/dirs.go b/erigon-lib/common/datadir/dirs.go index b80e465ea8e..266625088f2 100644 --- a/erigon-lib/common/datadir/dirs.go +++ b/erigon-lib/common/datadir/dirs.go @@ -172,24 +172,6 @@ func downloaderV2Migration(dirs Dirs) error { return nil } -// nolint -func moveFiles(from, to string, ext string) error { - files, err := dir.ReadDir(from) - if err != nil { - return fmt.Errorf("ReadDir: %w, %s", err, from) - } - for _, f := range files { - if f.Type().IsDir() || !f.Type().IsRegular() { - continue - } - if filepath.Ext(f.Name()) != ext { - continue - } - _ = os.Rename(filepath.Join(from, f.Name()), filepath.Join(to, f.Name())) - } - return nil -} - func CopyFile(from, to string) error { r, err := os.Open(from) if err != nil { diff --git a/erigon-lib/kv/rawdbv3/txnum.go b/erigon-lib/kv/rawdbv3/txnum.go index 18168ca4870..d1d6a38746f 100644 --- a/erigon-lib/kv/rawdbv3/txnum.go +++ b/erigon-lib/kv/rawdbv3/txnum.go @@ -214,7 +214,6 @@ func (t TxNumsReader) FindBlockNum(tx kv.Tx, endTxNumMinimax uint64) (ok bool, b if err != nil { return true } - if !ok { _lb, _lt, _ := t.Last(tx) err = fmt.Errorf("FindBlockNum(%d): seems broken TxNum value: %x -> (%x, %x); last in db: (%d, %d)", endTxNumMinimax, seek, i, maxTxNum, _lb, _lt) diff --git a/erigon-lib/seg/compress.go b/erigon-lib/seg/compress.go index 4bbabe7b90e..17bb8ee156b 100644 --- a/erigon-lib/seg/compress.go +++ b/erigon-lib/seg/compress.go @@ -179,6 +179,17 @@ func (c *Compressor) WorkersAmount() int { return c.Workers } func (c *Compressor) Count() int { return int(c.wordsCount) } +func (c *Compressor) ReadFrom(g *Getter) error { + var v []byte + for g.HasNext() { + v, _ = g.Next(v[:0]) + if err := c.AddWord(v); err != nil { + return err + } + } + return nil +} + func (c *Compressor) AddWord(word []byte) error { select { case <-c.ctx.Done(): diff --git a/erigon-lib/seg/seg_auto_rw.go b/erigon-lib/seg/seg_auto_rw.go index 37ff5bddb21..f16dc073f3e 100644 --- a/erigon-lib/seg/seg_auto_rw.go +++ b/erigon-lib/seg/seg_auto_rw.go @@ -141,6 +141,17 @@ func (c *Writer) AddWord(word []byte) error { return c.Compressor.AddUncompressedWord(word) } +func (c *Writer) ReadFrom(r *Reader) error { + var v []byte + for r.HasNext() { + v, _ = r.Next(v[:0]) + if err := c.AddWord(v); err != nil { + return err + } + } + return nil +} + func (c *Writer) Close() { if c.Compressor != nil { c.Compressor.Close() diff --git a/erigon-lib/state/sqeeze.go b/erigon-lib/state/sqeeze.go index 4db39353a26..a3067277c63 100644 --- a/erigon-lib/state/sqeeze.go +++ b/erigon-lib/state/sqeeze.go @@ -38,7 +38,7 @@ func (a *Aggregator) Sqeeze(ctx context.Context, domain kv.Domain) error { return err } - if err := a.sqeezeFile(ctx, domain, tempFileCopy, to); err != nil { + if err := a.sqeezeDomainFile(ctx, domain, tempFileCopy, to); err != nil { return err } _ = os.Remove(tempFileCopy) @@ -51,7 +51,7 @@ func (a *Aggregator) Sqeeze(ctx context.Context, domain kv.Domain) error { return nil } -func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to string) error { +func (a *Aggregator) sqeezeDomainFile(ctx context.Context, domain kv.Domain, from, to string) 
error { if domain == kv.CommitmentDomain { panic("please use SqueezeCommitmentFiles func") } @@ -59,7 +59,7 @@ func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to compression := a.d[domain].compression compressCfg := a.d[domain].compressCfg - a.logger.Info("[recompress] file", "f", to, "cfg", compressCfg, "c", compression) + a.logger.Info("[sqeeze] file", "f", to, "cfg", compressCfg, "c", compression) decompressor, err := seg.NewDecompressor(from) if err != nil { return err @@ -68,29 +68,14 @@ func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to defer decompressor.EnableReadAhead().DisableReadAhead() r := seg.NewReader(decompressor.MakeGetter(), seg.DetectCompressType(decompressor.MakeGetter())) - c, err := seg.NewCompressor(ctx, "recompress", to, a.dirs.Tmp, compressCfg, log.LvlInfo, a.logger) + c, err := seg.NewCompressor(ctx, "sqeeze", to, a.dirs.Tmp, compressCfg, log.LvlInfo, a.logger) if err != nil { return err } defer c.Close() w := seg.NewWriter(c, compression) - var k, v []byte - var i int - for r.HasNext() { - i++ - k, _ = r.Next(k[:0]) - v, _ = r.Next(v[:0]) - if err = w.AddWord(k); err != nil { - return err - } - if err = w.AddWord(v); err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - default: - } + if err := w.ReadFrom(r); err != nil { + return err } if err := c.Compress(); err != nil { return err diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 4022149e481..79b81f3c26d 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -281,7 +281,9 @@ func ExecV3(ctx context.Context, return err } if !ok { - return fmt.Errorf("seems broken TxNums index not filled. can't find blockNum of txNum=%d", inputTxNum) + _lb, _lt, _ := txNumsReader.Last(applyTx) + _fb, _ft, _ := txNumsReader.First(applyTx) + return fmt.Errorf("seems broken TxNums index not filled. can't find blockNum of txNum=%d; in db: (%d-%d, %d-%d)", inputTxNum, _fb, _lb, _ft, _lt) } { _max, _ := txNumsReader.Max(applyTx, _blockNum) diff --git a/migrations/migrations.go b/migrations/migrations.go index afe4cd0b275..211d77f860e 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -53,9 +53,6 @@ var migrations = map[kv.Label][]Migration{ kv.ChainDB: { dbSchemaVersion5, ProhibitNewDownloadsLock, - SqueezeCommitmentFiles, - RecompressCommitmentFiles, - RecompressCodeFiles, ProhibitNewDownloadsLock2, ClearBorTables, }, diff --git a/migrations/sqeeze_code.go b/migrations/sqeeze_code.go deleted file mode 100644 index 56a65a4e22f..00000000000 --- a/migrations/sqeeze_code.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . 
- -package migrations - -import ( - "context" - "time" - - "github.com/erigontech/erigon-lib/common/datadir" - "github.com/erigontech/erigon-lib/config3" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon-lib/state" - "github.com/erigontech/erigon/eth/ethconfig/estimate" -) - -var EnableSqeezeCode = false - -var RecompressCodeFiles = Migration{ - Name: "code_recompress", - Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { - ctx := context.Background() - - if !EnableSqeezeCode { - log.Info("[recompress_code_migration] disabled") - return db.Update(ctx, func(tx kv.RwTx) error { - return BeforeCommit(tx, nil, true) - }) - } - - logEvery := time.NewTicker(10 * time.Second) - defer logEvery.Stop() - t := time.Now() - defer func() { - log.Info("[sqeeze_migration] done", "took", time.Since(t)) - }() - - agg, err := state.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger) - if err != nil { - return err - } - defer agg.Close() - agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) - - log.Info("[sqeeze_migration] start") - if err := agg.Sqeeze(ctx, kv.CodeDomain); err != nil { - return err - } - if err = agg.OpenFolder(); err != nil { - return err - } - if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - - return db.Update(ctx, func(tx kv.RwTx) error { - return BeforeCommit(tx, nil, true) - }) - }, -} diff --git a/migrations/sqeeze_storage.go b/migrations/sqeeze_storage.go deleted file mode 100644 index 04da2b4fccb..00000000000 --- a/migrations/sqeeze_storage.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . 
- -package migrations - -import ( - "context" - "os" - "time" - - "github.com/erigontech/erigon-lib/common/datadir" - "github.com/erigontech/erigon-lib/common/dir" - "github.com/erigontech/erigon-lib/config3" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon-lib/state" - "github.com/erigontech/erigon/eth/ethconfig/estimate" -) - -var EnableSqeezeStorage = false - -var RecompressCommitmentFiles = Migration{ - Name: "recompress_commit_files", - Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { - ctx := context.Background() - - if !EnableSqeezeStorage { - log.Info("[recompress_migration] disabled") - return db.Update(ctx, func(tx kv.RwTx) error { - return BeforeCommit(tx, nil, true) - }) - } - - logEvery := time.NewTicker(10 * time.Second) - defer logEvery.Stop() - t := time.Now() - defer func() { - log.Info("[recompress_migration] done", "took", time.Since(t)) - }() - - agg, err := state.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger) - if err != nil { - return err - } - defer agg.Close() - agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) - - log.Info("[recompress_migration] start") - dirsOld := dirs - dirsOld.SnapDomain += "_old" - dir.MustExist(dirsOld.SnapDomain, dirs.SnapDomain+"_backup") - if err := agg.Sqeeze(ctx, kv.StorageDomain); err != nil { - return err - } - - if err = agg.OpenFolder(); err != nil { - return err - } - if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - ac := agg.BeginFilesRo() - defer ac.Close() - - aggOld, err := state.NewAggregator(ctx, dirsOld, config3.HistoryV3AggregationStep, db, nil, logger) - if err != nil { - panic(err) - } - defer aggOld.Close() - if err = aggOld.OpenFolder(); err != nil { - panic(err) - } - aggOld.SetCompressWorkers(estimate.CompressSnapshot.Workers()) - if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - - acOld := aggOld.BeginFilesRo() - defer acOld.Close() - - if err = acOld.SqueezeCommitmentFiles(ac); err != nil { - return err - } - acOld.Close() - ac.Close() - if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - agg.Close() - aggOld.Close() - - log.Info("[recompress] removing", "dir", dirsOld.SnapDomain) - _ = os.RemoveAll(dirsOld.SnapDomain) - log.Info("[recompress] success", "please_remove", dirs.SnapDomain+"_backup") - return db.Update(ctx, func(tx kv.RwTx) error { - return BeforeCommit(tx, nil, true) - }) - }, -} diff --git a/migrations/squeeze_commitment.go b/migrations/squeeze_commitment.go deleted file mode 100644 index 59bb0c1f2f0..00000000000 --- a/migrations/squeeze_commitment.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. 
-// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package migrations - -import ( - "context" - "time" - - "github.com/erigontech/erigon-lib/common/datadir" - "github.com/erigontech/erigon-lib/config3" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/log/v3" - libstate "github.com/erigontech/erigon-lib/state" - "github.com/erigontech/erigon/eth/ethconfig/estimate" -) - -var EnableSqueezeCommitment = false - -var SqueezeCommitmentFiles = Migration{ - Name: "squeeze_commit_files", - Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { - ctx := context.Background() - - if !EnableSqueezeCommitment { - log.Info("[sqeeze_migration] disabled") - return db.Update(ctx, func(tx kv.RwTx) error { - return BeforeCommit(tx, nil, true) - }) - } - - logEvery := time.NewTicker(10 * time.Second) - defer logEvery.Stop() - t := time.Now() - defer func() { - log.Info("[sqeeze_migration] done", "took", time.Since(t)) - }() - - log.Info("[sqeeze_migration] 'squeeze' mode start") - agg, err := libstate.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger) - if err != nil { - return err - } - defer agg.Close() - agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) - if err = agg.OpenFolder(); err != nil { - return err - } - if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - ac := agg.BeginFilesRo() - defer ac.Close() - if err = ac.SqueezeCommitmentFiles(ac); err != nil { - return err - } - ac.Close() - if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { - return err - } - return db.Update(ctx, func(tx kv.RwTx) error { - return BeforeCommit(tx, nil, true) - }) - }, -} diff --git a/turbo/app/README.md b/turbo/app/README.md index 01acd430bdf..ab0e775eb15 100644 --- a/turbo/app/README.md +++ b/turbo/app/README.md @@ -8,32 +8,36 @@ ## Support -This command connects erigon to diagnostics tools by establishing websocket connection. +This command connects erigon to diagnostics tools by establishing websocket connection. + +In order to conect diagnostics run -In order to conect diagnostics run ``` ./build/bin/erigon support --debug.addrs --diagnostics.addr --diagnostics.sessions ``` -| | | -|---|---| -|diagnostics.addr|Address of the diagnostics system provided by the support team, include unique session PIN. [Instructions how to get proper adderess](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-4)| -|debug.addrs|Comma separated list of URLs to the debug endpoints thats are being diagnosed. 
This endpoints must mutch values of `diagnostics.endpoint.addr:diagnostics.endpoint.port` by default its `localhost:6060`|
-|diagnostics.sessions|Comma separated list of session PINs to connect to [Instructions how to obtain PIN](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-2)|
-|||
-
+| | |
+|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| diagnostics.addr | Address of the diagnostics system provided by the support team; it includes a unique session PIN. [Instructions on how to get the proper address](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-4) |
+| debug.addrs | Comma-separated list of URLs of the debug endpoints that are being diagnosed. These endpoints must match the values of `diagnostics.endpoint.addr:diagnostics.endpoint.port`; by default this is `localhost:6060` |
+| diagnostics.sessions | Comma-separated list of session PINs to connect to. [Instructions on how to obtain a PIN](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-2) |
+| | |
 
 ## Snapshots
 
 This sub command can be used for manipulating snapshot files
 
+## Danger zone: `seg sqeeze`
+
+Performs foreign-key-aware re-compression of existing snapshot files.
+
 ### Uploader
 
 The `snapshots uploader` command starts a version of erigon customized for uploading snapshot files to
-a remote location. 
+a remote location.
 
 It breaks the stage execution process after the senders stage and then uses the snapshot stage to send
-uploaded headers, bodies and (in the case of polygon) bor spans and events to snapshot files. Because
+uploaded headers, bodies and (in the case of polygon) bor spans and events to snapshot files. Because
 this process avoids execution in run signifigantly faster than a standard erigon configuration.
 
 The uploader uses rclone to send seedable (100K or 500K blocks) to a remote storage location specified
@@ -51,14 +55,13 @@ in addition to this it has the following performance related features:
 
 The following configuration can be used to upload blocks from genesis where:
 
-| | |
-|---|---|
-| sync.loop.prune.limit=500000 | Sets the records to be pruned to the database to 500,000 per iteration (as opposed to 100) |
-| upload.location=r2:erigon-v2-snapshots-bor-mainnet | Specified the rclone loaction to upload snapshot to |
-| upload.from=earliest | Sets the upload start location to be the earliest available block, which will be 0 in the case of a fresh installation, or specified by the last block in the chaindata db |
-| upload.snapshot.limit=1500000 | Tells the uploader to keep a maximum 1,500,000 blocks in the `snapshots` before deleting the aged snapshot |
-| snapshot.version=2 | Indivates the version to be appended to snapshot file names when they are creatated|
-
+| | |
+|----------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| sync.loop.prune.limit=500000 | Sets the number of records to be pruned from the database to 500,000 per iteration (as opposed to 100) |
+| upload.location=r2:erigon-v2-snapshots-bor-mainnet | Specifies the rclone location to upload snapshots to |
+| upload.from=earliest | Sets the upload start location to the earliest available block, which will be 0 in the case of a fresh installation, or the last block in the chaindata db otherwise |
+| upload.snapshot.limit=1500000 | Tells the uploader to keep a maximum of 1,500,000 blocks in the `snapshots` directory before deleting the aged snapshot |
+| snapshot.version=2 | Indicates the version to be appended to snapshot file names when they are created |
 
 ```shell
 erigon/build/bin/erigon seg uploader --datadir=~/snapshots/bor-mainnet --chain=bor-mainnet \
    --upload.location=r2:erigon-v2-snapshots-bor-mainnet --upload.from=earliest --snapshot.version=2 \
    --upload.snapshot.limit=1500000
 ```
 
-In order to start with the lates uploaded block when starting with an empty drive set the `upload.from` flag to `latest`. e.g.
+In order to start with the latest uploaded block when starting with an empty drive, set the `upload.from` flag to
+`latest`. e.g.
```shell --upload.from=latest diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index ac1a7fee935..d7b260aa3e9 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -212,104 +212,9 @@ var snapshotCommand = cli.Command{ Flags: joinFlags([]cli.Flag{&utils.DataDirFlag}), }, { - Name: "rm-state-snapshots", - Action: func(cliCtx *cli.Context) error { - dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) - - removeLatest := cliCtx.Bool("latest") - steprm := cliCtx.String("step") - if steprm == "" && !removeLatest { - return errors.New("step to remove is required (eg 0-2) OR flag --latest provided") - } - if steprm != "" { - removeLatest = false // --step has higher priority - } - - _maxFrom := uint64(0) - files := make([]snaptype.FileInfo, 0) - for _, dirPath := range []string{dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors} { - filePaths, err := dir.ListFiles(dirPath) - if err != nil { - return err - } - for _, filePath := range filePaths { - _, fName := filepath.Split(filePath) - res, isStateFile, ok := snaptype.ParseFileName(dirPath, fName) - if !ok || !isStateFile { - fmt.Printf("skipping %s\n", filePath) - continue - } - if res.From == 0 && res.To == 0 { - parts := strings.Split(fName, ".") - if len(parts) == 3 || len(parts) == 4 { - fsteps := strings.Split(parts[1], "-") - res.From, err = strconv.ParseUint(fsteps[0], 10, 64) - if err != nil { - return err - } - res.To, err = strconv.ParseUint(fsteps[1], 10, 64) - if err != nil { - return err - } - } - } - - files = append(files, res) - if removeLatest { - _maxFrom = max(_maxFrom, res.From) - } - } - } - - var minS, maxS uint64 - if removeLatest { - AllowPruneSteps: - fmt.Printf("remove latest snapshot files with stepFrom=%d?\n1) Remove\n2) Exit\n (pick number): ", _maxFrom) - var ans uint8 - _, err := fmt.Scanf("%d\n", &ans) - if err != nil { - return err - } - switch ans { - case 1: - minS, maxS = _maxFrom, math.MaxUint64 - break - case 2: - return nil - default: - fmt.Printf("invalid input: %d; Just an answer number expected.\n", ans) - goto AllowPruneSteps - } - } else if steprm != "" { - parseStep := func(step string) (uint64, uint64, error) { - var from, to uint64 - if _, err := fmt.Sscanf(step, "%d-%d", &from, &to); err != nil { - return 0, 0, fmt.Errorf("step expected in format from-to, got %s", step) - } - return from, to, nil - } - var err error - minS, maxS, err = parseStep(steprm) - if err != nil { - return err - } - } else { - panic("unexpected arguments") - } - - var removed int - for _, res := range files { - if res.From >= minS && res.To <= maxS { - if err := os.Remove(res.Path); err != nil { - return fmt.Errorf("failed to remove %s: %w", res.Path, err) - } - removed++ - } - } - fmt.Printf("removed %d state snapshot files\n", removed) - return nil - }, - Flags: joinFlags([]cli.Flag{&utils.DataDirFlag, &cli.StringFlag{Name: "step", Required: false}, &cli.BoolFlag{Name: "latest", Required: false}}), + Name: "rm-state-snapshots", + Action: doRmStateSnapshots, + Flags: joinFlags([]cli.Flag{&utils.DataDirFlag, &cli.StringFlag{Name: "step", Required: false}, &cli.BoolFlag{Name: "latest", Required: false}}), }, { Name: "diff", @@ -333,6 +238,14 @@ var snapshotCommand = cli.Command{ &cli.StringFlag{Name: "domain", Required: true}, }), }, + { + Name: "sqeeze", + Action: doSqueeze, + Flags: joinFlags([]cli.Flag{ + &utils.DataDirFlag, + &cli.StringFlag{Name: "type", Required: true}, + }), + }, { Name: "integrity", Action: doIntegrity, @@ -385,6 +298,103 @@ var ( } ) 
+func doRmStateSnapshots(cliCtx *cli.Context) error { + dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) + + removeLatest := cliCtx.Bool("latest") + steprm := cliCtx.String("step") + if steprm == "" && !removeLatest { + return errors.New("step to remove is required (eg 0-2) OR flag --latest provided") + } + if steprm != "" { + removeLatest = false // --step has higher priority + } + + _maxFrom := uint64(0) + files := make([]snaptype.FileInfo, 0) + for _, dirPath := range []string{dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors} { + filePaths, err := dir.ListFiles(dirPath) + if err != nil { + return err + } + for _, filePath := range filePaths { + _, fName := filepath.Split(filePath) + res, isStateFile, ok := snaptype.ParseFileName(dirPath, fName) + if !ok || !isStateFile { + fmt.Printf("skipping %s\n", filePath) + continue + } + if res.From == 0 && res.To == 0 { + parts := strings.Split(fName, ".") + if len(parts) == 3 || len(parts) == 4 { + fsteps := strings.Split(parts[1], "-") + res.From, err = strconv.ParseUint(fsteps[0], 10, 64) + if err != nil { + return err + } + res.To, err = strconv.ParseUint(fsteps[1], 10, 64) + if err != nil { + return err + } + } + } + + files = append(files, res) + if removeLatest { + _maxFrom = max(_maxFrom, res.From) + } + } + } + + var minS, maxS uint64 + if removeLatest { + AllowPruneSteps: + fmt.Printf("remove latest snapshot files with stepFrom=%d?\n1) Remove\n2) Exit\n (pick number): ", _maxFrom) + var ans uint8 + _, err := fmt.Scanf("%d\n", &ans) + if err != nil { + return err + } + switch ans { + case 1: + minS, maxS = _maxFrom, math.MaxUint64 + break + case 2: + return nil + default: + fmt.Printf("invalid input: %d; Just an answer number expected.\n", ans) + goto AllowPruneSteps + } + } else if steprm != "" { + parseStep := func(step string) (uint64, uint64, error) { + var from, to uint64 + if _, err := fmt.Sscanf(step, "%d-%d", &from, &to); err != nil { + return 0, 0, fmt.Errorf("step expected in format from-to, got %s", step) + } + return from, to, nil + } + var err error + minS, maxS, err = parseStep(steprm) + if err != nil { + return err + } + } else { + panic("unexpected arguments") + } + + var removed int + for _, res := range files { + if res.From >= minS && res.To <= maxS { + if err := os.Remove(res.Path); err != nil { + return fmt.Errorf("failed to remove %s: %w", res.Path, err) + } + removed++ + } + } + fmt.Printf("removed %d state snapshot files\n", removed) + return nil +} + func doBtSearch(cliCtx *cli.Context) error { logger, _, _, err := debug.Setup(cliCtx, true /* root logger */) if err != nil { diff --git a/turbo/app/sqeeze_cmd.go b/turbo/app/sqeeze_cmd.go new file mode 100644 index 00000000000..6c8c8287fcd --- /dev/null +++ b/turbo/app/sqeeze_cmd.go @@ -0,0 +1,240 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . 
+ +package app + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/erigontech/erigon-lib/common/dir" + "github.com/erigontech/erigon-lib/config3" + "github.com/erigontech/erigon-lib/downloader/snaptype" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/rawdbv3" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon-lib/state" + "github.com/erigontech/erigon/cmd/hack/tool/fromdb" + "github.com/erigontech/erigon/core/rawdb" + snaptype2 "github.com/erigontech/erigon/core/snaptype" + "github.com/erigontech/erigon/eth/ethconfig" + "github.com/erigontech/erigon/eth/ethconfig/estimate" + "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" + "github.com/urfave/cli/v2" + + "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon/cmd/utils" + "github.com/erigontech/erigon/turbo/debug" +) + +type Sqeeze string + +var ( + SqeezeCommitment Sqeeze = "commitment" + SqeezeStorage Sqeeze = "storage" + SqeezeCode Sqeeze = "code" + SqeezeBlocks Sqeeze = "blocks" +) + +func doSqueeze(cliCtx *cli.Context) error { + dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) + logger, _, _, err := debug.Setup(cliCtx, true /* rootLogger */) + if err != nil { + return err + } + ctx := cliCtx.Context + logEvery := time.NewTicker(10 * time.Second) + defer logEvery.Stop() + + t := Sqeeze(cliCtx.String("type")) + + start := time.Now() + log.Info("[sqeeze] start", "t", t) + defer func() { logger.Info("[sqeeze] done", "t", t, "took", time.Since(start)) }() + + switch { + case t == SqeezeCommitment: + return squeezeCommitment(ctx, dirs, logger) + case t == SqeezeStorage: + return squeezeStorage(ctx, dirs, logger) + case t == SqeezeCode: + return squeezeCode(ctx, dirs, logger) + case t == SqeezeBlocks: + return squeezeBlocks(ctx, dirs, logger) + default: + + return fmt.Errorf("unknown type: %s", t) + } +} + +func squeezeCommitment(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error { + db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() + defer db.Close() + cr := rawdb.NewCanonicalReader(rawdbv3.TxNums) + agg := openAgg(ctx, dirs, db, cr, logger) + agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) + if err := agg.OpenFolder(); err != nil { + return err + } + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + ac := agg.BeginFilesRo() + defer ac.Close() + if err := ac.SqueezeCommitmentFiles(ac); err != nil { + return err + } + ac.Close() + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + return nil +} + +func squeezeStorage(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error { + db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() + defer db.Close() + cr := rawdb.NewCanonicalReader(rawdbv3.TxNums) + agg := openAgg(ctx, dirs, db, cr, logger) + agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) + dirsOld := dirs + dirsOld.SnapDomain += "_old" + dir.MustExist(dirsOld.SnapDomain, dirs.SnapDomain+"_backup") + if err := agg.Sqeeze(ctx, kv.StorageDomain); err != nil { + return err + } + + if err := agg.OpenFolder(); err != nil { + return err + } + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + ac := agg.BeginFilesRo() + defer ac.Close() + + aggOld, err := state.NewAggregator(ctx, dirsOld, config3.HistoryV3AggregationStep, db, nil, 
logger) + if err != nil { + panic(err) + } + defer aggOld.Close() + if err = aggOld.OpenFolder(); err != nil { + panic(err) + } + aggOld.SetCompressWorkers(estimate.CompressSnapshot.Workers()) + if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + + acOld := aggOld.BeginFilesRo() + defer acOld.Close() + + if err = acOld.SqueezeCommitmentFiles(ac); err != nil { + return err + } + acOld.Close() + ac.Close() + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + agg.Close() + aggOld.Close() + + log.Info("[sqeeze] removing", "dir", dirsOld.SnapDomain) + _ = os.RemoveAll(dirsOld.SnapDomain) + log.Info("[sqeeze] success", "please_remove", dirs.SnapDomain+"_backup") + return nil +} +func squeezeCode(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error { + db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() + defer db.Close() + agg, err := state.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger) + if err != nil { + return err + } + defer agg.Close() + agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) + + log.Info("[sqeeze] start") + if err := agg.Sqeeze(ctx, kv.CodeDomain); err != nil { + return err + } + if err = agg.OpenFolder(); err != nil { + return err + } + if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil { + return err + } + return nil +} + +func squeezeBlocks(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error { + for _, f := range ls(dirs.Snap, ".seg") { + good := strings.Contains(f, snaptype2.Transactions.Name()) || + strings.Contains(f, snaptype2.Headers.Name()) + if !good { + continue + } + _, name := filepath.Split(f) + in, _, ok := snaptype.ParseFileName(dirs.Snap, name) + if !ok { + continue + } + good = in.To-in.From == snaptype.Erigon2OldMergeLimit || in.To-in.From == snaptype.Erigon2MergeLimit + if !good { + continue + } + if err := freezeblocks.Sqeeze(ctx, dirs, f, f, logger); err != nil { + return err + } + _ = os.Remove(strings.ReplaceAll(f, ".seg", ".seg.torrent")) + _ = os.Remove(strings.ReplaceAll(f, ".seg", ".idx")) + _ = os.Remove(strings.ReplaceAll(f, ".seg", ".idx.torrent")) + } + + db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() + defer db.Close() + cfg := ethconfig.NewSnapCfg(false, true, true) + _, _, _, br, _, clean, err := openSnaps(ctx, cfg, dirs, 0, db, logger) + if err != nil { + return err + } + defer clean() + + chainConfig := fromdb.ChainConfig(db) + if err := br.BuildMissedIndicesIfNeed(ctx, "retire", nil, chainConfig); err != nil { + return err + } + return nil +} + +func ls(dirPath string, ext string) []string { + res, err := dir.ListFiles(dirPath, ext) + if err != nil { + panic(err) + } + return res +} diff --git a/turbo/jsonrpc/erigon_receipts.go b/turbo/jsonrpc/erigon_receipts.go index d68c565d1ff..f1a14e270b1 100644 --- a/turbo/jsonrpc/erigon_receipts.go +++ b/turbo/jsonrpc/erigon_receipts.go @@ -194,7 +194,8 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri exec := exec3.NewTraceWorker(tx, chainConfig, api.engine(), api._blockReader, nil) defer exec.Close() - txNumbers, err := applyFiltersV3(ctx, api._blockReader, tx, begin, end, crit) + txNumsReader := 
rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader)) + txNumbers, err := applyFiltersV3(txNumsReader, tx, begin, end, crit) if err != nil { return erigonLogs, err } @@ -212,7 +213,7 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri // latest logs that match the filter crit it := rawdbv3.TxNums2BlockNums(tx, - rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader)), + txNumsReader, txNumbers, order.Asc) defer it.Close() diff --git a/turbo/jsonrpc/eth_receipts.go b/turbo/jsonrpc/eth_receipts.go index 4eee95b788f..a247dc17188 100644 --- a/turbo/jsonrpc/eth_receipts.go +++ b/turbo/jsonrpc/eth_receipts.go @@ -39,7 +39,6 @@ import ( bortypes "github.com/erigontech/erigon/polygon/bor/types" "github.com/erigontech/erigon/rpc" "github.com/erigontech/erigon/turbo/rpchelper" - "github.com/erigontech/erigon/turbo/services" "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" ) @@ -225,10 +224,8 @@ func applyFilters(out *roaring.Bitmap, tx kv.Tx, begin, end uint64, crit filters return nil } -func applyFiltersV3(ctx context.Context, br services.FullBlockReader, tx kv.TemporalTx, begin, end uint64, crit filters.FilterCriteria) (out stream.U64, err error) { +func applyFiltersV3(txNumsReader rawdbv3.TxNumsReader, tx kv.TemporalTx, begin, end uint64, crit filters.FilterCriteria) (out stream.U64, err error) { //[from,to) - txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, br)) - var fromTxNum, toTxNum uint64 if begin > 0 { fromTxNum, err = txNumsReader.Min(tx, begin) @@ -284,12 +281,13 @@ func (api *BaseAPI) getLogsV3(ctx context.Context, tx kv.TemporalTx, begin, end var blockHash common.Hash var header *types.Header - txNumbers, err := applyFiltersV3(ctx, api._blockReader, tx, begin, end, crit) + txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader)) + txNumbers, err := applyFiltersV3(txNumsReader, tx, begin, end, crit) if err != nil { return logs, err } it := rawdbv3.TxNums2BlockNums(tx, - rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader)), + txNumsReader, txNumbers, order.Asc) defer it.Close() var timestamp uint64 @@ -587,7 +585,9 @@ func (i *MapTxNum2BlockNumIter) Next() (txNum, blockNum uint64, txIndex int, isF return } if !ok { - return txNum, i.blockNum, txIndex, isFinalTxn, blockNumChanged, fmt.Errorf("can't find blockNumber by txnID=%d", txNum) + _lb, _lt, _ := i.txNumsReader.Last(i.tx) + _fb, _ft, _ := i.txNumsReader.First(i.tx) + return txNum, i.blockNum, txIndex, isFinalTxn, blockNumChanged, fmt.Errorf("can't find blockNumber by txnID=%d; last in db: (%d-%d, %d-%d)", txNum, _fb, _lb, _ft, _lt) } } blockNum = i.blockNum diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 27c01bc926c..14d1db477d1 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -1772,10 +1772,21 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna type firstKeyGetter func(ctx context.Context) uint64 type dumpFunc func(ctx context.Context, db kv.RoDB, chainConfig *chain.Config, blockFrom, blockTo uint64, firstKey firstKeyGetter, collecter func(v []byte) error, workers int, lvl log.Lvl, logger log.Logger) (uint64, error) +var 
BlockCompressCfg = seg.Cfg{ + MinPatternScore: 1_000, + MinPatternLen: 8, // `5` - reducing ratio because producing too much prefixes + MaxPatternLen: 128, + SamplingFactor: 4, // not 1 - just to save my time + MaxDictPatterns: 16 * 1024, // the lower RAM used by huffman tree (arrays) + + DictReducerSoftLimit: 1_000_000, + Workers: 1, +} + func dumpRange(ctx context.Context, f snaptype.FileInfo, dumper dumpFunc, firstKey firstKeyGetter, chainDB kv.RoDB, chainConfig *chain.Config, tmpDir string, workers int, lvl log.Lvl, logger log.Logger) (uint64, error) { var lastKeyValue uint64 - compressCfg := seg.DefaultCfg + compressCfg := BlockCompressCfg compressCfg.Workers = workers sn, err := seg.NewCompressor(ctx, "Snapshot "+f.Type.Name(), f.Path, tmpDir, compressCfg, log.LvlTrace, logger) if err != nil { diff --git a/turbo/snapshotsync/freezeblocks/block_sqeeze.go b/turbo/snapshotsync/freezeblocks/block_sqeeze.go new file mode 100644 index 00000000000..45708de566a --- /dev/null +++ b/turbo/snapshotsync/freezeblocks/block_sqeeze.go @@ -0,0 +1,37 @@ +package freezeblocks + +import ( + "context" + + "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon-lib/seg" + "github.com/erigontech/erigon/eth/ethconfig/estimate" +) + +func Sqeeze(ctx context.Context, dirs datadir.Dirs, from, to string, logger log.Logger) error { + logger.Info("[sqeeze] file", "f", to) + decompressor, err := seg.NewDecompressor(from) + if err != nil { + return err + } + defer decompressor.Close() + defer decompressor.EnableReadAhead().DisableReadAhead() + g := decompressor.MakeGetter() + + compressCfg := BlockCompressCfg + compressCfg.Workers = estimate.CompressSnapshot.Workers() + c, err := seg.NewCompressor(ctx, "sqeeze", to, dirs.Tmp, compressCfg, log.LvlInfo, logger) + if err != nil { + return err + } + defer c.Close() + if err := c.ReadFrom(g); err != nil { + return err + } + if err := c.Compress(); err != nil { + return err + } + + return nil +}
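
For reference, an illustrative invocation of the new sub-command. This is a sketch based on the flags declared in `turbo/app/snapshots_cmd.go` and `turbo/app/sqeeze_cmd.go` above (`--datadir` plus the required `--type`, which accepts `commitment`, `storage`, `code` or `blocks`); the datadir path below is a placeholder, not part of the patch.

```shell
# Re-compress existing storage .kv files (datadir path is an example).
./build/bin/erigon seg sqeeze --datadir=/data/erigon --type=storage

# Re-compress header/transaction .seg files using the new BlockCompressCfg.
./build/bin/erigon seg sqeeze --datadir=/data/erigon --type=blocks
```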