From f566aa780cfff7db33a2993a6c0e0167c33b72ef Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 25 Mar 2022 16:50:43 -0400 Subject: [PATCH] % Checkpoint - Added the feature to write to File if writing to DB errors out. NOT TESTED --- statediff/indexer/constructor.go | 23 +++++--- statediff/indexer/database/dump/indexer.go | 4 +- statediff/indexer/database/file/indexer.go | 16 +++++- statediff/indexer/database/file/writer.go | 17 ++++++ statediff/indexer/database/sql/indexer.go | 9 ++-- .../database/sql/indexer_shared_test.go | 20 ++++++- .../indexer/database/sql/postgres/database.go | 4 +- statediff/indexer/interfaces/interfaces.go | 6 ++- statediff/service.go | 38 +++++++++---- statediff/service_public_test.go | 53 ++++++++++++++----- statediff/statediffing_test_file.sql | 0 11 files changed, 148 insertions(+), 42 deletions(-) create mode 100644 statediff/statediffing_test_file.sql diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index 9a66dba8966d..3bcf0859b01a 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -31,11 +31,18 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) -// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface -func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) { - switch config.Type() { +// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface. +// You can specify the specific +func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config, specificIndexer shared.DBType) (interfaces.StateDiffIndexer, error) { + var indexerToCreate shared.DBType + if specificIndexer != "" { + indexerToCreate = config.Type() + } else { + indexerToCreate = specificIndexer + } + switch indexerToCreate { case shared.FILE: - log.Info("Starting statediff service in SQL file writing mode") + log.Info("Creating a statediff indexer in SQL file writing mode") fc, ok := config.(file.Config) if !ok { return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) @@ -43,7 +50,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n fc.NodeInfo = nodeInfo return file.NewStateDiffIndexer(ctx, chainConfig, fc) case shared.POSTGRES: - log.Info("Starting statediff service in Postgres writing mode") + log.Info("Creating a statediff service in Postgres writing mode") pgc, ok := config.(postgres.Config) if !ok { return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{}) @@ -62,17 +69,17 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n return nil, err } default: - return nil, fmt.Errorf("unrecongized Postgres driver type: %s", pgc.Driver) + return nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver) } return sql.NewStateDiffIndexer(ctx, chainConfig, postgres.NewPostgresDB(driver)) case shared.DUMP: - log.Info("Starting statediff service in data dump mode") + log.Info("Creating statediff indexer in data dump mode") dumpc, ok := config.(dump.Config) if !ok { return nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{}) } return dump.NewStateDiffIndexer(chainConfig, dumpc), nil default: - return nil, fmt.Errorf("unrecognized database type: %s", config.Type()) + return nil, fmt.Errorf("unrecognized database type: %s", indexerToCreate) } } diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 6689621a898a..f391b34e204e 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -497,9 +497,9 @@ func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() } -func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { +func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, index interfaces.StateDiffIndexer) error { return nil } -func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { +func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error { return nil } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 0bcd864d11ea..d58291825ea0 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -479,10 +479,22 @@ func (sdi *StateDiffIndexer) Close() error { return sdi.fileWriter.Close() } -func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { +func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer interfaces.StateDiffIndexer) error { return nil } -func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { +func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer interfaces.StateDiffIndexer) error { + log.Info("Writing Gaps to file") + if startingBlockNumber.Cmp(endingBlockNumber) != -1 { + return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber) + } + knownGap := models.KnownGapsModel{ + StartingBlockNumber: startingBlockNumber.String(), + EndingBlockNumber: endingBlockNumber.String(), + CheckedOut: checkedOut, + ProcessingKey: processingKey, + } + + sdi.fileWriter.upsertKnownGaps(knownGap) return nil } diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index 48de0853dce4..a8795946f2b6 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -155,6 +155,14 @@ const ( storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " + "node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n" + // INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) + //VALUES ($1, $2, $3, $4) + // ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4) + // WHERE eth.known_gaps.ending_block_number <= $2 + knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " + + "VALUES ('%s', '%s', %t, %d) " + + "ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " + + "WHERE eth.known_gaps.ending_block_number <= '%s';\n" ) func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { @@ -253,3 +261,12 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)) } + +func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) { + sqw.stmts <- []byte(fmt.Sprintf(knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey, + knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber)) + //knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " + + // "VALUES ('%s', '%s', %t, %d) " + + // "ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " + + // "WHERE eth.known_gaps.ending_block_number <= '%s';\n" +} diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index c8a0e64c75ac..0ec9d75aa607 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -555,7 +555,7 @@ func (sdi *StateDiffIndexer) Close() error { } // Update the known gaps table with the gap information. -func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { +func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error { if startingBlockNumber.Cmp(endingBlockNumber) != -1 { return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber) } @@ -565,7 +565,10 @@ func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingB CheckedOut: checkedOut, ProcessingKey: processingKey, } + log.Info("Writing known gaps to the DB") if err := sdi.dbWriter.upsertKnownGaps(knownGap); err != nil { + log.Warn("Error writing knownGaps to DB, writing them to file instead") + fileIndexer.PushKnownGaps(startingBlockNumber, endingBlockNumber, checkedOut, processingKey, nil) return err } return nil @@ -617,7 +620,7 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer // TODO: // REmove the return value // Write to file if err in writing to DB -func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { +func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error { dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids" latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString) if err != nil { @@ -632,7 +635,7 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe endBlock.Sub(latestBlockOnChain, expectedDifference) log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock)) - err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey) + err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer) if err != nil { // Write to file SQL file instead!!! // If write to SQL file fails, write to disk. Handle this within the write to SQL file function! diff --git a/statediff/indexer/database/sql/indexer_shared_test.go b/statediff/indexer/database/sql/indexer_shared_test.go index 3af3adff9c98..e7c7e2f9f102 100644 --- a/statediff/indexer/database/sql/indexer_shared_test.go +++ b/statediff/indexer/database/sql/indexer_shared_test.go @@ -3,18 +3,21 @@ package sql_test import ( "bytes" "context" + "errors" "fmt" "math/big" "os" "testing" "github.com/ipfs/go-cid" + "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/database/file" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" @@ -24,6 +27,7 @@ import ( var ( db sql.Database + sqlxdb *sqlx.DB err error ind interfaces.StateDiffIndexer chainConf = params.MainnetChainConfig @@ -162,6 +166,16 @@ func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) { return stateDiff, err } +func setupFile(t *testing.T) interfaces.StateDiffIndexer { + if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { + err := os.Remove(file.TestConfig.FilePath) + require.NoError(t, err) + } + ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig) + require.NoError(t, err) + return ind +} + func tearDown(t *testing.T) { sql.TearDownDB(t, db) err := ind.Close() @@ -180,6 +194,8 @@ func testKnownGapsUpsert(t *testing.T) { t.Fatal(err) } + fileInd := setupFile(t) + // Get the latest block from the DB latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids") if err != nil { @@ -193,7 +209,7 @@ func testKnownGapsUpsert(t *testing.T) { t.Log("The latest block on the chain is: ", latestBlockOnChain) t.Log("The latest block on the DB is: ", latestBlockInDb) - gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0) + gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0, fileInd) require.NoError(t, gapUpsertErr) // Calculate what the start and end block should be in known_gaps @@ -206,7 +222,7 @@ func testKnownGapsUpsert(t *testing.T) { queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock) _, queryErr := stateDiff.QueryDb(queryString) // Figure out the string. - t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock) require.NoError(t, queryErr) + t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock) } diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 33bfa97576e2..1564ad8b51ea 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -103,6 +103,8 @@ func (db *DB) InsertIPLDsStm() string { // InsertKnownGapsStm satisfies the sql.Statements interface func (db *DB) InsertKnownGapsStm() string { - return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4) ON CONFLICT (starting_block_number) DO NOTHING` + return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4) + ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4) + WHERE eth.known_gaps.ending_block_number <= $2` //return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES (1, 2, true, 1)` } diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 36d3eb32bf54..09778be5052d 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -32,8 +32,10 @@ type StateDiffIndexer interface { PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) - FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error - PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error + FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer StateDiffIndexer) error + // The indexer at the end allows us to pass one indexer to another. + // We use then for the SQL indexer, we pass it the file indexer so it can write to file if writing to the DB fails. + PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer StateDiffIndexer) error io.Closer } diff --git a/statediff/service.go b/statediff/service.go index 7f3c6dda4e76..df6f2066b18c 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -44,6 +44,7 @@ import ( ind "github.com/ethereum/go-ethereum/statediff/indexer" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" types2 "github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/trie" ) @@ -125,7 +126,7 @@ type Service struct { // Should the statediff service wait for geth to sync to head? WaitForSync bool // Used to signal if we should check for KnownGaps - KnownGaps KnownGaps + KnownGaps KnownGapsState // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects @@ -138,7 +139,8 @@ type Service struct { maxRetry uint } -type KnownGaps struct { +// This structure keeps track of the knownGaps at any given moment in time +type KnownGapsState struct { // Should we check for gaps by looking at the DB and comparing the latest block with head checkForGaps bool // Arbitrary processingKey that can be used down the line to differentiate different geth nodes. @@ -157,6 +159,9 @@ type KnownGaps struct { // The last processed block keeps track of the last processed block. // Its used to make sure we didn't skip over any block! lastProcessedBlock *big.Int + // This fileIndexer is used to write the knownGaps to file + // If we can't properly write to DB + fileIndexer interfaces.StateDiffIndexer } // This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks. @@ -192,7 +197,7 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference) // 121 to 124 log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock)) - sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey) + sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) } if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 { @@ -204,12 +209,12 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference) // 111 to 114 log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock)) - sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey) + sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) } log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks)) log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey)) - sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey) + sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) } else { log.Warn("We missed blocks without any errors.") @@ -219,7 +224,7 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference) log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock)) log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock)) - sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey) + sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) } } @@ -242,6 +247,7 @@ func NewBlockCache(max uint) BlockCache { func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error { blockChain := ethServ.BlockChain() var indexer interfaces.StateDiffIndexer + var fileIndexer interfaces.StateDiffIndexer quitCh := make(chan bool) if params.IndexerConfig != nil { info := nodeinfo.Info{ @@ -252,23 +258,37 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params ClientName: params.ClientName, } var err error - indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig) + indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "") if err != nil { return err } + if params.IndexerConfig.Type() != shared.FILE { + fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "") + log.Info("Starting the statediff service in ", "mode", params.IndexerConfig.Type()) + if err != nil { + return err + } + + } else { + log.Info("Starting the statediff service in ", "mode", "File") + fileIndexer = indexer + } + //fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info) indexer.ReportDBMetrics(10*time.Second, quitCh) } + workers := params.NumWorkers if workers == 0 { workers = 1 } // If we ever have multiple processingKeys we can update them here // along with the expectedDifference - knownGaps := &KnownGaps{ + knownGaps := &KnownGapsState{ checkForGaps: true, processingKey: 0, expectedDifference: big.NewInt(1), errorState: false, + fileIndexer: fileIndexer, } sds := &Service{ Mutex: sync.Mutex{}, @@ -409,7 +429,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { // Check and update the gaps table. if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState { log.Info("Checking for Gaps at current block: ", currentBlock.Number()) - go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey) + go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) sds.KnownGaps.checkForGaps = false } diff --git a/statediff/service_public_test.go b/statediff/service_public_test.go index 05e7cb52eff6..c4d8ba5b763f 100644 --- a/statediff/service_public_test.go +++ b/statediff/service_public_test.go @@ -2,14 +2,18 @@ package statediff import ( "context" + "errors" "fmt" "math/big" + "os" "testing" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/statediff/indexer/database/file" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/statediff/indexer/mocks" "github.com/stretchr/testify/require" ) @@ -20,6 +24,9 @@ var ( chainConf = params.MainnetChainConfig ) +// Add clean db +// Test for failures when they are expected, when we go from smaller block to larger block +// We should no longer see the smaller block in DB func TestKnownGaps(t *testing.T) { type gapValues struct { lastProcessedBlock int64 @@ -27,40 +34,48 @@ func TestKnownGaps(t *testing.T) { knownErrorBlocksStart int64 knownErrorBlocksEnd int64 expectedDif int64 + processingKey int64 } tests := []gapValues{ // Unprocessed gaps before and after knownErrorBlock - {lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1}, + {lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1, processingKey: 1}, // No knownErrorBlocks - {lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1}, + {lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1, processingKey: 1}, // No gaps before or after knownErrorBlocks - {lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1}, + {lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1, processingKey: 1}, // gaps before knownErrorBlocks but not after - {lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1}, + {lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1, processingKey: 1}, // gaps after knownErrorBlocks but not before - {lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1}, + {lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1, processingKey: 1}, /// Same tests as above with a new expected DIF // Unprocessed gaps before and after knownErrorBlock - {lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2}, + {lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2, processingKey: 2}, // No knownErrorBlocks - {lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2}, + {lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2, processingKey: 2}, // No gaps before or after knownErrorBlocks - {lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2}, + {lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2, processingKey: 2}, // gaps before knownErrorBlocks but not after - {lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2}, + {lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2, processingKey: 2}, // gaps after knownErrorBlocks but not before - {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2}, + {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2, processingKey: 2}, + // Test update when block number is larger!! + {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 2}, + {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 10}, } for _, tc := range tests { // Reuse processing key from expecteDiff - testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif) + testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, false) + } + for _, tc := range tests { + // Reuse processing key from expecteDiff + testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, true) } } // It also makes sure we properly calculate any missed gaps not in the known gaps lists // either before or after the list. -func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64) { +func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64, skipDb bool) { lastProcessedBlock := big.NewInt(lastBlockProcessed) currentBlock := big.NewInt(currentBlockNum) @@ -74,9 +89,11 @@ func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBloc } // Comment out values which should not be used - knownGaps := KnownGaps{ + fileInd := setupFile(t) + knownGaps := KnownGapsState{ processingKey: processingKey, expectedDifference: big.NewInt(expectedDif), + fileIndexer: fileInd, } stateDiff, err := setupDb(t) if err != nil { @@ -134,6 +151,16 @@ func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) { return stateDiff, err } +func setupFile(t *testing.T) interfaces.StateDiffIndexer { + if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { + err := os.Remove(file.TestConfig.FilePath) + require.NoError(t, err) + } + indexer, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig) + require.NoError(t, err) + return indexer +} + // Teardown the DB func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) { t.Log("Starting tearDown") diff --git a/statediff/statediffing_test_file.sql b/statediff/statediffing_test_file.sql new file mode 100644 index 000000000000..e69de29bb2d1