compression of storage.kv (#11712)

Co-authored-by: Mark Holt <[email protected]>
AskAlexSharov and mh0lt authored Aug 31, 2024
1 parent 68f4196 commit 7df0561

Showing 17 changed files with 507 additions and 82 deletions.
5 changes: 3 additions & 2 deletions cmd/integration/commands/flags.go
@@ -37,7 +37,6 @@ var (
bucket string
datadirCli, toChaindata string
migration string
squeezeCommitmentFiles bool
integrityFast, integritySlow bool
file string
HeimdallURL string
@@ -57,9 +56,10 @@ var (
startTxNum uint64
traceFromTx uint64

_forceSetHistoryV3 bool
workers, reconWorkers uint64
dbWriteMap bool

squeezeCommitmentFiles, recompressCommitmentFiles bool
)

func must(err error) {
@@ -133,6 +133,7 @@ func withBucket(cmd *cobra.Command) {

func withSqueezeCommitmentFiles(cmd *cobra.Command) {
cmd.Flags().BoolVar(&squeezeCommitmentFiles, "squeeze", false, "allow to squeeze commitment files on start")
cmd.Flags().BoolVar(&recompressCommitmentFiles, "recompress", false, "allow to recompress existing .kv files")
}

func withDataDir2(cmd *cobra.Command) {
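As a standalone illustration (not part of this diff), here is a minimal sketch of how a cobra bool flag is bound to a package-level variable and later read inside a command's Run function, mirroring the new --recompress flag registered above. The command name and variable name below are invented for the example.

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// Package-level flag target, standing in for recompressCommitmentFiles in flags.go.
var recompress bool

func main() {
	cmd := &cobra.Command{
		Use: "example",
		Run: func(cmd *cobra.Command, args []string) {
			// The migration code later reads the bound variable directly.
			fmt.Println("recompress enabled:", recompress)
		},
	}
	cmd.Flags().BoolVar(&recompress, "recompress", false, "allow to recompress existing .kv files")
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```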
1 change: 1 addition & 0 deletions cmd/integration/commands/stages.go
@@ -412,6 +412,7 @@ var cmdRunMigrations = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
logger := debug.SetupCobra(cmd, "integration")
migrations.EnableSqueezeCommitmentFiles = squeezeCommitmentFiles
migrations.EnableRecompressCommitmentFiles = recompressCommitmentFiles
//non-accede and exclusive mode - to apply create new tables if need.
cfg := dbCfg(kv.ChainDB, chaindata).Flags(func(u uint) uint { return u &^ mdbx.Accede }).Exclusive()
db, err := openDB(cfg, true, logger)
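The dbCfg(...).Flags(func(u uint) uint { return u &^ mdbx.Accede }) call above relies on Go's bit-clear operator &^ to drop a single flag bit while leaving the rest untouched. A small self-contained sketch of that idiom follows; the flag constants are invented stand-ins, not the real mdbx open flags.

```go
package main

import "fmt"

// Hypothetical flag bits, standing in for mdbx open flags.
const (
	FlagAccede    uint = 1 << 0
	FlagExclusive uint = 1 << 1
	FlagReadOnly  uint = 1 << 2
)

func main() {
	flags := FlagAccede | FlagReadOnly

	// u &^ FlagAccede ("AND NOT") keeps every bit of u except FlagAccede.
	flags = flags &^ FlagAccede

	fmt.Printf("accede set: %v, readonly set: %v\n",
		flags&FlagAccede != 0, flags&FlagReadOnly != 0) // accede set: false, readonly set: true
}
```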
107 changes: 76 additions & 31 deletions erigon-lib/state/aggregator.go
@@ -173,7 +173,7 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
withLocalityIndex: false, withExistenceIndex: false, compression: CompressNone, historyLargeValues: false,
},
restrictSubsetFileDeletions: a.commitmentValuesTransform,
compress: CompressNone,
compress: CompressKeys,
}
if a.d[kv.StorageDomain], err = NewDomain(cfg, aggregationStep, kv.StorageDomain, kv.TblStorageVals, kv.TblStorageHistoryKeys, kv.TblStorageHistoryVals, kv.TblStorageIdx, integrityCheck, logger); err != nil {
return nil, err
@@ -1490,7 +1490,7 @@ func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) RangesV3 {

// SqueezeCommitmentFiles should be called only when NO EXECUTION is running.
// Removes commitment files and suppose following aggregator shutdown and restart (to integrate new files and rebuild indexes)
func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {
func (ac *AggregatorRoTx) SqueezeCommitmentFiles(mergedAgg *AggregatorRoTx) error {
if !ac.a.commitmentValuesTransform {
return nil
}
@@ -1500,9 +1500,50 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {
storage := ac.d[kv.StorageDomain]

// oh, again accessing domain.files directly, again and again..
accountFiles := accounts.d.dirtyFiles.Items()
storageFiles := storage.d.dirtyFiles.Items()
commitFiles := commitment.d.dirtyFiles.Items()
mergedAccountFiles := mergedAgg.d[kv.AccountsDomain].d.dirtyFiles.Items()
mergedStorageFiles := mergedAgg.d[kv.StorageDomain].d.dirtyFiles.Items()
mergedCommitFiles := mergedAgg.d[kv.CommitmentDomain].d.dirtyFiles.Items()

for _, f := range accounts.files {
f.src.decompressor.EnableMadvNormal()
}
for _, f := range mergedAccountFiles {
f.decompressor.EnableMadvNormal()
}
for _, f := range storage.files {
f.src.decompressor.EnableMadvNormal()
}
for _, f := range mergedStorageFiles {
f.decompressor.EnableMadvNormal()
}
for _, f := range commitment.files {
f.src.decompressor.EnableMadvNormal()
}
for _, f := range mergedCommitFiles {
f.decompressor.EnableMadvNormal()
}
defer func() {
for _, f := range accounts.files {
f.src.decompressor.DisableReadAhead()
}
for _, f := range mergedAccountFiles {
f.decompressor.DisableReadAhead()
}
for _, f := range storage.files {
f.src.decompressor.DisableReadAhead()
}
for _, f := range mergedStorageFiles {
f.decompressor.DisableReadAhead()
}
for _, f := range commitment.files {
f.src.decompressor.DisableReadAhead()
}
for _, f := range mergedCommitFiles {
f.decompressor.DisableReadAhead()
}
}()

log.Info("[sqeeze_migration] see target files", "acc", len(mergedAccountFiles), "st", len(mergedStorageFiles), "com", len(mergedCommitFiles))

getSizeDelta := func(a, b string) (datasize.ByteSize, float32, error) {
ai, err := os.Stat(a)
@@ -1527,42 +1568,45 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

for ci := 0; ci < len(commitFiles); ci++ {
cf := commitFiles[ci]
for ai = 0; ai < len(accountFiles); ai++ {
if accountFiles[ai].startTxNum == cf.startTxNum && accountFiles[ai].endTxNum == cf.endTxNum {
for ci := 0; ci < len(mergedCommitFiles); ci++ {
cf := mergedCommitFiles[ci]
for ai = 0; ai < len(mergedAccountFiles); ai++ {
if mergedAccountFiles[ai].startTxNum == cf.startTxNum && mergedAccountFiles[ai].endTxNum == cf.endTxNum {
break
}
}
for si = 0; si < len(storageFiles); si++ {
if storageFiles[si].startTxNum == cf.startTxNum && storageFiles[si].endTxNum == cf.endTxNum {
for si = 0; si < len(mergedStorageFiles); si++ {
if mergedStorageFiles[si].startTxNum == cf.startTxNum && mergedStorageFiles[si].endTxNum == cf.endTxNum {
break
}
}
if ai == len(accountFiles) || si == len(storageFiles) {
ac.a.logger.Info("SqueezeCommitmentFiles: commitment file has no corresponding account or storage file", "commitment", cf.decompressor.FileName())
if ai == len(mergedAccountFiles) || si == len(mergedStorageFiles) {
ac.a.logger.Info("[sqeeze_migration] commitment file has no corresponding account or storage file", "commitment", cf.decompressor.FileName())
continue
}
af, sf := accountFiles[ai], storageFiles[si]

err := func() error {
ac.a.logger.Info("SqueezeCommitmentFiles: file start", "original", cf.decompressor.FileName(),
"progress", fmt.Sprintf("%d/%d", ci+1, len(accountFiles)))
af, sf := mergedAccountFiles[ai], mergedStorageFiles[si]

steps := cf.endTxNum/ac.a.aggregationStep - cf.startTxNum/ac.a.aggregationStep
compression := commitment.d.compression
if steps < DomainMinStepsToCompress {
compression = CompressNone
}
ac.a.logger.Info("[sqeeze_migration] file start", "original", cf.decompressor.FileName(),
"progress", fmt.Sprintf("%d/%d", ci+1, len(mergedAccountFiles)), "compress_cfg", commitment.d.compressCfg, "compress", compression)

originalPath := cf.decompressor.FilePath()
squeezedTmpPath := originalPath + sqExt + ".tmp"

squeezedCompr, err := seg.NewCompressor(context.Background(), "squeeze", squeezedTmpPath, ac.a.dirs.Tmp,
commitment.d.compressCfg, log.LvlTrace, commitment.d.logger)

commitment.d.compressCfg, log.LvlInfo, commitment.d.logger)
if err != nil {
return err
}
defer squeezedCompr.Close()

cf.decompressor.EnableReadAhead()
defer cf.decompressor.DisableReadAhead()
reader := NewArchiveGetter(cf.decompressor.MakeGetter(), commitment.d.compression)
reader := NewArchiveGetter(cf.decompressor.MakeGetter(), compression)
reader.Reset(0)

writer := NewArchiveWriter(squeezedCompr, commitment.d.compression)
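The pairing loop above walks the merged commitment files and, for each one, looks for the account and storage files covering exactly the same [startTxNum, endTxNum) range, skipping commitment files that have no counterpart. A simplified sketch of that matching step, with filesItem reduced to just the fields the comparison needs (the real type has many more):

```go
package main

import "fmt"

// filesItem is a simplified stand-in for the real dirty-file item.
type filesItem struct {
	name                 string
	startTxNum, endTxNum uint64
}

// findMatching returns the index of the candidate covering exactly the same
// tx range as target, or -1 if no such file exists.
func findMatching(target filesItem, candidates []filesItem) int {
	for i, c := range candidates {
		if c.startTxNum == target.startTxNum && c.endTxNum == target.endTxNum {
			return i
		}
	}
	return -1
}

func main() {
	commit := filesItem{name: "commitment.0-64.kv", startTxNum: 0, endTxNum: 64}
	accounts := []filesItem{
		{name: "accounts.0-32.kv", startTxNum: 0, endTxNum: 32},
		{name: "accounts.0-64.kv", startTxNum: 0, endTxNum: 64},
	}
	if i := findMatching(commit, accounts); i >= 0 {
		fmt.Println("matched", accounts[i].name)
	} else {
		fmt.Println("no matching account file; skip this commitment file")
	}
}
```

In SqueezeCommitmentFiles the same search is done inline with the index variables ai and si, and a miss for either domain logs a message and continues to the next commitment file.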
@@ -1573,9 +1617,10 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {
}

i := 0
var k, v []byte
for reader.HasNext() {
k, _ := reader.Next(nil)
v, _ := reader.Next(nil)
k, _ = reader.Next(k[:0])
v, _ = reader.Next(v[:0])
i += 2

if k == nil {
@@ -1598,7 +1643,7 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {

select {
case <-logEvery.C:
ac.a.logger.Info("SqueezeCommitmentFiles", "file", cf.decompressor.FileName(), "k", fmt.Sprintf("%x", k),
ac.a.logger.Info("[sqeeze_migration]", "file", cf.decompressor.FileName(), "k", fmt.Sprintf("%x", k),
"progress", fmt.Sprintf("%d/%d", i, cf.decompressor.Count()))
default:
}
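The rewritten read loop above declares k and v once and hands k[:0] and v[:0] back to Next, so the getter can reuse the existing backing arrays instead of allocating fresh slices for every key/value pair. Below is a hedged sketch of the pattern with a toy reader standing in for the real ArchiveGetter (whose Next also returns an offset that the loop discards).

```go
package main

import "fmt"

// toyReader is a stand-in for an ArchiveGetter: Next appends the next word
// to buf and returns the (possibly reallocated) slice.
type toyReader struct {
	words [][]byte
	i     int
}

func (r *toyReader) HasNext() bool { return r.i < len(r.words) }

func (r *toyReader) Next(buf []byte) []byte {
	w := r.words[r.i]
	r.i++
	return append(buf, w...) // reuses buf's backing array when capacity allows
}

func main() {
	r := &toyReader{words: [][]byte{[]byte("key1"), []byte("val1"), []byte("key2"), []byte("val2")}}

	// Declare once, reuse across iterations: Next(k[:0]) keeps k's capacity
	// but resets its length, avoiding a fresh allocation per record.
	var k, v []byte
	for r.HasNext() {
		k = r.Next(k[:0])
		v = r.Next(v[:0])
		fmt.Printf("%s => %s\n", k, v)
	}
}
```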
@@ -1621,7 +1666,7 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {
}
sizeDelta += delta

ac.a.logger.Info("SqueezeCommitmentFiles: file done", "original", filepath.Base(originalPath),
ac.a.logger.Info("[sqeeze_migration] file done", "original", filepath.Base(originalPath),
"sizeDelta", fmt.Sprintf("%s (%.1f%%)", delta.HR(), deltaP))

fromStep, toStep := af.startTxNum/ac.a.StepSize(), af.endTxNum/ac.a.StepSize()
@@ -1641,23 +1686,23 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {
}
}

ac.a.logger.Info("SqueezeCommitmentFiles: squeezed files has been produced, removing obsolete files",
"toRemove", len(obsoleteFiles), "processed", fmt.Sprintf("%d/%d", processedFiles, len(commitFiles)))
ac.a.logger.Info("[sqeeze_migration] squeezed files has been produced, removing obsolete files",
"toRemove", len(obsoleteFiles), "processed", fmt.Sprintf("%d/%d", processedFiles, len(mergedCommitFiles)))
for _, path := range obsoleteFiles {
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
ac.a.logger.Debug("SqueezeCommitmentFiles: obsolete file removal", "path", path)
ac.a.logger.Debug("[sqeeze_migration] obsolete file removal", "path", path)
}
ac.a.logger.Info("SqueezeCommitmentFiles: indices removed, renaming temporal files ")
ac.a.logger.Info("[sqeeze_migration] indices removed, renaming temporal files ")

for _, path := range temporalFiles {
if err := os.Rename(path, strings.TrimSuffix(path, sqExt)); err != nil {
return err
}
ac.a.logger.Debug("SqueezeCommitmentFiles: temporal file renaming", "path", path)
ac.a.logger.Debug("[sqeeze_migration] temporal file renaming", "path", path)
}
ac.a.logger.Info("SqueezeCommitmentFiles: done", "sizeDelta", sizeDelta.HR(), "files", len(accountFiles))
ac.a.logger.Info("[sqeeze_migration] done", "sizeDelta", sizeDelta.HR(), "files", len(mergedAccountFiles))

return nil
}
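Throughout the squeeze pass, each rewritten file is produced under a temporary suffix and only renamed over its final name after the pass succeeds, so an interrupted run leaves the original .kv files in place. A minimal sketch of that write-then-rename idiom is below; the file name and suffix are illustrative, not the real sqExt value, and the real code collects the temporary paths and batches the renames after removing obsolete files.

```go
package main

import (
	"os"
	"path/filepath"
)

// writeThenRename writes data to path+tmpSuffix first and renames it into
// place only after the write fully succeeds.
func writeThenRename(path string, data []byte, tmpSuffix string) error {
	tmp := path + tmpSuffix
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	// Rename is atomic on the same filesystem, so readers never observe a
	// half-written file at the final path.
	return os.Rename(tmp, path)
}

func main() {
	dir, err := os.MkdirTemp("", "squeeze-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	target := filepath.Join(dir, "v1-commitment.0-64.kv")
	if err := writeThenRename(target, []byte("squeezed payload"), ".sq.tmp"); err != nil {
		panic(err)
	}
}
```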
1 change: 1 addition & 0 deletions erigon-lib/state/aggregator_test.go
@@ -61,6 +61,7 @@ func TestAggregatorV3_Merge(t *testing.T) {
rwTx.Rollback()
}
}()

ac := agg.BeginFilesRo()
defer ac.Close()
domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New())
15 changes: 15 additions & 0 deletions erigon-lib/state/archive.go
@@ -47,6 +47,21 @@ func ParseFileCompression(s string) (FileCompression, error) {
}
}

func (c FileCompression) String() string {
switch c {
case CompressNone:
return "none"
case CompressKeys:
return "k"
case CompressVals:
return "v"
case CompressKeys | CompressVals:
return "kv"
default:
return ""
}
}

type getter struct {
*seg.Getter
nextValue bool // if nextValue true then getter.Next() expected to return value
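FileCompression is a bit set, so the new String method only reports "kv" when both the key and value bits are present. The sketch below shows how such a bitmask type behaves in isolation; the constant values are assumptions for the example, the real definitions live elsewhere in erigon-lib/state.

```go
package main

import "fmt"

type FileCompression uint8

// Illustrative values; the real constants are defined in erigon-lib/state.
const (
	CompressNone FileCompression = 0
	CompressKeys FileCompression = 1 << 0
	CompressVals FileCompression = 1 << 1
)

func (c FileCompression) String() string {
	switch c {
	case CompressNone:
		return "none"
	case CompressKeys:
		return "k"
	case CompressVals:
		return "v"
	case CompressKeys | CompressVals:
		return "kv"
	default:
		return ""
	}
}

func main() {
	c := CompressKeys | CompressVals
	fmt.Println(c.String())          // kv
	fmt.Println(c&CompressKeys != 0) // true: keys are compressed
	fmt.Println(CompressNone.String()) // none
}
```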
28 changes: 12 additions & 16 deletions erigon-lib/state/bps_tree.go
@@ -21,14 +21,14 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/erigontech/erigon-lib/common/dbg"
"io"
"time"
"unsafe"

"github.com/c2h5oh/datasize"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/dbg"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/recsplit/eliasfano32"
)
@@ -49,7 +49,7 @@ type indexSeekerIterator interface {
}

type dataLookupFunc func(di uint64, g ArchiveGetter) ([]byte, []byte, error)
type keyCmpFunc func(k []byte, di uint64, g ArchiveGetter) (int, []byte, error)
type keyCmpFunc func(k []byte, di uint64, g ArchiveGetter, copyBuf []byte) (int, []byte, error)

// M limits amount of child for tree node.
func NewBpsTree(kv ArchiveGetter, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc) *BpsTree {
@@ -66,12 +66,11 @@ func NewBpsTreeWithNodes(kv ArchiveGetter, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc, nodes []Node) *BpsTree {
func NewBpsTreeWithNodes(kv ArchiveGetter, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc, nodes []Node) *BpsTree {
bt := &BpsTree{M: M, offt: offt, dataLookupFunc: dataLookup, keyCmpFunc: keyCmp, mx: nodes}

t := time.Now()
nsz := uint64(unsafe.Sizeof(Node{}))
var cachedBytes uint64
for i := 0; i < len(nodes); i++ {
if envAssertBTKeys {
eq, r, err := keyCmp(nodes[i].key, nodes[i].di, kv)
eq, r, err := keyCmp(nodes[i].key, nodes[i].di, kv, nil)
if err != nil {
panic(err)
}
@@ -83,11 +82,6 @@ func NewBpsTreeWithNodes(kv ArchiveGetter, offt *eliasfano32.EliasFano, M uint64
nodes[i].off = offt.Get(nodes[i].di)
}

N := offt.Count()
log.Root().Debug("BtIndex opened", "file", kv.FileName(), "M", bt.M, "N", common.PrettyCounter(N),
"cached", fmt.Sprintf("%s %.2f%%", common.PrettyCounter(len(bt.mx)), 100*(float64(len(bt.mx))/float64(N))),
"cacheSize", datasize.ByteSize(cachedBytes).HR(), "fileSize", datasize.ByteSize(kv.Size()).HR(),
"took", time.Since(t))
return bt
}

@@ -219,7 +213,7 @@ func (n *Node) Decode(buf []byte) (uint64, error) {
return uint64(10 + l), nil
}

func (b *BpsTree) WarmUp(kv ArchiveGetter) error {
func (b *BpsTree) WarmUp(kv ArchiveGetter) (err error) {
t := time.Now()
N := b.offt.Count()
if N == 0 {
@@ -238,9 +232,10 @@ func (b *BpsTree) WarmUp(kv ArchiveGetter) error {
// extremely stupid picking of needed nodes:
cachedBytes := uint64(0)
nsz := uint64(unsafe.Sizeof(Node{}))
var key []byte
for i := step; i < N; i += step {
di := i - 1
_, key, err := b.keyCmpFunc(nil, di, kv)
_, key, err = b.keyCmpFunc(nil, di, kv, key[:0])
if err != nil {
return err
}
@@ -308,7 +303,7 @@ func (b *BpsTree) Seek(g ArchiveGetter, seekKey []byte) (key, value []byte, di u
var cmp int
for l < r {
if r-l <= DefaultBtreeStartSkip { // found small range, faster to scan now
cmp, key, err = b.keyCmpFunc(seekKey, l, g)
cmp, key, err = b.keyCmpFunc(seekKey, l, g, key[:0])
if err != nil {
return nil, nil, 0, false, err
}
@@ -330,7 +325,7 @@ func (b *BpsTree) Seek(g ArchiveGetter, seekKey []byte) (key, value []byte, di u
}

m = (l + r) >> 1
cmp, key, err = b.keyCmpFunc(seekKey, m, g)
cmp, key, err = b.keyCmpFunc(seekKey, m, g, key[:0])
if err != nil {
return nil, nil, 0, false, err
}
Expand Down Expand Up @@ -362,7 +357,7 @@ func (b *BpsTree) Seek(g ArchiveGetter, seekKey []byte) (key, value []byte, di u
// returns first key which is >= key.
// If key is nil, returns first key
// if key is greater than all keys, returns nil
func (b *BpsTree) Get(g ArchiveGetter, key []byte) ([]byte, bool, uint64, error) {
func (b *BpsTree) Get(g ArchiveGetter, key []byte) (k []byte, ok bool, i uint64, err error) {
if b.trace {
fmt.Printf("get %x\n", key)
}
@@ -379,10 +374,11 @@ func (b *BpsTree) Get(g ArchiveGetter, key []byte) ([]byte, bool, uint64, error)
defer func() { fmt.Printf("found %x [%d %d]\n", key, l, r) }()
}

var cmp int
var m uint64
for l < r {
m = (l + r) >> 1
cmp, k, err := b.keyCmpFunc(key, m, g)
cmp, k, err = b.keyCmpFunc(key, m, g, k[:0])
if err != nil {
return nil, false, 0, err
}
@@ -400,7 +396,7 @@ func (b *BpsTree) Get(g ArchiveGetter, key []byte) ([]byte, bool, uint64, error)
}
}

cmp, k, err := b.keyCmpFunc(key, l, g)
cmp, k, err = b.keyCmpFunc(key, l, g, k[:0])
if err != nil || cmp != 0 {
return nil, false, 0, err
}
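The keyCmpFunc signature gains a copyBuf parameter so WarmUp, Seek and Get can feed their previous key buffer back in and avoid an allocation per comparison. Below is a simplified, self-contained sketch of a lower-bound binary search driven by such a compare callback; it mimics the shape of keyCmpFunc but drops the ArchiveGetter and error handling.

```go
package main

import (
	"bytes"
	"fmt"
)

// keyAt mimics keyCmpFunc: it compares seekKey against the key stored at
// index di, reusing copyBuf for the returned key to avoid a fresh allocation.
func keyAt(keys [][]byte, seekKey []byte, di uint64, copyBuf []byte) (int, []byte) {
	key := append(copyBuf[:0], keys[di]...)
	return bytes.Compare(key, seekKey), key
}

// seek returns the index of the first key >= seekKey, or len(keys) if none.
func seek(keys [][]byte, seekKey []byte) uint64 {
	var key []byte
	l, r := uint64(0), uint64(len(keys))
	for l < r {
		m := (l + r) >> 1
		cmp, k := keyAt(keys, seekKey, m, key[:0])
		key = k
		if cmp < 0 {
			l = m + 1
		} else {
			r = m
		}
	}
	return l
}

func main() {
	keys := [][]byte{[]byte("aa"), []byte("ab"), []byte("ba"), []byte("bb")}
	fmt.Println(seek(keys, []byte("ab"))) // 1
	fmt.Println(seek(keys, []byte("b")))  // 2
	fmt.Println(seek(keys, []byte("zz"))) // 4
}
```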
(diffs for the remaining 11 changed files are not shown here)
