Skip to content

Commit

Permalink
Merge pull request #205 from vulcanize/feature/155-waitForSync
Browse files Browse the repository at this point in the history
Add a flag to start processing statediff if we are caught up to the head of the chain
  • Loading branch information
abdulrabbani00 authored Mar 18, 2022
2 parents 53c1322 + 333dd3f commit 3045068
Show file tree
Hide file tree
Showing 10 changed files with 534 additions and 42 deletions.
3 changes: 2 additions & 1 deletion cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
Context: context.Background(),
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
WaitForSync: ctx.GlobalBool(utils.StateDiffWaitForSync.Name),
}
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p)
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
}

// Configure GraphQL if requested
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ var (
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFilePath,
utils.StateDiffWaitForSync,
configFileFlag,
}

Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFilePath,
utils.StateDiffWaitForSync,
},
},
{
Expand Down
8 changes: 6 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,10 @@ var (
Usage: "Number of concurrent workers to use during statediff processing (default 1)",
Value: 1,
}
StateDiffWaitForSync = cli.BoolFlag{
Name: "statediff.waitforsync",
Usage: "Should the statediff service wait for geth to catch up to the head of the chain?",
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
Expand Down Expand Up @@ -1865,8 +1869,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
}

// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.Config) {
if err := statediff.New(stack, ethServ, cfg, params); err != nil {
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.Config, backend ethapi.Backend) {
if err := statediff.New(stack, ethServ, cfg, params, backend); err != nil {
Fatalf("Failed to register the Statediff service: %v", err)
}
}
Expand Down
94 changes: 64 additions & 30 deletions statediff/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ either relaying the state objects to RPC subscribers or writing them directly to
It also exposes RPC endpoints for fetching or writing to Postgres the state diff at a specific block height
or for a specific block hash, this operates on historical block and state data and so depends on a complete state archive.

Data is emitted in this differential format in order to make it feasible to IPLD-ize and index the *entire* Ethereum state
Data is emitted in this differential format in order to make it feasible to IPLD-ize and index the _entire_ Ethereum state
(including intermediate state and storage trie nodes). If this state diff process is ran continuously from genesis,
the entire state at any block can be materialized from the cumulative differentials up to that point.

## Statediff object

A state diff `StateObject` is the collection of all the state and storage trie nodes that have been updated in a given block.
For convenience, we also associate these nodes with the block number and hash, and optionally the set of code hashes and code for any
contracts deployed in this block.
Expand Down Expand Up @@ -52,6 +53,7 @@ type CodeAndCodeHash struct {
Code []byte `json:"code"`
}
```

These objects are packed into a `Payload` structure which can additionally associate the `StateObject`
with the block (header, uncles, and transactions), receipts, and total difficulty.
This `Payload` encapsulates all of the differential data at a given block, and allows us to index the entire Ethereum data structure
Expand All @@ -71,38 +73,57 @@ type Payload struct {
```

## Usage

This state diffing service runs as an auxiliary service concurrent to the regular syncing process of the geth node.

### CLI configuration

This service introduces a CLI flag namespace `statediff`

`--statediff` flag is used to turn on the service
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
`--statediff.db.host` is the hostname/ip to dial to connect to the database
`--statediff.db.port` is the port to dial to connect to the database
`--statediff.db.name` is the name of the database to connect to
`--statediff.db.user` is the user to connect to the database as
`--statediff.db.password` is the password to use to connect to the database
`--statediff.db.conntimeout` is the connection timeout (in seconds)
`--statediff.db.maxconns` is the maximum number of database connections
`--statediff.db.minconns` is the minimum number of database connections
`--statediff.db.maxidleconns` is the maximum number of idle connections
`--statediff.db.maxconnidletime` is the maximum lifetime for an idle connection (in seconds)
`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)
`--statediff.db.nodeid` is the node id to use in the Postgres database
`--statediff.db.clientname` is the client name to use in the Postgres database
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
`--statediff` flag is used to turn on the service

`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database

`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database

`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)

`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)

`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)

`--statediff.db.host` is the hostname/ip to dial to connect to the database

`--statediff.db.port` is the port to dial to connect to the database

`--statediff.db.name` is the name of the database to connect to

`--statediff.db.user` is the user to connect to the database as

`--statediff.db.password` is the password to use to connect to the database

`--statediff.db.conntimeout` is the connection timeout (in seconds)

`--statediff.db.maxconns` is the maximum number of database connections

`--statediff.db.minconns` is the minimum number of database connections

`--statediff.db.maxidleconns` is the maximum number of idle connections

`--statediff.db.maxconnidletime` is the maximum lifetime for an idle connection (in seconds)

`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)

`--statediff.db.nodeid` is the node id to use in the Postgres database

`--statediff.db.clientname` is the client name to use in the Postgres database

`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode

The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)

e.g.
`
./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.driver=sqlx --statediff.db.host=localhost --statediff.db.port=5432 --statediff.db.name=vulcanize_test --statediff.db.user=postgres --statediff.db.nodeid=nodeid --statediff.db.clientname=clientname
`
`./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.driver=sqlx --statediff.db.host=localhost --statediff.db.port=5432 --statediff.db.name=vulcanize_test --statediff.db.user=postgres --statediff.db.nodeid=nodeid --statediff.db.clientname=clientname`

When operating in `--statediff.db.type=file` mode, the service will write SQL statements out to the file designated by
`--statediff.file.path`. Please note that it writes out SQL statements with all `ON CONFLICT` constraint checks dropped.
Expand All @@ -112,6 +133,7 @@ de-duplicate using unix tools (`sort statediff.sql | uniq` or `sort -u statediff
back afterwards.

### RPC endpoints

The state diffing service exposes both a WS subscription endpoint, and a number of HTTP unary endpoints.

Each of these endpoints requires a set of parameters provided by the caller
Expand All @@ -137,6 +159,7 @@ contracts deployed in this block; whether to limit the diffing process to a list
whether to limit the diffing process to a list of specific storage slot keys.

#### Subscription endpoint

A websocket supporting RPC endpoint is exposed for subscribing to state diff `StateObjects` that come off the head of the chain while the geth node syncs.

```go
Expand Down Expand Up @@ -182,7 +205,9 @@ for {
```

#### Unary endpoints

The service also exposes unary RPC endpoints for retrieving the state diff `StateObject` for a specific block height/hash.

```go
// StateDiffAt returns a state diff payload at the specific blockheight
StateDiffAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error)
Expand All @@ -195,12 +220,14 @@ To expose this endpoint the node needs to have the HTTP server turned on (`--htt
and the `statediff` namespace exposed (`--http.api=statediff`).

### Direct indexing into Postgres

If `--statediff.writing` is set, the service will convert the state diff `StateObject` data into IPLD objects, persist them directly to Postgres,
and generate secondary indexes around the IPLD data.

The schema and migrations for this Postgres database are provided in `statediff/db/`.

#### Postgres setup

We use [pressly/goose](https://github.com/pressly/goose) as our Postgres migration manager.
You can also load the Postgres schema directly into a database using

Expand All @@ -209,6 +236,7 @@ You can also load the Postgres schema directly into a database using
This will only work on a version 12.4 Postgres database.

#### Schema overview

Our Postgres schemas are built around a single IPFS backing Postgres IPLD blockstore table (`public.blocks`) that conforms with [go-ds-sql](https://github.com/ipfs/go-ds-sql/blob/master/postgres/postgres.go).
All IPLD objects are stored in this table, where `key` is the blockstore-prefixed multihash key for the IPLD object and `data` contains
the bytes for the IPLD block (in the case of all Ethereum IPLDs, this is the RLP byte encoding of the Ethereum object).
Expand All @@ -227,6 +255,7 @@ table contains a `state_id` foreign key which references the `id` for the `state
and in turn that `state_cids` entry contains a `header_id` foreign key which references the `id` of the `header_cids` entry that contains the header for the block these state and storage nodes were updated (diffed).

### Optimization

On mainnet this process is extremely IO intensive and requires significant resources to allow it to keep up with the head of the chain.
The state diff processing time for a specific block is dependent on the number and complexity of the state changes that occur in a block and
the number of updated state nodes that are available in the in-memory cache vs must be retrieved from disc.
Expand All @@ -236,6 +265,7 @@ This can be done by increasing the overall `--cache` allocation and/or by increa
usage with `--cache.trie`.

## Versioning, Branches, Rebasing, and Releasing

Internal tagged releases are maintained for building the latest version of statediffing geth or using it as a go mod dependency.
When a new core go-ethereum version is released, statediffing geth is rebased onto and adjusted to work with the new tag.

Expand All @@ -244,18 +274,20 @@ need to be able to squash our work before performing a rebase. To this end we re
the full incremental history.

### Versioning
Versioning for of statediffing geth follows the below format:

`{Root Version}-statediff-{Statediff Version}`
Example: `v1.10.16-statediff-3.0.2`

Where "root version" is the version of the tagged release from the core go-ethereum repository that our release is rebased on top of
and "statediff version" is the version tracking the state of the statediffing service code.

E.g. the version at the time of writing this is v1.10.3-statediff-0.0.23, v0.0.23 of the statediffing code rebased on top of the v1.10.3 core tag.
- The first section, `v1.10.16`, corresponds to the release of the root branch this version is rebased onto (e.g., [](https://github.com/ethereum/go-ethereum/releases/tag/v1.10.16)[https://github.com/ethereum/go-ethereum/releases/tag/v1.10.16](https://github.com/ethereum/go-ethereum/releases/tag/v1.10.16))
- The second section, `3.0.2`, corresponds to the version of our statediffing code. The major version here (3) should always correspond with the major version of the `ipld-eth-db` schema version it works with (e.g., [](https://github.com/vulcanize/ipld-eth-db/releases/tag/v3.0.6)[https://github.com/vulcanize/ipld-eth-db/releases/tag/v3.0.6](https://github.com/vulcanize/ipld-eth-db/releases/tag/v3.0.6)); it is only bumped when we bump the major version of the schema.
- The major version of the schema is only bumped when a breaking change is made to the schema.
- The minor version is bumped when a new feature is added, or a fix is performed that breaks or updates the statediffing API or CLI in some way.
- The patch version is bumped whenever minor fixes/patches/features are done that don’t change/break API/CLI compatibility.
- We are very strict about the first section and the major version of the statediffing code, but some discretion is required when deciding to bump minor versus patch version of the statediffing code.

The statediff version is included in the `VersionMeta` in params/version.go

### Branches

We maintain two official kinds of branches:

Major Branch: `{Root Version}-statediff`
Expand All @@ -271,7 +303,9 @@ If a developer is unsure what version their patch should affect, they should rem
they can open a PR against the targeted root branch and be directed to the appropriate feature version and branch.

### Rebasing

When a new root tagged release comes out we rebase our statediffing code on top of the new tag using the following process:

1. Checkout a new major branch for the tag from the current major branch
2. On the new major branch, squash all our commits since the last major rebase
3. On the new major branch, perform the rebase against the new tag
Expand Down
2 changes: 2 additions & 0 deletions statediff/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
EnableWriteLoop bool
// Size of the worker pool
NumWorkers uint
// Should the statediff service wait until geth has synced to the head of the blockchain?
WaitForSync bool
// Context
Context context.Context
}
Expand Down
58 changes: 56 additions & 2 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
Expand Down Expand Up @@ -118,11 +119,15 @@ type Service struct {
SubscriptionTypes map[common.Hash]Params
// Cache the last block so that we can avoid having to lookup the next block's parent
BlockCache BlockCache
// The publicBackendAPI which provides useful information about the current state
BackendAPI ethapi.Backend
// Should the statediff service wait for geth to sync to head?
WaitForSync bool
// Whether or not we have any subscribers; only if we do, do we processes state diffs
subscribers int32
// Interface for publishing statediffs as PG-IPLD objects
indexer interfaces.StateDiffIndexer
// Whether to enable writing state diffs directly to track blochain head
// Whether to enable writing state diffs directly to track blockchain head.
enableWriteLoop bool
// Size of the worker pool
numWorkers uint
Expand All @@ -146,7 +151,7 @@ func NewBlockCache(max uint) BlockCache {

// New creates a new statediff.Service
// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config) error {
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
blockChain := ethServ.BlockChain()
var indexer interfaces.StateDiffIndexer
quitCh := make(chan bool)
Expand Down Expand Up @@ -177,6 +182,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers),
BackendAPI: backend,
WaitForSync: params.WaitForSync,
indexer: indexer,
enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers,
Expand Down Expand Up @@ -528,10 +535,57 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
return nil
}

// This function will check the status of geth syncing.
// It will return false if geth has finished syncing.
// It will return a true Geth is still syncing.
func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.PublicEthereumAPI) (bool, error) {
syncStatus, err := pubEthAPI.Syncing()
if err != nil {
return true, err
}

if syncStatus != false {
return true, err
}
return false, err
}

// This function calls GetSyncStatus to check if we have caught up to head.
// It will keep looking and checking if we have caught up to head.
// It will only complete if we catch up to head, otherwise it will keep looping forever.
func (sds *Service) WaitingForSync() error {
log.Info("We are going to wait for geth to sync to head!")

// Has the geth node synced to head?
Synced := false
pubEthAPI := ethapi.NewPublicEthereumAPI(sds.BackendAPI)
for !Synced {
syncStatus, err := sds.GetSyncStatus(pubEthAPI)
if err != nil {
return err
}
if !syncStatus {
log.Info("Geth has caught up to the head of the chain")
Synced = true
} else {
time.Sleep(1 * time.Second)
}
}
return nil
}

// Start is used to begin the service
func (sds *Service) Start() error {
log.Info("Starting statediff service")

if sds.WaitForSync {
log.Info("Statediff service will wait until geth has caught up to the head of the chain.")
err := sds.WaitingForSync()
if err != nil {
return err
}
log.Info("Continuing with startdiff start process")
}
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.Loop(chainEventCh)

Expand Down
Loading

0 comments on commit 3045068

Please sign in to comment.