diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go
index edb328fa9db..d46170c21b5 100644
--- a/cmd/integration/commands/flags.go
+++ b/cmd/integration/commands/flags.go
@@ -58,8 +58,6 @@ var (
workers, reconWorkers uint64
dbWriteMap bool
-
- squeezeCommitment, squeezeStorage, squeezeCode bool
)
func must(err error) {
@@ -131,12 +129,6 @@ func withBucket(cmd *cobra.Command) {
cmd.Flags().StringVar(&bucket, "bucket", "", "reset given stage")
}
-func withSqueezeCommitmentFiles(cmd *cobra.Command) {
- cmd.Flags().BoolVar(&squeezeCommitment, "squeeze.commitment", false, "allow to squeeze commitment files on start")
- cmd.Flags().BoolVar(&squeezeStorage, "sqeeze.storage", false, "allow to recompress existing .kv files")
- cmd.Flags().BoolVar(&squeezeCode, "sqeeze.code", false, "allow to recompress existing .kv files")
-}
-
func withDataDir2(cmd *cobra.Command) {
// --datadir is required, but no --chain flag: read chainConfig from db instead
cmd.Flags().StringVar(&datadirCli, utils.DataDirFlag.Name, "", utils.DataDirFlag.Usage)
diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 51f4c688d04..74c5e246da0 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -432,9 +432,6 @@ var cmdRunMigrations = &cobra.Command{
Short: "",
Run: func(cmd *cobra.Command, args []string) {
logger := debug.SetupCobra(cmd, "integration")
- migrations.EnableSqueezeCommitment = squeezeCommitment
- migrations.EnableSqeezeStorage = squeezeStorage
- migrations.EnableSqeezeCode = squeezeCode
//non-accede and exclusive mode - to apply create new tables if need.
cfg := dbCfg(kv.ChainDB, chaindata).Flags(func(u uint) uint { return u &^ mdbx.Accede }).Exclusive()
db, err := openDB(cfg, true, logger)
@@ -588,7 +585,6 @@ func init() {
withConfig(cmdRunMigrations)
withDataDir(cmdRunMigrations)
- withSqueezeCommitmentFiles(cmdRunMigrations)
withChain(cmdRunMigrations)
withHeimdall(cmdRunMigrations)
rootCmd.AddCommand(cmdRunMigrations)
diff --git a/erigon-lib/common/datadir/dirs.go b/erigon-lib/common/datadir/dirs.go
index b80e465ea8e..266625088f2 100644
--- a/erigon-lib/common/datadir/dirs.go
+++ b/erigon-lib/common/datadir/dirs.go
@@ -172,24 +172,6 @@ func downloaderV2Migration(dirs Dirs) error {
return nil
}
-// nolint
-func moveFiles(from, to string, ext string) error {
- files, err := dir.ReadDir(from)
- if err != nil {
- return fmt.Errorf("ReadDir: %w, %s", err, from)
- }
- for _, f := range files {
- if f.Type().IsDir() || !f.Type().IsRegular() {
- continue
- }
- if filepath.Ext(f.Name()) != ext {
- continue
- }
- _ = os.Rename(filepath.Join(from, f.Name()), filepath.Join(to, f.Name()))
- }
- return nil
-}
-
func CopyFile(from, to string) error {
r, err := os.Open(from)
if err != nil {
diff --git a/erigon-lib/kv/rawdbv3/txnum.go b/erigon-lib/kv/rawdbv3/txnum.go
index 18168ca4870..d1d6a38746f 100644
--- a/erigon-lib/kv/rawdbv3/txnum.go
+++ b/erigon-lib/kv/rawdbv3/txnum.go
@@ -214,7 +214,6 @@ func (t TxNumsReader) FindBlockNum(tx kv.Tx, endTxNumMinimax uint64) (ok bool, b
if err != nil {
return true
}
-
if !ok {
_lb, _lt, _ := t.Last(tx)
err = fmt.Errorf("FindBlockNum(%d): seems broken TxNum value: %x -> (%x, %x); last in db: (%d, %d)", endTxNumMinimax, seek, i, maxTxNum, _lb, _lt)
diff --git a/erigon-lib/seg/compress.go b/erigon-lib/seg/compress.go
index 4bbabe7b90e..17bb8ee156b 100644
--- a/erigon-lib/seg/compress.go
+++ b/erigon-lib/seg/compress.go
@@ -179,6 +179,17 @@ func (c *Compressor) WorkersAmount() int { return c.Workers }
func (c *Compressor) Count() int { return int(c.wordsCount) }
+func (c *Compressor) ReadFrom(g *Getter) error {
+ var v []byte
+ for g.HasNext() {
+ v, _ = g.Next(v[:0])
+ if err := c.AddWord(v); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (c *Compressor) AddWord(word []byte) error {
select {
case <-c.ctx.Done():
diff --git a/erigon-lib/seg/seg_auto_rw.go b/erigon-lib/seg/seg_auto_rw.go
index 37ff5bddb21..f16dc073f3e 100644
--- a/erigon-lib/seg/seg_auto_rw.go
+++ b/erigon-lib/seg/seg_auto_rw.go
@@ -141,6 +141,17 @@ func (c *Writer) AddWord(word []byte) error {
return c.Compressor.AddUncompressedWord(word)
}
+func (c *Writer) ReadFrom(r *Reader) error {
+ var v []byte
+ for r.HasNext() {
+ v, _ = r.Next(v[:0])
+ if err := c.AddWord(v); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (c *Writer) Close() {
if c.Compressor != nil {
c.Compressor.Close()
diff --git a/erigon-lib/state/sqeeze.go b/erigon-lib/state/sqeeze.go
index 4db39353a26..a3067277c63 100644
--- a/erigon-lib/state/sqeeze.go
+++ b/erigon-lib/state/sqeeze.go
@@ -38,7 +38,7 @@ func (a *Aggregator) Sqeeze(ctx context.Context, domain kv.Domain) error {
return err
}
- if err := a.sqeezeFile(ctx, domain, tempFileCopy, to); err != nil {
+ if err := a.sqeezeDomainFile(ctx, domain, tempFileCopy, to); err != nil {
return err
}
_ = os.Remove(tempFileCopy)
@@ -51,7 +51,7 @@ func (a *Aggregator) Sqeeze(ctx context.Context, domain kv.Domain) error {
return nil
}
-func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to string) error {
+func (a *Aggregator) sqeezeDomainFile(ctx context.Context, domain kv.Domain, from, to string) error {
if domain == kv.CommitmentDomain {
panic("please use SqueezeCommitmentFiles func")
}
@@ -59,7 +59,7 @@ func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to
compression := a.d[domain].compression
compressCfg := a.d[domain].compressCfg
- a.logger.Info("[recompress] file", "f", to, "cfg", compressCfg, "c", compression)
+ a.logger.Info("[sqeeze] file", "f", to, "cfg", compressCfg, "c", compression)
decompressor, err := seg.NewDecompressor(from)
if err != nil {
return err
@@ -68,29 +68,14 @@ func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to
defer decompressor.EnableReadAhead().DisableReadAhead()
r := seg.NewReader(decompressor.MakeGetter(), seg.DetectCompressType(decompressor.MakeGetter()))
- c, err := seg.NewCompressor(ctx, "recompress", to, a.dirs.Tmp, compressCfg, log.LvlInfo, a.logger)
+ c, err := seg.NewCompressor(ctx, "sqeeze", to, a.dirs.Tmp, compressCfg, log.LvlInfo, a.logger)
if err != nil {
return err
}
defer c.Close()
w := seg.NewWriter(c, compression)
- var k, v []byte
- var i int
- for r.HasNext() {
- i++
- k, _ = r.Next(k[:0])
- v, _ = r.Next(v[:0])
- if err = w.AddWord(k); err != nil {
- return err
- }
- if err = w.AddWord(v); err != nil {
- return err
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
+ if err := w.ReadFrom(r); err != nil {
+ return err
}
if err := c.Compress(); err != nil {
return err
diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index 4022149e481..79b81f3c26d 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -281,7 +281,9 @@ func ExecV3(ctx context.Context,
return err
}
if !ok {
- return fmt.Errorf("seems broken TxNums index not filled. can't find blockNum of txNum=%d", inputTxNum)
+ _lb, _lt, _ := txNumsReader.Last(applyTx)
+ _fb, _ft, _ := txNumsReader.First(applyTx)
+ return fmt.Errorf("seems broken TxNums index not filled. can't find blockNum of txNum=%d; in db: (%d-%d, %d-%d)", inputTxNum, _fb, _lb, _ft, _lt)
}
{
_max, _ := txNumsReader.Max(applyTx, _blockNum)
diff --git a/migrations/migrations.go b/migrations/migrations.go
index afe4cd0b275..211d77f860e 100644
--- a/migrations/migrations.go
+++ b/migrations/migrations.go
@@ -53,9 +53,6 @@ var migrations = map[kv.Label][]Migration{
kv.ChainDB: {
dbSchemaVersion5,
ProhibitNewDownloadsLock,
- SqueezeCommitmentFiles,
- RecompressCommitmentFiles,
- RecompressCodeFiles,
ProhibitNewDownloadsLock2,
ClearBorTables,
},
diff --git a/migrations/sqeeze_code.go b/migrations/sqeeze_code.go
deleted file mode 100644
index 56a65a4e22f..00000000000
--- a/migrations/sqeeze_code.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2024 The Erigon Authors
-// This file is part of Erigon.
-//
-// Erigon is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// Erigon is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with Erigon. If not, see .
-
-package migrations
-
-import (
- "context"
- "time"
-
- "github.com/erigontech/erigon-lib/common/datadir"
- "github.com/erigontech/erigon-lib/config3"
- "github.com/erigontech/erigon-lib/kv"
- "github.com/erigontech/erigon-lib/log/v3"
- "github.com/erigontech/erigon-lib/state"
- "github.com/erigontech/erigon/eth/ethconfig/estimate"
-)
-
-var EnableSqeezeCode = false
-
-var RecompressCodeFiles = Migration{
- Name: "code_recompress",
- Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) {
- ctx := context.Background()
-
- if !EnableSqeezeCode {
- log.Info("[recompress_code_migration] disabled")
- return db.Update(ctx, func(tx kv.RwTx) error {
- return BeforeCommit(tx, nil, true)
- })
- }
-
- logEvery := time.NewTicker(10 * time.Second)
- defer logEvery.Stop()
- t := time.Now()
- defer func() {
- log.Info("[sqeeze_migration] done", "took", time.Since(t))
- }()
-
- agg, err := state.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger)
- if err != nil {
- return err
- }
- defer agg.Close()
- agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
-
- log.Info("[sqeeze_migration] start")
- if err := agg.Sqeeze(ctx, kv.CodeDomain); err != nil {
- return err
- }
- if err = agg.OpenFolder(); err != nil {
- return err
- }
- if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
-
- return db.Update(ctx, func(tx kv.RwTx) error {
- return BeforeCommit(tx, nil, true)
- })
- },
-}
diff --git a/migrations/sqeeze_storage.go b/migrations/sqeeze_storage.go
deleted file mode 100644
index 04da2b4fccb..00000000000
--- a/migrations/sqeeze_storage.go
+++ /dev/null
@@ -1,118 +0,0 @@
-// Copyright 2024 The Erigon Authors
-// This file is part of Erigon.
-//
-// Erigon is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// Erigon is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with Erigon. If not, see .
-
-package migrations
-
-import (
- "context"
- "os"
- "time"
-
- "github.com/erigontech/erigon-lib/common/datadir"
- "github.com/erigontech/erigon-lib/common/dir"
- "github.com/erigontech/erigon-lib/config3"
- "github.com/erigontech/erigon-lib/kv"
- "github.com/erigontech/erigon-lib/log/v3"
- "github.com/erigontech/erigon-lib/state"
- "github.com/erigontech/erigon/eth/ethconfig/estimate"
-)
-
-var EnableSqeezeStorage = false
-
-var RecompressCommitmentFiles = Migration{
- Name: "recompress_commit_files",
- Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) {
- ctx := context.Background()
-
- if !EnableSqeezeStorage {
- log.Info("[recompress_migration] disabled")
- return db.Update(ctx, func(tx kv.RwTx) error {
- return BeforeCommit(tx, nil, true)
- })
- }
-
- logEvery := time.NewTicker(10 * time.Second)
- defer logEvery.Stop()
- t := time.Now()
- defer func() {
- log.Info("[recompress_migration] done", "took", time.Since(t))
- }()
-
- agg, err := state.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger)
- if err != nil {
- return err
- }
- defer agg.Close()
- agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
-
- log.Info("[recompress_migration] start")
- dirsOld := dirs
- dirsOld.SnapDomain += "_old"
- dir.MustExist(dirsOld.SnapDomain, dirs.SnapDomain+"_backup")
- if err := agg.Sqeeze(ctx, kv.StorageDomain); err != nil {
- return err
- }
-
- if err = agg.OpenFolder(); err != nil {
- return err
- }
- if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
- ac := agg.BeginFilesRo()
- defer ac.Close()
-
- aggOld, err := state.NewAggregator(ctx, dirsOld, config3.HistoryV3AggregationStep, db, nil, logger)
- if err != nil {
- panic(err)
- }
- defer aggOld.Close()
- if err = aggOld.OpenFolder(); err != nil {
- panic(err)
- }
- aggOld.SetCompressWorkers(estimate.CompressSnapshot.Workers())
- if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
- if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
-
- acOld := aggOld.BeginFilesRo()
- defer acOld.Close()
-
- if err = acOld.SqueezeCommitmentFiles(ac); err != nil {
- return err
- }
- acOld.Close()
- ac.Close()
- if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
- if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
- agg.Close()
- aggOld.Close()
-
- log.Info("[recompress] removing", "dir", dirsOld.SnapDomain)
- _ = os.RemoveAll(dirsOld.SnapDomain)
- log.Info("[recompress] success", "please_remove", dirs.SnapDomain+"_backup")
- return db.Update(ctx, func(tx kv.RwTx) error {
- return BeforeCommit(tx, nil, true)
- })
- },
-}
diff --git a/migrations/squeeze_commitment.go b/migrations/squeeze_commitment.go
deleted file mode 100644
index 59bb0c1f2f0..00000000000
--- a/migrations/squeeze_commitment.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright 2024 The Erigon Authors
-// This file is part of Erigon.
-//
-// Erigon is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// Erigon is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with Erigon. If not, see .
-
-package migrations
-
-import (
- "context"
- "time"
-
- "github.com/erigontech/erigon-lib/common/datadir"
- "github.com/erigontech/erigon-lib/config3"
- "github.com/erigontech/erigon-lib/kv"
- "github.com/erigontech/erigon-lib/log/v3"
- libstate "github.com/erigontech/erigon-lib/state"
- "github.com/erigontech/erigon/eth/ethconfig/estimate"
-)
-
-var EnableSqueezeCommitment = false
-
-var SqueezeCommitmentFiles = Migration{
- Name: "squeeze_commit_files",
- Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) {
- ctx := context.Background()
-
- if !EnableSqueezeCommitment {
- log.Info("[sqeeze_migration] disabled")
- return db.Update(ctx, func(tx kv.RwTx) error {
- return BeforeCommit(tx, nil, true)
- })
- }
-
- logEvery := time.NewTicker(10 * time.Second)
- defer logEvery.Stop()
- t := time.Now()
- defer func() {
- log.Info("[sqeeze_migration] done", "took", time.Since(t))
- }()
-
- log.Info("[sqeeze_migration] 'squeeze' mode start")
- agg, err := libstate.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger)
- if err != nil {
- return err
- }
- defer agg.Close()
- agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
- if err = agg.OpenFolder(); err != nil {
- return err
- }
- if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
- ac := agg.BeginFilesRo()
- defer ac.Close()
- if err = ac.SqueezeCommitmentFiles(ac); err != nil {
- return err
- }
- ac.Close()
- if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
- return err
- }
- return db.Update(ctx, func(tx kv.RwTx) error {
- return BeforeCommit(tx, nil, true)
- })
- },
-}
diff --git a/turbo/app/README.md b/turbo/app/README.md
index 01acd430bdf..ab0e775eb15 100644
--- a/turbo/app/README.md
+++ b/turbo/app/README.md
@@ -8,32 +8,36 @@
## Support
-This command connects erigon to diagnostics tools by establishing websocket connection.
+This command connects erigon to diagnostics tools by establishing websocket connection.
+
+In order to connect diagnostics run
-In order to conect diagnostics run
```
./build/bin/erigon support --debug.addrs --diagnostics.addr --diagnostics.sessions
```
-| | |
-|---|---|
-|diagnostics.addr|Address of the diagnostics system provided by the support team, include unique session PIN. [Instructions how to get proper adderess](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-4)|
-|debug.addrs|Comma separated list of URLs to the debug endpoints thats are being diagnosed. This endpoints must mutch values of `diagnostics.endpoint.addr:diagnostics.endpoint.port` by default its `localhost:6060`|
-|diagnostics.sessions|Comma separated list of session PINs to connect to [Instructions how to obtain PIN](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-2)|
-|||
-
+| | |
+|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| diagnostics.addr     | Address of the diagnostics system provided by the support team, include unique session PIN. [Instructions how to get proper address](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-4)  |
+| debug.addrs          | Comma separated list of URLs to the debug endpoints that are being diagnosed. These endpoints must match values of `diagnostics.endpoint.addr:diagnostics.endpoint.port`; by default it is `localhost:6060` |
+| diagnostics.sessions | Comma separated list of session PINs to connect to [Instructions how to obtain PIN](https://github.com/erigontech/diagnostics?tab=readme-ov-file#step-2) |
+| | |
## Snapshots
This sub command can be used for manipulating snapshot files
+## Danger zone: `seg sqeeze`
+
+To perform foreign-key-aware re-compression of files
+
### Uploader
The `snapshots uploader` command starts a version of erigon customized for uploading snapshot files to
-a remote location.
+a remote location.
It breaks the stage execution process after the senders stage and then uses the snapshot stage to send
-uploaded headers, bodies and (in the case of polygon) bor spans and events to snapshot files. Because
+uploaded headers, bodies and (in the case of polygon) bor spans and events to snapshot files. Because
this process avoids execution in run signifigantly faster than a standard erigon configuration.
The uploader uses rclone to send seedable (100K or 500K blocks) to a remote storage location specified
@@ -51,14 +55,13 @@ in addition to this it has the following performance related features:
The following configuration can be used to upload blocks from genesis where:
-| | |
-|---|---|
-| sync.loop.prune.limit=500000 | Sets the records to be pruned to the database to 500,000 per iteration (as opposed to 100) |
-| upload.location=r2:erigon-v2-snapshots-bor-mainnet | Specified the rclone loaction to upload snapshot to |
-| upload.from=earliest | Sets the upload start location to be the earliest available block, which will be 0 in the case of a fresh installation, or specified by the last block in the chaindata db |
-| upload.snapshot.limit=1500000 | Tells the uploader to keep a maximum 1,500,000 blocks in the `snapshots` before deleting the aged snapshot |
-| snapshot.version=2 | Indivates the version to be appended to snapshot file names when they are creatated|
-
+| | |
+|----------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| sync.loop.prune.limit=500000 | Sets the records to be pruned to the database to 500,000 per iteration (as opposed to 100) |
+| upload.location=r2:erigon-v2-snapshots-bor-mainnet | Specifies the rclone location to upload snapshots to                                                                                                                       |
+| upload.from=earliest | Sets the upload start location to be the earliest available block, which will be 0 in the case of a fresh installation, or specified by the last block in the chaindata db |
+| upload.snapshot.limit=1500000 | Tells the uploader to keep a maximum 1,500,000 blocks in the `snapshots` before deleting the aged snapshot |
+| snapshot.version=2                                 | Indicates the version to be appended to snapshot file names when they are created                                                                                          |
```shell
erigon/build/bin/erigon seg uploader --datadir=~/snapshots/bor-mainnet --chain=bor-mainnet \
@@ -67,7 +70,8 @@ erigon/build/bin/erigon seg uploader --datadir=~/snapshots/bor-mainnet --chain=b
--upload.snapshot.limit=1500000
```
-In order to start with the lates uploaded block when starting with an empty drive set the `upload.from` flag to `latest`. e.g.
+In order to start with the latest uploaded block when starting with an empty drive set the `upload.from` flag to
+`latest`. e.g.
```shell
--upload.from=latest
diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go
index ac1a7fee935..d7b260aa3e9 100644
--- a/turbo/app/snapshots_cmd.go
+++ b/turbo/app/snapshots_cmd.go
@@ -212,104 +212,9 @@ var snapshotCommand = cli.Command{
Flags: joinFlags([]cli.Flag{&utils.DataDirFlag}),
},
{
- Name: "rm-state-snapshots",
- Action: func(cliCtx *cli.Context) error {
- dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
-
- removeLatest := cliCtx.Bool("latest")
- steprm := cliCtx.String("step")
- if steprm == "" && !removeLatest {
- return errors.New("step to remove is required (eg 0-2) OR flag --latest provided")
- }
- if steprm != "" {
- removeLatest = false // --step has higher priority
- }
-
- _maxFrom := uint64(0)
- files := make([]snaptype.FileInfo, 0)
- for _, dirPath := range []string{dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors} {
- filePaths, err := dir.ListFiles(dirPath)
- if err != nil {
- return err
- }
- for _, filePath := range filePaths {
- _, fName := filepath.Split(filePath)
- res, isStateFile, ok := snaptype.ParseFileName(dirPath, fName)
- if !ok || !isStateFile {
- fmt.Printf("skipping %s\n", filePath)
- continue
- }
- if res.From == 0 && res.To == 0 {
- parts := strings.Split(fName, ".")
- if len(parts) == 3 || len(parts) == 4 {
- fsteps := strings.Split(parts[1], "-")
- res.From, err = strconv.ParseUint(fsteps[0], 10, 64)
- if err != nil {
- return err
- }
- res.To, err = strconv.ParseUint(fsteps[1], 10, 64)
- if err != nil {
- return err
- }
- }
- }
-
- files = append(files, res)
- if removeLatest {
- _maxFrom = max(_maxFrom, res.From)
- }
- }
- }
-
- var minS, maxS uint64
- if removeLatest {
- AllowPruneSteps:
- fmt.Printf("remove latest snapshot files with stepFrom=%d?\n1) Remove\n2) Exit\n (pick number): ", _maxFrom)
- var ans uint8
- _, err := fmt.Scanf("%d\n", &ans)
- if err != nil {
- return err
- }
- switch ans {
- case 1:
- minS, maxS = _maxFrom, math.MaxUint64
- break
- case 2:
- return nil
- default:
- fmt.Printf("invalid input: %d; Just an answer number expected.\n", ans)
- goto AllowPruneSteps
- }
- } else if steprm != "" {
- parseStep := func(step string) (uint64, uint64, error) {
- var from, to uint64
- if _, err := fmt.Sscanf(step, "%d-%d", &from, &to); err != nil {
- return 0, 0, fmt.Errorf("step expected in format from-to, got %s", step)
- }
- return from, to, nil
- }
- var err error
- minS, maxS, err = parseStep(steprm)
- if err != nil {
- return err
- }
- } else {
- panic("unexpected arguments")
- }
-
- var removed int
- for _, res := range files {
- if res.From >= minS && res.To <= maxS {
- if err := os.Remove(res.Path); err != nil {
- return fmt.Errorf("failed to remove %s: %w", res.Path, err)
- }
- removed++
- }
- }
- fmt.Printf("removed %d state snapshot files\n", removed)
- return nil
- },
- Flags: joinFlags([]cli.Flag{&utils.DataDirFlag, &cli.StringFlag{Name: "step", Required: false}, &cli.BoolFlag{Name: "latest", Required: false}}),
+ Name: "rm-state-snapshots",
+ Action: doRmStateSnapshots,
+ Flags: joinFlags([]cli.Flag{&utils.DataDirFlag, &cli.StringFlag{Name: "step", Required: false}, &cli.BoolFlag{Name: "latest", Required: false}}),
},
{
Name: "diff",
@@ -333,6 +238,14 @@ var snapshotCommand = cli.Command{
&cli.StringFlag{Name: "domain", Required: true},
}),
},
+ {
+ Name: "sqeeze",
+ Action: doSqueeze,
+ Flags: joinFlags([]cli.Flag{
+ &utils.DataDirFlag,
+ &cli.StringFlag{Name: "type", Required: true},
+ }),
+ },
{
Name: "integrity",
Action: doIntegrity,
@@ -385,6 +298,103 @@ var (
}
)
+func doRmStateSnapshots(cliCtx *cli.Context) error {
+ dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
+
+ removeLatest := cliCtx.Bool("latest")
+ steprm := cliCtx.String("step")
+ if steprm == "" && !removeLatest {
+ return errors.New("step to remove is required (eg 0-2) OR flag --latest provided")
+ }
+ if steprm != "" {
+ removeLatest = false // --step has higher priority
+ }
+
+ _maxFrom := uint64(0)
+ files := make([]snaptype.FileInfo, 0)
+ for _, dirPath := range []string{dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors} {
+ filePaths, err := dir.ListFiles(dirPath)
+ if err != nil {
+ return err
+ }
+ for _, filePath := range filePaths {
+ _, fName := filepath.Split(filePath)
+ res, isStateFile, ok := snaptype.ParseFileName(dirPath, fName)
+ if !ok || !isStateFile {
+ fmt.Printf("skipping %s\n", filePath)
+ continue
+ }
+ if res.From == 0 && res.To == 0 {
+ parts := strings.Split(fName, ".")
+ if len(parts) == 3 || len(parts) == 4 {
+ fsteps := strings.Split(parts[1], "-")
+ res.From, err = strconv.ParseUint(fsteps[0], 10, 64)
+ if err != nil {
+ return err
+ }
+ res.To, err = strconv.ParseUint(fsteps[1], 10, 64)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ files = append(files, res)
+ if removeLatest {
+ _maxFrom = max(_maxFrom, res.From)
+ }
+ }
+ }
+
+ var minS, maxS uint64
+ if removeLatest {
+ AllowPruneSteps:
+ fmt.Printf("remove latest snapshot files with stepFrom=%d?\n1) Remove\n2) Exit\n (pick number): ", _maxFrom)
+ var ans uint8
+ _, err := fmt.Scanf("%d\n", &ans)
+ if err != nil {
+ return err
+ }
+ switch ans {
+ case 1:
+ minS, maxS = _maxFrom, math.MaxUint64
+ break
+ case 2:
+ return nil
+ default:
+ fmt.Printf("invalid input: %d; Just an answer number expected.\n", ans)
+ goto AllowPruneSteps
+ }
+ } else if steprm != "" {
+ parseStep := func(step string) (uint64, uint64, error) {
+ var from, to uint64
+ if _, err := fmt.Sscanf(step, "%d-%d", &from, &to); err != nil {
+ return 0, 0, fmt.Errorf("step expected in format from-to, got %s", step)
+ }
+ return from, to, nil
+ }
+ var err error
+ minS, maxS, err = parseStep(steprm)
+ if err != nil {
+ return err
+ }
+ } else {
+ panic("unexpected arguments")
+ }
+
+ var removed int
+ for _, res := range files {
+ if res.From >= minS && res.To <= maxS {
+ if err := os.Remove(res.Path); err != nil {
+ return fmt.Errorf("failed to remove %s: %w", res.Path, err)
+ }
+ removed++
+ }
+ }
+ fmt.Printf("removed %d state snapshot files\n", removed)
+ return nil
+}
+
func doBtSearch(cliCtx *cli.Context) error {
logger, _, _, err := debug.Setup(cliCtx, true /* root logger */)
if err != nil {
diff --git a/turbo/app/sqeeze_cmd.go b/turbo/app/sqeeze_cmd.go
new file mode 100644
index 00000000000..6c8c8287fcd
--- /dev/null
+++ b/turbo/app/sqeeze_cmd.go
@@ -0,0 +1,240 @@
+// Copyright 2024 The Erigon Authors
+// This file is part of Erigon.
+//
+// Erigon is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Erigon is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with Erigon. If not, see .
+
+package app
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/erigontech/erigon-lib/common/dir"
+ "github.com/erigontech/erigon-lib/config3"
+ "github.com/erigontech/erigon-lib/downloader/snaptype"
+ "github.com/erigontech/erigon-lib/kv"
+ "github.com/erigontech/erigon-lib/kv/rawdbv3"
+ "github.com/erigontech/erigon-lib/log/v3"
+ "github.com/erigontech/erigon-lib/state"
+ "github.com/erigontech/erigon/cmd/hack/tool/fromdb"
+ "github.com/erigontech/erigon/core/rawdb"
+ snaptype2 "github.com/erigontech/erigon/core/snaptype"
+ "github.com/erigontech/erigon/eth/ethconfig"
+ "github.com/erigontech/erigon/eth/ethconfig/estimate"
+ "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
+ "github.com/urfave/cli/v2"
+
+ "github.com/erigontech/erigon-lib/common/datadir"
+ "github.com/erigontech/erigon/cmd/utils"
+ "github.com/erigontech/erigon/turbo/debug"
+)
+
// Sqeeze identifies which family of snapshot files the `sqeeze` command
// recompresses. NOTE(review): "Sqeeze" is a typo for "Squeeze", but it is
// part of the exported API and the CLI `--type` values, so it is kept.
type Sqeeze string

var (
	SqeezeCommitment Sqeeze = "commitment" // commitment domain files
	SqeezeStorage    Sqeeze = "storage"    // storage domain .kv files
	SqeezeCode       Sqeeze = "code"       // code domain .kv files
	SqeezeBlocks     Sqeeze = "blocks"     // header/transaction .seg block snapshots
)
+
+func doSqueeze(cliCtx *cli.Context) error {
+ dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
+ logger, _, _, err := debug.Setup(cliCtx, true /* rootLogger */)
+ if err != nil {
+ return err
+ }
+ ctx := cliCtx.Context
+ logEvery := time.NewTicker(10 * time.Second)
+ defer logEvery.Stop()
+
+ t := Sqeeze(cliCtx.String("type"))
+
+ start := time.Now()
+ log.Info("[sqeeze] start", "t", t)
+ defer func() { logger.Info("[sqeeze] done", "t", t, "took", time.Since(start)) }()
+
+ switch {
+ case t == SqeezeCommitment:
+ return squeezeCommitment(ctx, dirs, logger)
+ case t == SqeezeStorage:
+ return squeezeStorage(ctx, dirs, logger)
+ case t == SqeezeCode:
+ return squeezeCode(ctx, dirs, logger)
+ case t == SqeezeBlocks:
+ return squeezeBlocks(ctx, dirs, logger)
+ default:
+
+ return fmt.Errorf("unknown type: %s", t)
+ }
+}
+
// squeezeCommitment recompresses the commitment domain snapshot files in
// place and rebuilds any accessor indices that become stale as a result.
func squeezeCommitment(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error {
	db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
	defer db.Close()
	cr := rawdb.NewCanonicalReader(rawdbv3.TxNums)
	agg := openAgg(ctx, dirs, db, cr, logger)
	agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
	if err := agg.OpenFolder(); err != nil {
		return err
	}
	// Ensure all indices exist before touching the commitment files.
	if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
		return err
	}
	ac := agg.BeginFilesRo()
	defer ac.Close()
	// NOTE(review): the files-context is passed as its own argument here —
	// presumably the "reference" context to squeeze against; confirm the
	// SqueezeCommitmentFiles API contract.
	if err := ac.SqueezeCommitmentFiles(ac); err != nil {
		return err
	}
	// Release files before rebuilding indices; the deferred Close above is
	// presumably a no-op after this — TODO confirm Close is idempotent.
	ac.Close()
	// Rebuild indices for the newly written commitment files.
	if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
		return err
	}
	return nil
}
+
+func squeezeStorage(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error {
+ db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
+ defer db.Close()
+ cr := rawdb.NewCanonicalReader(rawdbv3.TxNums)
+ agg := openAgg(ctx, dirs, db, cr, logger)
+ agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
+ dirsOld := dirs
+ dirsOld.SnapDomain += "_old"
+ dir.MustExist(dirsOld.SnapDomain, dirs.SnapDomain+"_backup")
+ if err := agg.Sqeeze(ctx, kv.StorageDomain); err != nil {
+ return err
+ }
+
+ if err := agg.OpenFolder(); err != nil {
+ return err
+ }
+ if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
+ return err
+ }
+ ac := agg.BeginFilesRo()
+ defer ac.Close()
+
+ aggOld, err := state.NewAggregator(ctx, dirsOld, config3.HistoryV3AggregationStep, db, nil, logger)
+ if err != nil {
+ panic(err)
+ }
+ defer aggOld.Close()
+ if err = aggOld.OpenFolder(); err != nil {
+ panic(err)
+ }
+ aggOld.SetCompressWorkers(estimate.CompressSnapshot.Workers())
+ if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
+ return err
+ }
+ if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
+ return err
+ }
+
+ acOld := aggOld.BeginFilesRo()
+ defer acOld.Close()
+
+ if err = acOld.SqueezeCommitmentFiles(ac); err != nil {
+ return err
+ }
+ acOld.Close()
+ ac.Close()
+ if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
+ return err
+ }
+ if err := aggOld.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
+ return err
+ }
+ agg.Close()
+ aggOld.Close()
+
+ log.Info("[sqeeze] removing", "dir", dirsOld.SnapDomain)
+ _ = os.RemoveAll(dirsOld.SnapDomain)
+ log.Info("[sqeeze] success", "please_remove", dirs.SnapDomain+"_backup")
+ return nil
+}
+func squeezeCode(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error {
+ db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
+ defer db.Close()
+ agg, err := state.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, nil, logger)
+ if err != nil {
+ return err
+ }
+ defer agg.Close()
+ agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
+
+ log.Info("[sqeeze] start")
+ if err := agg.Sqeeze(ctx, kv.CodeDomain); err != nil {
+ return err
+ }
+ if err = agg.OpenFolder(); err != nil {
+ return err
+ }
+ if err := agg.BuildMissedIndices(ctx, estimate.IndexSnapshot.Workers()); err != nil {
+ return err
+ }
+ return nil
+}
+
+func squeezeBlocks(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error {
+ for _, f := range ls(dirs.Snap, ".seg") {
+ good := strings.Contains(f, snaptype2.Transactions.Name()) ||
+ strings.Contains(f, snaptype2.Headers.Name())
+ if !good {
+ continue
+ }
+ _, name := filepath.Split(f)
+ in, _, ok := snaptype.ParseFileName(dirs.Snap, name)
+ if !ok {
+ continue
+ }
+ good = in.To-in.From == snaptype.Erigon2OldMergeLimit || in.To-in.From == snaptype.Erigon2MergeLimit
+ if !good {
+ continue
+ }
+ if err := freezeblocks.Sqeeze(ctx, dirs, f, f, logger); err != nil {
+ return err
+ }
+ _ = os.Remove(strings.ReplaceAll(f, ".seg", ".seg.torrent"))
+ _ = os.Remove(strings.ReplaceAll(f, ".seg", ".idx"))
+ _ = os.Remove(strings.ReplaceAll(f, ".seg", ".idx.torrent"))
+ }
+
+ db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
+ defer db.Close()
+ cfg := ethconfig.NewSnapCfg(false, true, true)
+ _, _, _, br, _, clean, err := openSnaps(ctx, cfg, dirs, 0, db, logger)
+ if err != nil {
+ return err
+ }
+ defer clean()
+
+ chainConfig := fromdb.ChainConfig(db)
+ if err := br.BuildMissedIndicesIfNeed(ctx, "retire", nil, chainConfig); err != nil {
+ return err
+ }
+ return nil
+}
+
+func ls(dirPath string, ext string) []string {
+ res, err := dir.ListFiles(dirPath, ext)
+ if err != nil {
+ panic(err)
+ }
+ return res
+}
diff --git a/turbo/jsonrpc/erigon_receipts.go b/turbo/jsonrpc/erigon_receipts.go
index d68c565d1ff..f1a14e270b1 100644
--- a/turbo/jsonrpc/erigon_receipts.go
+++ b/turbo/jsonrpc/erigon_receipts.go
@@ -194,7 +194,8 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri
exec := exec3.NewTraceWorker(tx, chainConfig, api.engine(), api._blockReader, nil)
defer exec.Close()
- txNumbers, err := applyFiltersV3(ctx, api._blockReader, tx, begin, end, crit)
+ txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader))
+ txNumbers, err := applyFiltersV3(txNumsReader, tx, begin, end, crit)
if err != nil {
return erigonLogs, err
}
@@ -212,7 +213,7 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri
// latest logs that match the filter crit
it := rawdbv3.TxNums2BlockNums(tx,
- rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader)),
+ txNumsReader,
txNumbers, order.Asc)
defer it.Close()
diff --git a/turbo/jsonrpc/eth_receipts.go b/turbo/jsonrpc/eth_receipts.go
index 4eee95b788f..a247dc17188 100644
--- a/turbo/jsonrpc/eth_receipts.go
+++ b/turbo/jsonrpc/eth_receipts.go
@@ -39,7 +39,6 @@ import (
bortypes "github.com/erigontech/erigon/polygon/bor/types"
"github.com/erigontech/erigon/rpc"
"github.com/erigontech/erigon/turbo/rpchelper"
- "github.com/erigontech/erigon/turbo/services"
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
)
@@ -225,10 +224,8 @@ func applyFilters(out *roaring.Bitmap, tx kv.Tx, begin, end uint64, crit filters
return nil
}
-func applyFiltersV3(ctx context.Context, br services.FullBlockReader, tx kv.TemporalTx, begin, end uint64, crit filters.FilterCriteria) (out stream.U64, err error) {
+func applyFiltersV3(txNumsReader rawdbv3.TxNumsReader, tx kv.TemporalTx, begin, end uint64, crit filters.FilterCriteria) (out stream.U64, err error) {
//[from,to)
- txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, br))
-
var fromTxNum, toTxNum uint64
if begin > 0 {
fromTxNum, err = txNumsReader.Min(tx, begin)
@@ -284,12 +281,13 @@ func (api *BaseAPI) getLogsV3(ctx context.Context, tx kv.TemporalTx, begin, end
var blockHash common.Hash
var header *types.Header
- txNumbers, err := applyFiltersV3(ctx, api._blockReader, tx, begin, end, crit)
+ txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader))
+ txNumbers, err := applyFiltersV3(txNumsReader, tx, begin, end, crit)
if err != nil {
return logs, err
}
it := rawdbv3.TxNums2BlockNums(tx,
- rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, api._blockReader)),
+ txNumsReader,
txNumbers, order.Asc)
defer it.Close()
var timestamp uint64
@@ -587,7 +585,9 @@ func (i *MapTxNum2BlockNumIter) Next() (txNum, blockNum uint64, txIndex int, isF
return
}
if !ok {
- return txNum, i.blockNum, txIndex, isFinalTxn, blockNumChanged, fmt.Errorf("can't find blockNumber by txnID=%d", txNum)
+ _lb, _lt, _ := i.txNumsReader.Last(i.tx)
+ _fb, _ft, _ := i.txNumsReader.First(i.tx)
+ return txNum, i.blockNum, txIndex, isFinalTxn, blockNumChanged, fmt.Errorf("can't find blockNumber by txnID=%d; last in db: (%d-%d, %d-%d)", txNum, _fb, _lb, _ft, _lt)
}
}
blockNum = i.blockNum
diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go
index 27c01bc926c..14d1db477d1 100644
--- a/turbo/snapshotsync/freezeblocks/block_snapshots.go
+++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go
@@ -1772,10 +1772,21 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna
type firstKeyGetter func(ctx context.Context) uint64
type dumpFunc func(ctx context.Context, db kv.RoDB, chainConfig *chain.Config, blockFrom, blockTo uint64, firstKey firstKeyGetter, collecter func(v []byte) error, workers int, lvl log.Lvl, logger log.Logger) (uint64, error)
// BlockCompressCfg is the seg compression configuration used when building
// block snapshot (.seg) files (see dumpRange, which copies it and overrides
// Workers per invocation).
var BlockCompressCfg = seg.Cfg{
	MinPatternScore: 1_000,
	MinPatternLen:   8, // `5` - reducing ratio because producing too much prefixes
	MaxPatternLen:   128,
	SamplingFactor:  4,         // not 1 - just to save my time
	MaxDictPatterns: 16 * 1024, // the lower RAM used by huffman tree (arrays)

	DictReducerSoftLimit: 1_000_000,
	Workers:              1, // overridden by callers (dumpRange, freezeblocks.Sqeeze)
}
+
func dumpRange(ctx context.Context, f snaptype.FileInfo, dumper dumpFunc, firstKey firstKeyGetter, chainDB kv.RoDB, chainConfig *chain.Config, tmpDir string, workers int, lvl log.Lvl, logger log.Logger) (uint64, error) {
var lastKeyValue uint64
- compressCfg := seg.DefaultCfg
+ compressCfg := BlockCompressCfg
compressCfg.Workers = workers
sn, err := seg.NewCompressor(ctx, "Snapshot "+f.Type.Name(), f.Path, tmpDir, compressCfg, log.LvlTrace, logger)
if err != nil {
diff --git a/turbo/snapshotsync/freezeblocks/block_sqeeze.go b/turbo/snapshotsync/freezeblocks/block_sqeeze.go
new file mode 100644
index 00000000000..45708de566a
--- /dev/null
+++ b/turbo/snapshotsync/freezeblocks/block_sqeeze.go
@@ -0,0 +1,37 @@
+package freezeblocks
+
+import (
+ "context"
+
+ "github.com/erigontech/erigon-lib/common/datadir"
+ "github.com/erigontech/erigon-lib/log/v3"
+ "github.com/erigontech/erigon-lib/seg"
+ "github.com/erigontech/erigon/eth/ethconfig/estimate"
+)
+
// Sqeeze reads every word of the segment file `from` and re-compresses the
// stream into `to` using BlockCompressCfg.
//
// NOTE(review): callers pass from == to (in-place recompression). This is
// only safe if seg.NewCompressor stages output in dirs.Tmp and renames at
// Compress time, after the decompressor has mapped the source — confirm
// against the seg package before relying on it.
func Sqeeze(ctx context.Context, dirs datadir.Dirs, from, to string, logger log.Logger) error {
	logger.Info("[sqeeze] file", "f", to)
	decompressor, err := seg.NewDecompressor(from)
	if err != nil {
		return err
	}
	defer decompressor.Close()
	// EnableReadAhead runs now (sequential full-file scan); the returned
	// handle's DisableReadAhead is deferred to restore normal paging.
	defer decompressor.EnableReadAhead().DisableReadAhead()
	g := decompressor.MakeGetter()

	compressCfg := BlockCompressCfg
	compressCfg.Workers = estimate.CompressSnapshot.Workers()
	c, err := seg.NewCompressor(ctx, "sqeeze", to, dirs.Tmp, compressCfg, log.LvlInfo, logger)
	if err != nil {
		return err
	}
	defer c.Close()
	if err := c.ReadFrom(g); err != nil {
		return err
	}
	if err := c.Compress(); err != nil {
		return err
	}

	return nil
}