Skip to content

Commit

Permalink
blockfiles: split dirty files and visible files (#11557)
Browse files Browse the repository at this point in the history
This PR for  #11417 includes:
1. splitting segments into dirtySegments and visibleSegments
2. dirtySegments are updated in background and not accessible for APP.
3. dirtySegments will be added to visibleSegments when:
    - there's no gap/overlap/garbage 
    - all types of segments are created and indexed  at that height
4. add unit test:  `TestCalculateVisibleSegments`

---------

Co-authored-by: lupin012 <[email protected]>
Co-authored-by: Alex Sharov <[email protected]>
Co-authored-by: Ilya Mikheev <[email protected]>
Co-authored-by: JkLondon <[email protected]>
Co-authored-by: shashiy <[email protected]>
Co-authored-by: Elias Rad <[email protected]>
Co-authored-by: awskii <[email protected]>
Co-authored-by: blxdyx <[email protected]>
Co-authored-by: Giulio rebuffo <[email protected]>
Co-authored-by: Shota <[email protected]>
Co-authored-by: shota.silagadze <[email protected]>
Co-authored-by: Dmytro Vovk <[email protected]>
Co-authored-by: Massa <[email protected]>
  • Loading branch information
14 people authored Sep 11, 2024
1 parent 66dfabe commit 253f737
Show file tree
Hide file tree
Showing 18 changed files with 1,097 additions and 706 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,17 @@ func TestSimulatorEvents(t *testing.T) {
t.Skip("fix me on win")
}

// the number of events included in v1-000000-000500-borevents.seg
eventsCount := 23

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sim := setup(t, ctx, []uint64{1_000_000})

res, err := sim.FetchStateSyncEvents(ctx, 0, time.Now(), 100)
assert.NoError(t, err)
assert.Equal(t, 100, len(res))
assert.Equal(t, eventsCount, len(res))

resLimit, err := sim.FetchStateSyncEvents(ctx, 0, time.Now(), 2)
assert.NoError(t, err)
Expand All @@ -117,7 +120,7 @@ func TestSimulatorEvents(t *testing.T) {
lastTime := res[len(res)-1].Time
resTime, err := sim.FetchStateSyncEvents(ctx, 0, lastTime.Add(-1*time.Second), 100)
assert.NoError(t, err)
assert.Equal(t, 99, len(resTime))
assert.Equal(t, eventsCount-1, len(resTime))
assert.Equal(t, res[:len(res)-1], resTime)
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,8 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
openSnapshotOnce.Do(func() {
dirs := datadir.New(datadirCli)

snapCfg := ethconfig.NewSnapCfg(true, true, true)
chainConfig := fromdb.ChainConfig(db)
snapCfg := ethconfig.NewSnapCfg(true, true, true, chainConfig.ChainName)

_allSnapshotsSingleton = freezeblocks.NewRoSnapshots(snapCfg, dirs.Snap, 0, logger)
_allBorSnapshotsSingleton = freezeblocks.NewBorRoSnapshots(snapCfg, dirs.Snap, 0, logger)
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/direct/sentry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"

sentryproto "github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
"github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
types "github.com/erigontech/erigon-lib/gointerfaces/typesproto"
libsentry "github.com/erigontech/erigon-lib/p2p/sentry"
)
Expand Down
3 changes: 3 additions & 0 deletions erigon-lib/downloader/snaptype/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ func (f FileInfo) Name() string { return f.name }
func (f FileInfo) Dir() string { return filepath.Dir(f.Path) }
func (f FileInfo) Len() uint64 { return f.To - f.From }

func (f FileInfo) GetRange() (from, to uint64) { return f.From, f.To }
func (f FileInfo) GetType() Type { return f.Type }

func (f FileInfo) CompareTo(o FileInfo) int {
if res := cmp.Compare(f.From, o.From); res != 0 {
return res
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion erigon-lib/p2p/sentry/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/erigontech/erigon-lib/gointerfaces"
"github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
"github.com/erigontech/erigon-lib/gointerfaces/typesproto"
sentry "github.com/erigontech/erigon-lib/p2p/sentry"
"github.com/erigontech/erigon-lib/p2p/sentry"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
Expand Down
2 changes: 0 additions & 2 deletions erigon-lib/state/appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ func (ap *Appendable) BuildMissedAccessors(ctx context.Context, g *errgroup.Grou
}

func (ap *Appendable) openDirtyFiles() error {
fmt.Printf("[dbg] dirtyFiles.Len() %d\n", ap.dirtyFiles.Len())

var invalidFileItems []*filesItem
invalidFileItemsLock := sync.Mutex{}
ap.dirtyFiles.Walk(func(items []*filesItem) bool {
Expand Down
5 changes: 3 additions & 2 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ type BlocksFreezing struct {
NoDownloader bool // possible to use snapshots without calling Downloader
Verify bool // verify snapshots on startup
DownloaderAddr string
ChainName string
}

func (s BlocksFreezing) String() string {
Expand All @@ -161,8 +162,8 @@ var (
FlagSnapStateStop = "snap.state.stop"
)

func NewSnapCfg(keepBlocks, produceE2, produceE3 bool) BlocksFreezing {
return BlocksFreezing{KeepBlocks: keepBlocks, ProduceE2: produceE2, ProduceE3: produceE3}
func NewSnapCfg(keepBlocks, produceE2, produceE3 bool, chainName string) BlocksFreezing {
return BlocksFreezing{KeepBlocks: keepBlocks, ProduceE2: produceE2, ProduceE3: produceE3, ChainName: chainName}
}

// Config contains configuration options for ETH protocol.
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func pruneBlockSnapshots(ctx context.Context, cfg SnapshotsCfg, logger log.Logge
return false, err
}
defer tx.Rollback()
// Prune snapshots if necessary (remove .segs or idx files appropriatelly)
// Prune snapshots if necessary (remove .segs or idx files appropriately)
headNumber := cfg.blockReader.FrozenBlocks()
executionProgress, err := stages.GetStageProgress(tx, stages.Execution)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion p2p/sentry/simulator/sentry_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *server) getHeaderByHash(ctx context.Context, hash common.Hash) (*corety
return s.blockReader.HeaderByHash(ctx, nil, hash)
}

func (s *server) downloadHeaders(ctx context.Context, header *freezeblocks.Segment) error {
func (s *server) downloadHeaders(ctx context.Context, header *freezeblocks.VisibleSegment) error {
fileName := snaptype.SegmentFileName(0, header.From(), header.To(), coresnaptype.Enums.Headers)
session := sync.NewTorrentSession(s.downloader, s.chain)

Expand Down
14 changes: 8 additions & 6 deletions turbo/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ func doIntegrity(cliCtx *cli.Context) error {
chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer chainDB.Close()

cfg := ethconfig.NewSnapCfg(false, true, true)
chainConfig := fromdb.ChainConfig(chainDB)
cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName)
from := cliCtx.Uint64(SnapshotFromFlag.Name)

_, _, _, blockRetire, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger)
Expand Down Expand Up @@ -966,8 +967,8 @@ func doIndicesCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
return err
}

cfg := ethconfig.NewSnapCfg(false, true, true)
chainConfig := fromdb.ChainConfig(chainDB)
cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName)
from := cliCtx.Uint64(SnapshotFromFlag.Name)

_, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger)
Expand Down Expand Up @@ -999,7 +1000,9 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error {

chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer chainDB.Close()
cfg := ethconfig.NewSnapCfg(false, true, true)

chainConfig := fromdb.ChainConfig(chainDB)
cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName)
from := cliCtx.Uint64(SnapshotFromFlag.Name)
blockSnaps, borSnaps, caplinSnaps, _, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger)
if err != nil {
Expand Down Expand Up @@ -1214,8 +1217,8 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer db.Close()

cfg := ethconfig.NewSnapCfg(false, true, true)

chainConfig := fromdb.ChainConfig(db)
cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName)
blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, db, logger)
if err != nil {
return err
Expand All @@ -1227,7 +1230,6 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
agg.SetMergeWorkers(estimate.AlmostAllCPUs())
agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())

chainConfig := fromdb.ChainConfig(db)
if err := br.BuildMissedIndicesIfNeed(ctx, "retire", nil, chainConfig); err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions turbo/snapshotsync/freezeblocks/beacon_block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *beaconSnapshotReader) ReadBlockBySlot(ctx context.Context, tx kv.Tx, sl
return nil, nil
}

idxSlot := seg.Index()
idxSlot := seg.src.Index()

if idxSlot == nil {
return nil, nil
Expand All @@ -109,7 +109,7 @@ func (r *beaconSnapshotReader) ReadBlockBySlot(ctx context.Context, tx kv.Tx, sl
}
blockOffset := idxSlot.OrdinalLookup(slot - idxSlot.BaseDataID())

gg := seg.MakeGetter()
gg := seg.src.MakeGetter()
gg.Reset(blockOffset)
if !gg.HasNext() {
return nil, nil
Expand Down Expand Up @@ -159,7 +159,7 @@ func (r *beaconSnapshotReader) ReadBlindedBlockBySlot(ctx context.Context, tx kv
return nil, nil
}

idxSlot := seg.Index()
idxSlot := seg.src.Index()

if idxSlot == nil {
return nil, nil
Expand All @@ -169,7 +169,7 @@ func (r *beaconSnapshotReader) ReadBlindedBlockBySlot(ctx context.Context, tx kv
}
blockOffset := idxSlot.OrdinalLookup(slot - idxSlot.BaseDataID())

gg := seg.MakeGetter()
gg := seg.src.MakeGetter()
gg.Reset(blockOffset)
if !gg.HasNext() {
return nil, nil
Expand Down Expand Up @@ -240,7 +240,7 @@ func (r *beaconSnapshotReader) ReadBlockByRoot(ctx context.Context, tx kv.Tx, ro
return nil, nil
}

idxSlot := seg.Index()
idxSlot := seg.src.Index()

if idxSlot == nil {
return nil, nil
Expand All @@ -250,7 +250,7 @@ func (r *beaconSnapshotReader) ReadBlockByRoot(ctx context.Context, tx kv.Tx, ro
}
blockOffset := idxSlot.OrdinalLookup(*slot - idxSlot.BaseDataID())

gg := seg.MakeGetter()
gg := seg.src.MakeGetter()
gg.Reset(blockOffset)
if !gg.HasNext() {
return nil, nil
Expand Down
Loading

0 comments on commit 253f737

Please sign in to comment.