Skip to content

Commit

Permalink
erigon seg squeeze command (#11933)
Browse files Browse the repository at this point in the history
- moved some migrations to `erigon seg squeeze` sub-command - because
they are not migrations
- added `erigon seg squeeze --type=blocks`
- added `Compressor.ReadFrom`
  • Loading branch information
AskAlexSharov authored Sep 11, 2024
1 parent 253f737 commit 292697e
Show file tree
Hide file tree
Showing 19 changed files with 462 additions and 454 deletions.
8 changes: 0 additions & 8 deletions cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ var (

workers, reconWorkers uint64
dbWriteMap bool

squeezeCommitment, squeezeStorage, squeezeCode bool
)

func must(err error) {
Expand Down Expand Up @@ -131,12 +129,6 @@ func withBucket(cmd *cobra.Command) {
cmd.Flags().StringVar(&bucket, "bucket", "", "reset given stage")
}

// withSqueezeCommitmentFiles registers three boolean flags on cmd and binds
// them to the package-level squeezeCommitment, squeezeStorage and squeezeCode
// variables. Every flag defaults to false.
//
// NOTE(review): two of the flag names are spelled "sqeeze.*" (missing "u").
// They are kept verbatim because renaming a flag changes the CLI surface.
func withSqueezeCommitmentFiles(cmd *cobra.Command) {
	flagSet := cmd.Flags()
	// Registration order matches the declaration order of the target variables.
	for _, spec := range []struct {
		target *bool
		name   string
		usage  string
	}{
		{&squeezeCommitment, "squeeze.commitment", "allow to squeeze commitment files on start"},
		{&squeezeStorage, "sqeeze.storage", "allow to recompress existing .kv files"},
		{&squeezeCode, "sqeeze.code", "allow to recompress existing .kv files"},
	} {
		flagSet.BoolVar(spec.target, spec.name, false, spec.usage)
	}
}

func withDataDir2(cmd *cobra.Command) {
// --datadir is required, but no --chain flag: read chainConfig from db instead
cmd.Flags().StringVar(&datadirCli, utils.DataDirFlag.Name, "", utils.DataDirFlag.Usage)
Expand Down
4 changes: 0 additions & 4 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,6 @@ var cmdRunMigrations = &cobra.Command{
Short: "",
Run: func(cmd *cobra.Command, args []string) {
logger := debug.SetupCobra(cmd, "integration")
migrations.EnableSqueezeCommitment = squeezeCommitment
migrations.EnableSqeezeStorage = squeezeStorage
migrations.EnableSqeezeCode = squeezeCode
//non-accede and exclusive mode - to apply create new tables if need.
cfg := dbCfg(kv.ChainDB, chaindata).Flags(func(u uint) uint { return u &^ mdbx.Accede }).Exclusive()
db, err := openDB(cfg, true, logger)
Expand Down Expand Up @@ -588,7 +585,6 @@ func init() {

withConfig(cmdRunMigrations)
withDataDir(cmdRunMigrations)
withSqueezeCommitmentFiles(cmdRunMigrations)
withChain(cmdRunMigrations)
withHeimdall(cmdRunMigrations)
rootCmd.AddCommand(cmdRunMigrations)
Expand Down
18 changes: 0 additions & 18 deletions erigon-lib/common/datadir/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,6 @@ func downloaderV2Migration(dirs Dirs) error {
return nil
}

// moveFiles renames every regular file found directly inside from whose
// extension equals ext (as reported by filepath.Ext, i.e. including the
// leading dot) to the same file name inside to.
//
// Only a failure to list from is returned, wrapped with the directory path.
// The result of each individual os.Rename is explicitly discarded, so a file
// that cannot be moved is skipped without reporting an error.
// nolint
func moveFiles(from, to string, ext string) error {
	entries, err := dir.ReadDir(from)
	if err != nil {
		return fmt.Errorf("ReadDir: %w, %s", err, from)
	}
	for _, entry := range entries {
		name := entry.Name()
		// A directory is never a regular file, so the regular-file check
		// alone filters out directories as well as other special entries.
		if !entry.Type().IsRegular() || filepath.Ext(name) != ext {
			continue
		}
		src := filepath.Join(from, name)
		dst := filepath.Join(to, name)
		_ = os.Rename(src, dst)
	}
	return nil
}

func CopyFile(from, to string) error {
r, err := os.Open(from)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion erigon-lib/kv/rawdbv3/txnum.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ func (t TxNumsReader) FindBlockNum(tx kv.Tx, endTxNumMinimax uint64) (ok bool, b
if err != nil {
return true
}

if !ok {
_lb, _lt, _ := t.Last(tx)
err = fmt.Errorf("FindBlockNum(%d): seems broken TxNum value: %x -> (%x, %x); last in db: (%d, %d)", endTxNumMinimax, seek, i, maxTxNum, _lb, _lt)
Expand Down
11 changes: 11 additions & 0 deletions erigon-lib/seg/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ func (c *Compressor) WorkersAmount() int { return c.Workers }

// Count returns the compressor's wordsCount field converted to int.
func (c *Compressor) Count() int {
	count := int(c.wordsCount)
	return count
}

// ReadFrom feeds every remaining word of g into the compressor.
//
// While g.HasNext() reports true, the next word is read with g.Next into a
// buffer that is reused between iterations and then handed to c.AddWord.
// The second value returned by g.Next is discarded. The first error returned
// by AddWord stops the loop and is returned unchanged; nil is returned once g
// has no more words.
func (c *Compressor) ReadFrom(g *Getter) error {
	var word []byte
	for {
		if !g.HasNext() {
			return nil
		}
		// Truncate rather than reallocate so the backing array is reused.
		word, _ = g.Next(word[:0])
		err := c.AddWord(word)
		if err != nil {
			return err
		}
	}
}

func (c *Compressor) AddWord(word []byte) error {
select {
case <-c.ctx.Done():
Expand Down
11 changes: 11 additions & 0 deletions erigon-lib/seg/seg_auto_rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ func (c *Writer) AddWord(word []byte) error {
return c.Compressor.AddUncompressedWord(word)
}

// ReadFrom copies every remaining word of r into the writer.
//
// Each word is read with r.Next into a scratch buffer that is reused across
// iterations and is then passed to c.AddWord. The second value returned by
// r.Next is discarded. Copying stops at the first error from AddWord, which
// is returned as-is; nil is returned when r reports no further words.
func (c *Writer) ReadFrom(r *Reader) error {
	var scratch []byte
	for more := r.HasNext(); more; more = r.HasNext() {
		// Reset length to zero but keep capacity for the next word.
		scratch, _ = r.Next(scratch[:0])
		addErr := c.AddWord(scratch)
		if addErr != nil {
			return addErr
		}
	}
	return nil
}

func (c *Writer) Close() {
if c.Compressor != nil {
c.Compressor.Close()
Expand Down
27 changes: 6 additions & 21 deletions erigon-lib/state/sqeeze.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (a *Aggregator) Sqeeze(ctx context.Context, domain kv.Domain) error {
return err
}

if err := a.sqeezeFile(ctx, domain, tempFileCopy, to); err != nil {
if err := a.sqeezeDomainFile(ctx, domain, tempFileCopy, to); err != nil {
return err
}
_ = os.Remove(tempFileCopy)
Expand All @@ -51,15 +51,15 @@ func (a *Aggregator) Sqeeze(ctx context.Context, domain kv.Domain) error {
return nil
}

func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to string) error {
func (a *Aggregator) sqeezeDomainFile(ctx context.Context, domain kv.Domain, from, to string) error {
if domain == kv.CommitmentDomain {
panic("please use SqueezeCommitmentFiles func")
}

compression := a.d[domain].compression
compressCfg := a.d[domain].compressCfg

a.logger.Info("[recompress] file", "f", to, "cfg", compressCfg, "c", compression)
a.logger.Info("[sqeeze] file", "f", to, "cfg", compressCfg, "c", compression)
decompressor, err := seg.NewDecompressor(from)
if err != nil {
return err
Expand All @@ -68,29 +68,14 @@ func (a *Aggregator) sqeezeFile(ctx context.Context, domain kv.Domain, from, to
defer decompressor.EnableReadAhead().DisableReadAhead()
r := seg.NewReader(decompressor.MakeGetter(), seg.DetectCompressType(decompressor.MakeGetter()))

c, err := seg.NewCompressor(ctx, "recompress", to, a.dirs.Tmp, compressCfg, log.LvlInfo, a.logger)
c, err := seg.NewCompressor(ctx, "sqeeze", to, a.dirs.Tmp, compressCfg, log.LvlInfo, a.logger)
if err != nil {
return err
}
defer c.Close()
w := seg.NewWriter(c, compression)
var k, v []byte
var i int
for r.HasNext() {
i++
k, _ = r.Next(k[:0])
v, _ = r.Next(v[:0])
if err = w.AddWord(k); err != nil {
return err
}
if err = w.AddWord(v); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := w.ReadFrom(r); err != nil {
return err
}
if err := c.Compress(); err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ func ExecV3(ctx context.Context,
return err
}
if !ok {
return fmt.Errorf("seems broken TxNums index not filled. can't find blockNum of txNum=%d", inputTxNum)
_lb, _lt, _ := txNumsReader.Last(applyTx)
_fb, _ft, _ := txNumsReader.First(applyTx)
return fmt.Errorf("seems broken TxNums index not filled. can't find blockNum of txNum=%d; in db: (%d-%d, %d-%d)", inputTxNum, _fb, _lb, _ft, _lt)
}
{
_max, _ := txNumsReader.Max(applyTx, _blockNum)
Expand Down
3 changes: 0 additions & 3 deletions migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ var migrations = map[kv.Label][]Migration{
kv.ChainDB: {
dbSchemaVersion5,
ProhibitNewDownloadsLock,
SqueezeCommitmentFiles,
RecompressCommitmentFiles,
RecompressCodeFiles,
ProhibitNewDownloadsLock2,
ClearBorTables,
},
Expand Down
74 changes: 0 additions & 74 deletions migrations/sqeeze_code.go

This file was deleted.

118 changes: 0 additions & 118 deletions migrations/sqeeze_storage.go

This file was deleted.

Loading

0 comments on commit 292697e

Please sign in to comment.