Skip to content

Commit

Permalink
Add Prometheus metrics and write SQL file if it has data.
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdul Rabbani committed Mar 30, 2022
1 parent b960661 commit fc6a937
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 43 deletions.
3 changes: 2 additions & 1 deletion related-repositories/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ The folder will allow developers to clone/move related repositories to this dire
It is recommended that you move the following repositories under this folder. Keep the repository names!

- `vulcanize/foundry-tests`
- `vulcanize/hive`
- `vulcanize/ipld-eth-db`

## Symlinks

You can also create symlinks in this folder with the location of your repositories.
75 changes: 55 additions & 20 deletions statediff/known_gaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package statediff
import (
"context"
"fmt"
"io/ioutil"
"math/big"
"os"
"strings"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
Expand All @@ -36,11 +38,6 @@ var (
defaultWriteFilePath = "./known_gaps.sql"
)

type KnownGaps interface {
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
}

type KnownGapsState struct {
// Should we check for gaps by looking at the DB and comparing the latest block with head
checkForGaps bool
Expand All @@ -61,10 +58,15 @@ type KnownGapsState struct {
writeFilePath string
// DB object to use for reading and writing to the DB
db sql.Database
//Do we have entries in the local sql file that need to be written to the DB
sqlFileWaitingForWrite bool
// Metrics object used to track metrics.
statediffMetrics statediffMetricsHandles
}

// Unused
func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int,
errorState bool, writeFilePath string, db sql.Database) *KnownGapsState {
errorState bool, writeFilePath string, db sql.Database, statediffMetrics statediffMetricsHandles) *KnownGapsState {

return &KnownGapsState{
checkForGaps: checkForGaps,
Expand All @@ -73,11 +75,12 @@ func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifferenc
errorState: errorState,
writeFilePath: writeFilePath,
db: db,
statediffMetrics: statediffMetrics,
}

}

func MinMax(array []*big.Int) (*big.Int, *big.Int) {
func minMax(array []*big.Int) (*big.Int, *big.Int) {
var max *big.Int = array[0]
var min *big.Int = array[0]
for _, value := range array {
Expand All @@ -96,7 +99,7 @@ func MinMax(array []*big.Int) (*big.Int, *big.Int) {
// 2. Write to sql file locally.
// 3. Write to prometheus directly.
// 4. Logs and error.
func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
func (kg *KnownGapsState) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
}
Expand All @@ -106,9 +109,13 @@ func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBloc
CheckedOut: checkedOut,
ProcessingKey: processingKey,
}
log.Info("Writing known gaps to the DB")

log.Info("Updating Metrics for the start and end block")
//kg.statediffMetrics.knownGapStart.Update(startingBlockNumber.Int64())
//kg.statediffMetrics.knownGapEnd.Update(endingBlockNumber.Int64())

var writeErr error
log.Info("Writing known gaps to the DB")
if kg.db != nil {
dbErr := kg.upsertKnownGaps(knownGap)
if dbErr != nil {
Expand All @@ -117,22 +124,24 @@ func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBloc
}
} else {
writeErr = kg.upsertKnownGapsFile(knownGap)

}
if writeErr != nil {
log.Info("Unsuccessful when writing to a file", "Error", writeErr)
return writeErr
log.Error("Unsuccessful when writing to a file", "Error", writeErr)
log.Error("Updating Metrics for the start and end error block")
log.Error("Unable to write the following Gaps to DB or File", "startBlock", startingBlockNumber, "endBlock", endingBlockNumber)
kg.statediffMetrics.knownGapErrorStart.Update(startingBlockNumber.Int64())
kg.statediffMetrics.knownGapErrorEnd.Update(endingBlockNumber.Int64())
}
return nil
}

// This is a simple wrapper function to write gaps from a knownErrorBlocks array.
func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) {
startErrorBlock, endErrorBlock := MinMax(knownErrorBlocks)
startErrorBlock, endErrorBlock := minMax(knownErrorBlocks)

log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey)
kg.PushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey)
kg.pushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey)

}

Expand All @@ -156,9 +165,9 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer
// TODO:
// REmove the return value
// Write to file if err in writing to DB
func (kg *KnownGapsState) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
func (kg *KnownGapsState) findAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
// Make this global
latestBlockInDb, err := kg.QueryDbToBigInt(dbQueryString)
latestBlockInDb, err := kg.queryDbToBigInt(dbQueryString)
if err != nil {
return err
}
Expand All @@ -171,7 +180,7 @@ func (kg *KnownGapsState) FindAndUpdateGaps(latestBlockOnChain *big.Int, expecte
endBlock.Sub(latestBlockOnChain, expectedDifference)

log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
err := kg.PushKnownGaps(startBlock, endBlock, false, processingKey)
err := kg.pushKnownGaps(startBlock, endBlock, false, processingKey)
if err != nil {
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
return err
Expand Down Expand Up @@ -212,11 +221,37 @@ func (kg *KnownGapsState) upsertKnownGapsFile(knownGaps models.KnownGapsModel) e
return err
}
log.Info("Wrote the gaps to a local SQL file")
kg.sqlFileWaitingForWrite = true
return nil
}

func (kg *KnownGapsState) writeSqlFileStmtToDb() error {
log.Info("Writing the local SQL file for KnownGaps to the DB")
file, ioErr := ioutil.ReadFile(kg.writeFilePath)

if ioErr != nil {
log.Error("Unable to open local SQL File for writing")
return ioErr
}

requests := strings.Split(string(file), ";")

for _, request := range requests {
_, err := kg.db.Exec(context.Background(), request)
if err != nil {
log.Error("Unable to run insert statement from file to the DB")
return err
}
}
if err := os.Truncate(kg.writeFilePath, 0); err != nil {
log.Info("Failed to empty knownGaps file after inserting statements to the DB", "error", err)
}
kg.sqlFileWaitingForWrite = false
return nil
}

// This is a simple wrapper function which will run QueryRow on the DB
func (kg *KnownGapsState) QueryDb(queryString string) (string, error) {
func (kg *KnownGapsState) queryDb(queryString string) (string, error) {
var ret string
err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret)
if err != nil {
Expand All @@ -228,9 +263,9 @@ func (kg *KnownGapsState) QueryDb(queryString string) (string, error) {

// This function is a simple wrapper which will call QueryDb but the return value will be
// a big int instead of a string
func (kg *KnownGapsState) QueryDbToBigInt(queryString string) (*big.Int, error) {
func (kg *KnownGapsState) queryDbToBigInt(queryString string) (*big.Int, error) {
ret := new(big.Int)
res, err := kg.QueryDb(queryString)
res, err := kg.queryDb(queryString)
if err != nil {
return ret, err
}
Expand Down
25 changes: 11 additions & 14 deletions statediff/known_gaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package statediff
import (
"context"
"fmt"
"io/ioutil"
"math/big"
"os"
"strings"
"testing"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
Expand Down Expand Up @@ -68,6 +67,7 @@ func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
processingKey: tc.processingKey,
expectedDifference: big.NewInt(tc.expectedDif),
db: db,
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
}
service := &Service{
KnownGaps: knownGaps,
Expand Down Expand Up @@ -106,6 +106,7 @@ func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
processingKey: tc.processingKey,
expectedDifference: big.NewInt(tc.expectedDif),
writeFilePath: knownGapsFilePath,
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
db: nil, // Only set to nil to be verbose that we can't use it
}
service := &Service{
Expand All @@ -116,18 +117,13 @@ func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
service.KnownGaps.knownErrorBlocks = knownErrorBlocks

testCaptureErrorBlocks(t, service)

file, ioErr := ioutil.ReadFile(knownGapsFilePath)
require.NoError(t, ioErr)

requests := strings.Split(string(file), ";")

newDb := setupDb(t)
service.KnownGaps.db = newDb
for _, request := range requests {
_, err := newDb.Exec(context.Background(), request)
require.NoError(t, err)
if service.KnownGaps.sqlFileWaitingForWrite {
writeErr := service.KnownGaps.writeSqlFileStmtToDb()
require.NoError(t, writeErr)
}

// Validate that the upsert was done correctly.
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
tearDown(t, newDb)
Expand All @@ -145,12 +141,13 @@ func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) {
processingKey: 1,
expectedDifference: big.NewInt(1),
db: db,
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
}
service := &Service{
KnownGaps: knownGaps,
}

latestBlockInDb, err := service.KnownGaps.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
latestBlockInDb, err := service.KnownGaps.queryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
if err != nil {
t.Skip("Can't find a block in the eth.header_cids table.. Please put one there")
}
Expand All @@ -165,7 +162,7 @@ func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) {
t.Log("The latest block on the chain is: ", latestBlockOnChain)
t.Log("The latest block on the DB is: ", latestBlockInDb)

gapUpsertErr := service.KnownGaps.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
gapUpsertErr := service.KnownGaps.findAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
require.NoError(t, gapUpsertErr)

startBlock := big.NewInt(0)
Expand Down Expand Up @@ -195,7 +192,7 @@ func validateUpsert(t *testing.T, service *Service, startingBlock int64, endingB
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)

_, queryErr := service.KnownGaps.QueryDb(queryString) // Figure out the string.
_, queryErr := service.KnownGaps.queryDb(queryString) // Figure out the string.
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
require.NoError(t, queryErr)
}
Expand Down
16 changes: 16 additions & 0 deletions statediff/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ type statediffMetricsHandles struct {
// Current length of chainEvent channels
serviceLoopChannelLen metrics.Gauge
writeLoopChannelLen metrics.Gauge
// The start block of the known gap
knownGapStart metrics.Gauge
// The end block of the known gap
knownGapEnd metrics.Gauge
// A known gaps start block which had an error being written to the DB
knownGapErrorStart metrics.Gauge
// A known gaps end block which had an error being written to the DB
knownGapErrorEnd metrics.Gauge
}

func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
Expand All @@ -59,12 +67,20 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
lastStatediffHeight: metrics.NewGauge(),
serviceLoopChannelLen: metrics.NewGauge(),
writeLoopChannelLen: metrics.NewGauge(),
knownGapStart: metrics.NewGauge(),
knownGapEnd: metrics.NewGauge(),
knownGapErrorStart: metrics.NewGauge(),
knownGapErrorEnd: metrics.NewGauge(),
}
subsys := "service"
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight)
reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight)
reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen)
reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen)
reg.Register(metricName(subsys, "known_gaps_start"), ctx.knownGapStart)
reg.Register(metricName(subsys, "known_gaps_end"), ctx.knownGapEnd)
reg.Register(metricName(subsys, "known_gaps_error_start"), ctx.knownGapErrorStart)
reg.Register(metricName(subsys, "known_gaps_error_end"), ctx.knownGapErrorEnd)
return ctx
}
24 changes: 16 additions & 8 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
// If we ever have multiple processingKeys we can update them here
// along with the expectedDifference
knownGaps := &KnownGapsState{
processingKey: 0,
expectedDifference: big.NewInt(1),
errorState: false,
writeFilePath: params.KnownGapsFilePath,
db: db,
processingKey: 0,
expectedDifference: big.NewInt(1),
errorState: false,
writeFilePath: params.KnownGapsFilePath,
db: db,
statediffMetrics: statediffMetrics,
sqlFileWaitingForWrite: false,
}
if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true
Expand Down Expand Up @@ -336,7 +338,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
// Check and update the gaps table.
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
log.Info("Checking for Gaps at", "current block", currentBlock.Number())
go sds.KnownGaps.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
go sds.KnownGaps.findAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
sds.KnownGaps.checkForGaps = false
}

Expand All @@ -357,11 +359,17 @@ func (sds *Service) writeLoopWorker(params workerParams) {
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
sds.KnownGaps.knownErrorBlocks = nil

log.Debug("Starting capturedMissedBlocks")
go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks)
}

if sds.KnownGaps.sqlFileWaitingForWrite {
log.Info("There are entries in the SQL file for knownGaps that should be written")
err := sds.KnownGaps.writeSqlFileStmtToDb()
if err != nil {
log.Error("Unable to write KnownGap sql file to DB")
}
}

// TODO: how to handle with concurrent workers
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
case <-sds.QuitChan:
Expand Down

0 comments on commit fc6a937

Please sign in to comment.