Skip to content

Commit

Permalink
% Checkpoint - Added the feature to write to File if writing to DB er…
Browse files Browse the repository at this point in the history
…rors out. NOT TESTED
  • Loading branch information
Abdul Rabbani committed Mar 25, 2022
1 parent f6ff20e commit f566aa7
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 42 deletions.
23 changes: 15 additions & 8 deletions statediff/indexer/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,26 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)

// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) {
switch config.Type() {
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
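// specificIndexer is meant to let the caller name the indexer type to create explicitly,
// with config.Type() used when it is empty.
// NOTE(review): the condition below appears inverted — a non-empty specificIndexer selects
// config.Type(), and an empty one leaves the type empty (hitting the default case). Confirm and fix.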
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config, specificIndexer shared.DBType) (interfaces.StateDiffIndexer, error) {
var indexerToCreate shared.DBType
if specificIndexer != "" {
indexerToCreate = config.Type()
} else {
indexerToCreate = specificIndexer
}
switch indexerToCreate {
case shared.FILE:
log.Info("Starting statediff service in SQL file writing mode")
log.Info("Creating a statediff indexer in SQL file writing mode")
fc, ok := config.(file.Config)
if !ok {
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
}
fc.NodeInfo = nodeInfo
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
case shared.POSTGRES:
log.Info("Starting statediff service in Postgres writing mode")
log.Info("Creating a statediff service in Postgres writing mode")
pgc, ok := config.(postgres.Config)
if !ok {
return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
Expand All @@ -62,17 +69,17 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
return nil, err
}
default:
return nil, fmt.Errorf("unrecongized Postgres driver type: %s", pgc.Driver)
return nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
}
return sql.NewStateDiffIndexer(ctx, chainConfig, postgres.NewPostgresDB(driver))
case shared.DUMP:
log.Info("Starting statediff service in data dump mode")
log.Info("Creating statediff indexer in data dump mode")
dumpc, ok := config.(dump.Config)
if !ok {
return nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{})
}
return dump.NewStateDiffIndexer(chainConfig, dumpc), nil
default:
return nil, fmt.Errorf("unrecognized database type: %s", config.Type())
return nil, fmt.Errorf("unrecognized database type: %s", indexerToCreate)
}
}
4 changes: 2 additions & 2 deletions statediff/indexer/database/dump/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,9 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close()
}

func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
// FindAndUpdateGaps is a no-op in the dump indexer: it ignores all of its
// arguments and always returns nil.
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, index interfaces.StateDiffIndexer) error {
return nil
}
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
// PushKnownGaps is a no-op in the dump indexer: it ignores all of its
// arguments and always returns nil.
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error {
return nil
}
16 changes: 14 additions & 2 deletions statediff/indexer/database/file/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,22 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close()
}

func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
// FindAndUpdateGaps is a no-op in the file indexer: it ignores all of its
// arguments and always returns nil.
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer interfaces.StateDiffIndexer) error {
return nil
}

func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
// PushKnownGaps hands a known-gaps record for the block range
// [startingBlockNumber, endingBlockNumber] to the file writer.
//
// It returns an error, and writes nothing, when startingBlockNumber is greater
// than endingBlockNumber. A range covering a single block (start == end) is
// valid and is written. The indexer argument is not used by the file indexer.
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer interfaces.StateDiffIndexer) error {
	log.Info("Writing Gaps to file")
	// Only an inverted range is invalid. The previous check (`!= -1`) also
	// rejected start == end, silently dropping gaps of exactly one block.
	if startingBlockNumber.Cmp(endingBlockNumber) > 0 {
		return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
	}
	knownGap := models.KnownGapsModel{
		StartingBlockNumber: startingBlockNumber.String(),
		EndingBlockNumber:   endingBlockNumber.String(),
		CheckedOut:          checkedOut,
		ProcessingKey:       processingKey,
	}

	sdi.fileWriter.upsertKnownGaps(knownGap)
	return nil
}
17 changes: 17 additions & 0 deletions statediff/indexer/database/file/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ const (

storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
"node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
// INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key)
//VALUES ($1, $2, $3, $4)
// ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
// WHERE eth.known_gaps.ending_block_number <= $2
knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
"VALUES ('%s', '%s', %t, %d) " +
"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
"WHERE eth.known_gaps.ending_block_number <= '%s';\n"
)

func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
Expand Down Expand Up @@ -253,3 +261,12 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
}

// upsertKnownGaps formats a known_gaps upsert statement from knownGaps using
// the knownGapsInsert template and sends it on the writer's stmts channel.
//
// The template takes seven arguments: the four inserted column values, then
// the ending block number and processing key again for the DO UPDATE clause,
// and the ending block number once more for the WHERE clause.
func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) {
	// The format string must be knownGapsInsert; previously the starting block
	// number was passed in its place, so no SQL statement was produced.
	sqw.stmts <- []byte(fmt.Sprintf(knownGapsInsert,
		knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
		knownGaps.EndingBlockNumber, knownGaps.ProcessingKey,
		knownGaps.EndingBlockNumber))
}
9 changes: 6 additions & 3 deletions statediff/indexer/database/sql/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (sdi *StateDiffIndexer) Close() error {
}

// Update the known gaps table with the gap information.
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error {
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
}
Expand All @@ -565,7 +565,10 @@ func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingB
CheckedOut: checkedOut,
ProcessingKey: processingKey,
}
log.Info("Writing known gaps to the DB")
if err := sdi.dbWriter.upsertKnownGaps(knownGap); err != nil {
log.Warn("Error writing knownGaps to DB, writing them to file instead")
fileIndexer.PushKnownGaps(startingBlockNumber, endingBlockNumber, checkedOut, processingKey, nil)
return err
}
return nil
Expand Down Expand Up @@ -617,7 +620,7 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer
// TODO:
// Remove the return value
// Write to file if err in writing to DB
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error {
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
if err != nil {
Expand All @@ -632,7 +635,7 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe
endBlock.Sub(latestBlockOnChain, expectedDifference)

log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey)
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer)
if err != nil {
// Write to file SQL file instead!!!
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
Expand Down
20 changes: 18 additions & 2 deletions statediff/indexer/database/sql/indexer_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@ package sql_test
import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
"os"
"testing"

"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
Expand All @@ -24,6 +27,7 @@ import (

var (
db sql.Database
sqlxdb *sqlx.DB
err error
ind interfaces.StateDiffIndexer
chainConf = params.MainnetChainConfig
Expand Down Expand Up @@ -162,6 +166,16 @@ func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
return stateDiff, err
}

// setupFile removes any file at file.TestConfig.FilePath, unless os.Stat
// reports that it does not exist, and then builds a file-backed indexer from
// the test configuration. The new indexer is stored in the package-level ind
// (with any construction error in the package-level err) and returned.
func setupFile(t *testing.T) interfaces.StateDiffIndexer {
	_, statErr := os.Stat(file.TestConfig.FilePath)
	if !errors.Is(statErr, os.ErrNotExist) {
		// Start each run from a clean output file.
		rmErr := os.Remove(file.TestConfig.FilePath)
		require.NoError(t, rmErr)
	}

	ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
	require.NoError(t, err)
	return ind
}

func tearDown(t *testing.T) {
sql.TearDownDB(t, db)
err := ind.Close()
Expand All @@ -180,6 +194,8 @@ func testKnownGapsUpsert(t *testing.T) {
t.Fatal(err)
}

fileInd := setupFile(t)

// Get the latest block from the DB
latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
if err != nil {
Expand All @@ -193,7 +209,7 @@ func testKnownGapsUpsert(t *testing.T) {
t.Log("The latest block on the chain is: ", latestBlockOnChain)
t.Log("The latest block on the DB is: ", latestBlockInDb)

gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0, fileInd)
require.NoError(t, gapUpsertErr)

// Calculate what the start and end block should be in known_gaps
Expand All @@ -206,7 +222,7 @@ func testKnownGapsUpsert(t *testing.T) {
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock)

_, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock)
require.NoError(t, queryErr)
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock)

}
4 changes: 3 additions & 1 deletion statediff/indexer/database/sql/postgres/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (db *DB) InsertIPLDsStm() string {

// InsertKnownGapsStm satisfies the sql.Statements interface.
//
// The returned statement inserts a row into eth.known_gaps. When a row with the
// same starting_block_number already exists, it updates that row's
// ending_block_number and processing_key, but only if the stored
// ending_block_number is less than or equal to the new one ($2).
func (db *DB) InsertKnownGapsStm() string {
	// Only the upsert form is returned; the earlier "DO NOTHING" statement that
	// preceded it made this statement unreachable.
	return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
WHERE eth.known_gaps.ending_block_number <= $2`
}
6 changes: 4 additions & 2 deletions statediff/indexer/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ type StateDiffIndexer interface {
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer StateDiffIndexer) error
// The trailing indexer parameter allows one indexer to be passed to another.
// We use this for the SQL indexer: we pass it the file indexer so it can write to file if writing to the DB fails.
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer StateDiffIndexer) error
io.Closer
}

Expand Down
38 changes: 29 additions & 9 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
ind "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
types2 "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -125,7 +126,7 @@ type Service struct {
// Should the statediff service wait for geth to sync to head?
WaitForSync bool
// Used to signal if we should check for KnownGaps
KnownGaps KnownGaps
KnownGaps KnownGapsState
// Whether or not we have any subscribers; only if we do, do we processes state diffs
subscribers int32
// Interface for publishing statediffs as PG-IPLD objects
Expand All @@ -138,7 +139,8 @@ type Service struct {
maxRetry uint
}

type KnownGaps struct {
// This structure keeps track of the knownGaps at any given moment in time
type KnownGapsState struct {
// Should we check for gaps by looking at the DB and comparing the latest block with head
checkForGaps bool
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
Expand All @@ -157,6 +159,9 @@ type KnownGaps struct {
// The last processed block keeps track of the last processed block.
// Its used to make sure we didn't skip over any block!
lastProcessedBlock *big.Int
// This fileIndexer is used to write the knownGaps to file
// If we can't properly write to DB
fileIndexer interfaces.StateDiffIndexer
}

// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
Expand Down Expand Up @@ -192,7 +197,7 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks
startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
// 121 to 124
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey)
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
}

if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
Expand All @@ -204,12 +209,12 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks
endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
// 111 to 114
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey)
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
}

log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey)
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)

} else {
log.Warn("We missed blocks without any errors.")
Expand All @@ -219,7 +224,7 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks
endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey)
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
}
}

Expand All @@ -242,6 +247,7 @@ func NewBlockCache(max uint) BlockCache {
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
blockChain := ethServ.BlockChain()
var indexer interfaces.StateDiffIndexer
var fileIndexer interfaces.StateDiffIndexer
quitCh := make(chan bool)
if params.IndexerConfig != nil {
info := nodeinfo.Info{
Expand All @@ -252,23 +258,37 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
ClientName: params.ClientName,
}
var err error
indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
if err != nil {
return err
}
if params.IndexerConfig.Type() != shared.FILE {
fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
log.Info("Starting the statediff service in ", "mode", params.IndexerConfig.Type())
if err != nil {
return err
}

} else {
log.Info("Starting the statediff service in ", "mode", "File")
fileIndexer = indexer
}
//fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info)
indexer.ReportDBMetrics(10*time.Second, quitCh)
}

workers := params.NumWorkers
if workers == 0 {
workers = 1
}
// If we ever have multiple processingKeys we can update them here
// along with the expectedDifference
knownGaps := &KnownGaps{
knownGaps := &KnownGapsState{
checkForGaps: true,
processingKey: 0,
expectedDifference: big.NewInt(1),
errorState: false,
fileIndexer: fileIndexer,
}
sds := &Service{
Mutex: sync.Mutex{},
Expand Down Expand Up @@ -409,7 +429,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
// Check and update the gaps table.
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
log.Info("Checking for Gaps at current block: ", currentBlock.Number())
go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
sds.KnownGaps.checkForGaps = false
}

Expand Down
Loading

0 comments on commit f566aa7

Please sign in to comment.