Skip to content

Commit

Permalink
update sql indexer to use new v4 schema that denormalizes by block_number for the purposes of partitioning & sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Mar 17, 2022
1 parent 6b74310 commit 9775355
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 190 deletions.
23 changes: 15 additions & 8 deletions statediff/indexer/database/sql/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)

const startingCacheCapacity = 1024 * 24

// BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber uint64
BlockNumber string
ctx context.Context
dbtx Tx
stm string
Expand All @@ -48,7 +50,8 @@ func (tx *BatchTx) Submit(err error) error {
}

func (tx *BatchTx) flush() error {
_, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values))
_, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys),
pq.Array(tx.ipldCache.Values))
if err != nil {
return err
}
Expand All @@ -61,6 +64,7 @@ func (tx *BatchTx) cache() {
for {
select {
case i := <-tx.iplds:
tx.ipldCache.BlockNumbers = append(tx.ipldCache.BlockNumbers, i.BlockNumber)
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
case <-tx.quit:
Expand All @@ -72,15 +76,17 @@ func (tx *BatchTx) cache() {

func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
Key: key,
Data: value,
BlockNumber: tx.BlockNumber,
Key: key,
Data: value,
}
}

func (tx *BatchTx) cacheIPLD(i node.Node) {
tx.iplds <- models.IPLDModel{
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
BlockNumber: tx.BlockNumber,
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
}
}

Expand All @@ -91,8 +97,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- models.IPLDModel{
Key: prefixedKey,
Data: raw,
BlockNumber: tx.BlockNumber,
Key: prefixedKey,
Data: raw,
}
return c.String(), prefixedKey, err
}
Expand Down
132 changes: 73 additions & 59 deletions statediff/indexer/database/sql/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}()
blockTx := &BatchTx{
ctx: sdi.ctx,
BlockNumber: height,
BlockNumber: block.Number().String(),
stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
dbtx: tx,
ipldCache: models.IPLDBatch{
BlockNumbers: make([]string, 0, startingCacheCapacity),
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx,
// handle transaction commit or rollback for any return case
submit: func(self *BatchTx, err error) error {
defer func() {
Expand Down Expand Up @@ -200,7 +204,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(blockTx, headerID, height, uncleNodes)
err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -264,7 +268,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
}

// processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode)
Expand All @@ -273,15 +277,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0)
} else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
}
uncle := models.UncleModel{
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
BlockNumber: blockNumber.String(),
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
}
if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle); err != nil {
return err
Expand Down Expand Up @@ -331,16 +336,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
TxHash: txID,
Index: int64(i),
Data: trx.Data(),
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
TxHash: txID,
Index: int64(i),
Data: trx.Data(),
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil {
return err
Expand All @@ -353,6 +359,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex()
}
accessListElementModel := models.AccessListElementModel{
BlockNumber: args.blockNumber.String(),
TxID: txID,
Index: int64(j),
Address: accessListElement.Address.Hex(),
Expand All @@ -376,6 +383,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}

rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(),
TxID: txID,
Contract: contract,
ContractHash: contractHash,
Expand Down Expand Up @@ -406,16 +414,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}

logDataSet[idx] = &models.LogsModel{
ReceiptID: txID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
BlockNumber: args.blockNumber.String(),
ReceiptID: txID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
}
}

Expand All @@ -434,7 +443,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}

// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, blockNumber, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
Expand All @@ -444,12 +453,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
BlockNumber: blockNumber,
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
}
return sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel)
}
Expand All @@ -458,12 +468,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
BlockNumber: blockNumber,
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
}
// index the state node
if err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel); err != nil {
Expand All @@ -483,6 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
}
accountModel := models.StateAccountModel{
BlockNumber: blockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Balance: account.Balance.String(),
Expand All @@ -500,13 +512,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
BlockNumber: blockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
}
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil {
return err
Expand All @@ -518,13 +531,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
BlockNumber: blockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
}
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil {
return err
Expand Down
24 changes: 12 additions & 12 deletions statediff/indexer/database/sql/postgres/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,58 +44,58 @@ func (db *DB) InsertHeaderStm() string {

// InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string {
return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
return `INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (block_hash) DO NOTHING`
}

// InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string {
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
return `INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (tx_hash) DO NOTHING`
}

// InsertAccessListElementStm satisfies the sql.Statements interface
func (db *DB) InsertAccessListElementStm() string {
return `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
return `INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_id, index) DO NOTHING`
}

// InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string {
return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
return `INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (tx_id) DO NOTHING`
}

// InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertLogStm() string {
return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
return `INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (rct_id, index) DO NOTHING`
}

// InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string {
return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
}

// InsertAccountStm satisfies the sql.Statements interface
func (db *DB) InsertAccountStm() string {
return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6)
return `INSERT INTO eth.state_accounts (block_number, header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO NOTHING`
}

// InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string {
return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)`
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
}

// InsertIPLDStm satisfies the sql.Statements interface
func (db *DB) InsertIPLDStm() string {
return `INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
return `INSERT INTO public.blocks (block_number, key, data) VALUES ($1, $2, $3) ON CONFLICT (block_number, key) DO NOTHING`
}

// InsertIPLDsStm satisfies the sql.Statements interface
func (db *DB) InsertIPLDsStm() string {
return `INSERT INTO public.blocks (key, data) VALUES (unnest($1::TEXT[]), unnest($2::BYTEA[])) ON CONFLICT (key) DO NOTHING`
return `INSERT INTO public.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT (block_number, key) DO NOTHING`
}
Loading

0 comments on commit 9775355

Please sign in to comment.