polygon/sync: initialise canonical chain builder correctly (#12246)
This PR captures several TODOs from my list of improvements related to Milestone hash mismatches that I noticed while testing Astrid on the tip:
- After restarts, when we initialise the canonical chain builder, we need to set its root to the last finalised waypoint header and connect the latest known tip to it. Previously, we set the root to the latest known tip, which may not be the finalised one. This caused unnecessary unwinds on startup (with a potentially incorrect unwind number passed to bridge.Unwind). Although we auto-recover from this, it is better to fix it properly, as it may cause unexpected behaviour further down the line (a minimal sketch of the intended initialisation follows this list).
- Harden the milestone verifier to check for correct header connectivity via ParentHash and Number, and also that the number of headers matches the number of blocks covered by the Milestone. The Milestone itself commits only to the last header hash, so a malicious peer could otherwise trick us easily (a sketch of these checks follows the polygon/sync/block_downloader.go diff below).
- Revert #11929, which, after a deeper look, appears to be an inaccurate interpretation of the error (pruning of TD happens only for blocks T-150_000). I believe that error was caused by something else, which may have been fixed by now (after numerous fixes to Astrid related to the parent TD errors); if not, it will be better to troubleshoot it more thoroughly if it ever arises again. In addition, that change is dangerous: it means we may falsely think we have all blocks of a given waypoint inserted in the DB, which may not be true after a restart in the middle of a waypoint, because the blocks inserted before the start may not be the ones that actually belong to the waypoint (since the tip was not yet finalised).
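
Below is a minimal sketch of the start-up initialisation described in the first bullet. The chainBuilder interface and the Reset/Connect method names, as well as initChainBuilderOnStartup, are illustrative assumptions for this sketch, not the exact API of the canonical chain builder in this repository:

package sync

import (
	"context"

	"github.com/erigontech/erigon/core/types"
)

// chainBuilder is a stand-in for the canonical chain builder; Reset and Connect
// are illustrative names for "set the root" and "attach headers on top of it".
type chainBuilder interface {
	Reset(root *types.Header)
	Connect(ctx context.Context, headers []*types.Header) error
}

// initChainBuilderOnStartup sets the root to the last finalised waypoint header
// (e.g. obtained via SynchronizeMilestones/SynchronizeCheckpoints) and then
// connects the headers leading up to the latest known tip, instead of using the
// possibly unfinalised tip itself as the root.
func initChainBuilderOnStartup(
	ctx context.Context,
	cb chainBuilder,
	lastFinalisedWaypointHeader *types.Header,
	headersFromRootToTip []*types.Header, // headers after the root, up to the latest known tip
) error {
	cb.Reset(lastFinalisedWaypointHeader)
	if len(headersFromRootToTip) == 0 {
		// tip == last finalised waypoint, nothing to connect
		return nil
	}
	return cb.Connect(ctx, headersFromRootToTip)
}

With the root anchored at the finalised waypoint, startup no longer needs to unwind past the finalised block when the previously known tip turns out not to be the finalised one.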
taratorio authored Oct 9, 2024
1 parent e6a0caa commit ef5422a
Showing 8 changed files with 206 additions and 111 deletions.
24 changes: 19 additions & 5 deletions polygon/heimdall/scraper.go
@@ -18,9 +18,11 @@ package heimdall
 
 import (
 	"context"
+	"errors"
 	"time"
 
-	"github.com/erigontech/erigon-lib/common/errors"
+	commonerrors "github.com/erigontech/erigon-lib/common/errors"
+	"github.com/erigontech/erigon-lib/common/generics"
 	"github.com/erigontech/erigon-lib/log/v3"
 
 	libcommon "github.com/erigontech/erigon-lib/common"
@@ -69,7 +71,7 @@ func (s *scraper[TEntity]) Run(ctx context.Context) error {
 
 		idRange, err := s.fetcher.FetchEntityIdRange(ctx)
 		if err != nil {
-			if errors.IsOneOf(err, s.transientErrors) {
+			if commonerrors.IsOneOf(err, s.transientErrors) {
 				s.logger.Warn(heimdallLogPrefix("scraper transient err occurred when fetching id range"), "err", err)
 				continue
 			}
@@ -90,7 +92,7 @@ func (s *scraper[TEntity]) Run(ctx context.Context) error {
 		} else {
 			entities, err := s.fetcher.FetchEntitiesRange(ctx, idRange)
 			if err != nil {
-				if errors.IsOneOf(err, s.transientErrors) {
+				if commonerrors.IsOneOf(err, s.transientErrors) {
 					// we do not break the scrapping loop when hitting a transient error
 					// we persist the partially fetched range entities before it occurred
 					// and continue scrapping again from there onwards
@@ -122,6 +124,18 @@ func (s *scraper[TEntity]) RegisterObserver(observer func([]TEntity)) polygoncom
 	return s.observers.Register(observer)
 }
 
-func (s *scraper[TEntity]) Synchronize(ctx context.Context) error {
-	return s.syncEvent.Wait(ctx)
+func (s *scraper[TEntity]) Synchronize(ctx context.Context) (TEntity, error) {
+	if err := s.syncEvent.Wait(ctx); err != nil {
+		return generics.Zero[TEntity](), err
+	}
+
+	last, ok, err := s.store.LastEntity(ctx)
+	if err != nil {
+		return generics.Zero[TEntity](), err
+	}
+	if !ok {
+		return generics.Zero[TEntity](), errors.New("unexpected last entity not available")
+	}
+
+	return last, nil
 }
10 changes: 5 additions & 5 deletions polygon/heimdall/service.go
@@ -46,8 +46,8 @@ type Service interface {
 	Producers(ctx context.Context, blockNum uint64) (*valset.ValidatorSet, error)
 	RegisterMilestoneObserver(callback func(*Milestone), opts ...ObserverOption) polygoncommon.UnregisterFunc
 	Run(ctx context.Context) error
-	SynchronizeCheckpoints(ctx context.Context) error
-	SynchronizeMilestones(ctx context.Context) error
+	SynchronizeCheckpoints(ctx context.Context) (latest *Checkpoint, err error)
+	SynchronizeMilestones(ctx context.Context) (latest *Milestone, err error)
 	SynchronizeSpans(ctx context.Context, blockNum uint64) error
 }

@@ -182,12 +182,12 @@ func (s *service) Span(ctx context.Context, id uint64) (*Span, bool, error) {
 	return s.reader.Span(ctx, id)
 }
 
-func (s *service) SynchronizeCheckpoints(ctx context.Context) error {
+func (s *service) SynchronizeCheckpoints(ctx context.Context) (*Checkpoint, error) {
 	s.logger.Debug(heimdallLogPrefix("synchronizing checkpoints..."))
 	return s.checkpointScraper.Synchronize(ctx)
 }
 
-func (s *service) SynchronizeMilestones(ctx context.Context) error {
+func (s *service) SynchronizeMilestones(ctx context.Context) (*Milestone, error) {
 	s.logger.Debug(heimdallLogPrefix("synchronizing milestones..."))
 	return s.milestoneScraper.Synchronize(ctx)
 }
@@ -219,7 +219,7 @@ func (s *service) SynchronizeSpans(ctx context.Context, blockNum uint64) error {
 }
 
 func (s *service) synchronizeSpans(ctx context.Context) error {
-	if err := s.spanScraper.Synchronize(ctx); err != nil {
+	if _, err := s.spanScraper.Synchronize(ctx); err != nil {
 		return err
 	}

6 changes: 4 additions & 2 deletions polygon/heimdall/service_test.go
@@ -179,10 +179,12 @@ func (suite *ServiceTestSuite) SetupSuite() {
 		return suite.service.Run(suite.ctx)
 	})
 
-	err = suite.service.SynchronizeMilestones(suite.ctx)
+	lastMilestone, err := suite.service.SynchronizeMilestones(suite.ctx)
 	require.NoError(suite.T(), err)
-	err = suite.service.SynchronizeCheckpoints(suite.ctx)
+	require.Equal(suite.T(), suite.expectedLastMilestone, uint64(lastMilestone.Id))
+	lastCheckpoint, err := suite.service.SynchronizeCheckpoints(suite.ctx)
 	require.NoError(suite.T(), err)
+	require.Equal(suite.T(), suite.expectedLastCheckpoint, uint64(lastCheckpoint.Id))
 	err = suite.service.SynchronizeSpans(suite.ctx, math.MaxInt)
 	require.NoError(suite.T(), err)
 }
14 changes: 5 additions & 9 deletions polygon/sync/block_downloader.go
@@ -118,7 +118,7 @@ func (d *blockDownloader) DownloadBlocksUsingCheckpoints(ctx context.Context, st
 		return nil, err
 	}
 
-	return d.downloadBlocksUsingWaypoints(ctx, checkpoints.Waypoints(), d.checkpointVerifier, start)
+	return d.downloadBlocksUsingWaypoints(ctx, checkpoints.Waypoints(), d.checkpointVerifier)
}

func (d *blockDownloader) DownloadBlocksUsingMilestones(ctx context.Context, start uint64) (*types.Header, error) {
@@ -147,14 +147,13 @@ func (d *blockDownloader) DownloadBlocksUsingMilestones(ctx context.Context, sta
 		milestones[0].Fields.StartBlock = new(big.Int).SetUint64(start)
 	}
 
-	return d.downloadBlocksUsingWaypoints(ctx, milestones.Waypoints(), d.milestoneVerifier, start)
+	return d.downloadBlocksUsingWaypoints(ctx, milestones.Waypoints(), d.milestoneVerifier)
 }
 
 func (d *blockDownloader) downloadBlocksUsingWaypoints(
 	ctx context.Context,
 	waypoints heimdall.Waypoints,
 	verifier WaypointHeadersVerifier,
-	startBlockNum uint64,
 ) (*types.Header, error) {
 	if len(waypoints) == 0 {
 		return nil, nil
@@ -289,12 +288,9 @@ func (d *blockDownloader) downloadBlocksUsingWaypoints(
 			break
 		}
 
-		batchStart := blockBatch[0].Number().Uint64()
-		batchEnd := blockBatch[len(blockBatch)-1].Number().Uint64()
-		if batchStart <= startBlockNum && startBlockNum <= batchEnd {
-			// we do not want to re-insert blocks of the first waypoint if the start block
-			// falls in the middle of the waypoint range
-			blockBatch = blockBatch[startBlockNum-batchStart:]
+		if blockBatch[0].Number().Uint64() == 0 {
+			// we do not want to insert block 0 (genesis)
+			blockBatch = blockBatch[1:]
 		}
 
 		blocks = append(blocks, blockBatch...)
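
The hardened milestone verifier lives in one of the eight changed files not expanded on this page; it plugs in as the WaypointHeadersVerifier passed to downloadBlocksUsingWaypoints above. Below is a minimal sketch of the checks described in the commit message, assuming the waypoint exposes StartBlock/EndBlock/RootHash accessors; the function and error names here are illustrative, not the repository's:

package sync

import (
	"errors"
	"fmt"

	"github.com/erigontech/erigon/core/types"
	"github.com/erigontech/erigon/polygon/heimdall"
)

var errBadWaypointHeaders = errors.New("waypoint headers verification failed")

// verifyMilestoneHeadersSketch checks header count, Number/ParentHash connectivity,
// and that the last header hash matches the milestone hash (a milestone commits
// only to the hash of its last block). Assumes heimdall.Waypoint exposes
// StartBlock/EndBlock/RootHash, as noted in the lead-in.
func verifyMilestoneHeadersSketch(waypoint heimdall.Waypoint, headers []*types.Header) error {
	wantLen := waypoint.EndBlock().Uint64() - waypoint.StartBlock().Uint64() + 1
	if uint64(len(headers)) != wantLen {
		return fmt.Errorf("%w: unexpected header count: have %d, want %d", errBadWaypointHeaders, len(headers), wantLen)
	}
	for i, header := range headers {
		wantNum := waypoint.StartBlock().Uint64() + uint64(i)
		if header.Number.Uint64() != wantNum {
			return fmt.Errorf("%w: unexpected header number: have %d, want %d", errBadWaypointHeaders, header.Number.Uint64(), wantNum)
		}
		if i > 0 && header.ParentHash != headers[i-1].Hash() {
			return fmt.Errorf("%w: parent hash mismatch at block %d", errBadWaypointHeaders, wantNum)
		}
	}
	if headers[len(headers)-1].Hash() != waypoint.RootHash() {
		return fmt.Errorf("%w: last header hash does not match the milestone hash", errBadWaypointHeaders)
	}
	return nil
}

Checking the whole range (count, numbers, parent links) rather than only the final hash prevents a peer from splicing unrelated headers in front of a milestone whose last hash is correct.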
38 changes: 2 additions & 36 deletions polygon/sync/block_downloader_test.go
@@ -29,8 +29,9 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/mock/gomock"
 
-	"github.com/erigontech/erigon-lib/common"
 	"github.com/erigontech/erigon-lib/log/v3"
+
+	"github.com/erigontech/erigon-lib/common"
 	"github.com/erigontech/erigon/core/types"
 	"github.com/erigontech/erigon/polygon/heimdall"
 	"github.com/erigontech/erigon/polygon/p2p"
@@ -368,41 +369,6 @@ func TestBlockDownloaderDownloadBlocksUsingCheckpoints(t *testing.T) {
 	require.Equal(t, blocks[len(blocks)-1].Header(), tip)
 }
 
-func TestBlockDownloaderDownloadBlocksUsingCheckpointsWhenStartIsInMiddleOfCheckpointRange(t *testing.T) {
-	test := newBlockDownloaderTest(t)
-	test.waypointReader.EXPECT().
-		CheckpointsFromBlock(gomock.Any(), gomock.Any()).
-		Return(test.fakeCheckpoints(2), nil).
-		Times(1)
-	test.p2pService.EXPECT().
-		ListPeersMayHaveBlockNum(gomock.Any()).
-		Return(test.fakePeers(2)).
-		Times(1)
-	test.p2pService.EXPECT().
-		FetchHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
-		DoAndReturn(test.defaultFetchHeadersMock()).
-		Times(2)
-	test.p2pService.EXPECT().
-		FetchBodies(gomock.Any(), gomock.Any(), gomock.Any()).
-		DoAndReturn(test.defaultFetchBodiesMock()).
-		Times(2)
-	var blocks []*types.Block
-	test.store.EXPECT().
-		InsertBlocks(gomock.Any(), gomock.Any()).
-		DoAndReturn(test.defaultInsertBlocksMock(&blocks)).
-		Times(1)
-
-	tip, err := test.blockDownloader.DownloadBlocksUsingCheckpoints(context.Background(), 513)
-	require.NoError(t, err)
-	require.Len(t, blocks, 1536) // [513,1024] = 512 blocks + 1024 blocks from 2nd checkpoint
-	// check blocks are written in order
-	require.Equal(t, uint64(513), blocks[0].Header().Number.Uint64())
-	require.Equal(t, uint64(1024), blocks[511].Header().Number.Uint64())
-	require.Equal(t, uint64(1025), blocks[512].Header().Number.Uint64())
-	require.Equal(t, uint64(2048), blocks[1535].Header().Number.Uint64())
-	require.Equal(t, blocks[len(blocks)-1].Header(), tip)
-}
-
 func TestBlockDownloaderDownloadBlocksWhenInvalidHeadersThenPenalizePeerAndReDownload(t *testing.T) {
 	var firstTimeInvalidReturned bool
 	firstTimeInvalidReturnedPtr := &firstTimeInvalidReturned