Skip to content

Commit

Permalink
batch inserts to public.blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Nov 9, 2021
1 parent e0717f2 commit db85afd
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 129 deletions.
91 changes: 91 additions & 0 deletions statediff/indexer/batch_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package indexer

import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)

const ipldBatchInsertPgStr string = `INSERT INTO public.blocks (key, data) VALUES (unnest($1::TEXT[]), unnest($2::BYTEA[])) ON CONFLICT (key) DO NOTHING`

// BlockTx wraps a Postgres tx with the state necessary for building the Postgres tx concurrently during trie difference iteration
type BlockTx struct {
dbtx *sqlx.Tx
BlockNumber uint64
headerID int64
Close func(blockTx *BlockTx, err error) error

quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
}

func (tx *BlockTx) flush() error {
_, err := tx.dbtx.Exec(ipldBatchInsertPgStr, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values))
if err != nil {
return err
}
tx.ipldCache = models.IPLDBatch{}
return nil
}

// run in background goroutine to synchronize concurrent appends to the ipldCache
func (tx *BlockTx) cache() {
for {
select {
case i := <-tx.iplds:
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
case <-tx.quit:
return
}
}
}

func (tx *BlockTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
Key: key,
Data: value,
}
}

func (tx *BlockTx) cacheIPLD(i node.Node) {
tx.iplds <- models.IPLDModel{
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
}
}

func (tx *BlockTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error) {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return "", "", err
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- models.IPLDModel{
Key: prefixedKey,
Data: raw,
}
return c.String(), prefixedKey, err
}
116 changes: 51 additions & 65 deletions statediff/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -65,7 +64,6 @@ type Indexer interface {
type StateDiffIndexer struct {
chainConfig *params.ChainConfig
dbWriter *PostgresCIDWriter
init bool
}

// NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface
Expand All @@ -80,13 +78,6 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) (*Sta
}, nil
}

type BlockTx struct {
dbtx *sqlx.Tx
BlockNumber uint64
headerID int64
Close func(err error) error
}

// ReportDBMetrics is a reporting function to run as goroutine
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
if !metrics.Enabled {
Expand Down Expand Up @@ -127,7 +118,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}

if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) {
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d)to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
}
if len(txTrieNodes) != len(rctTrieNodes) {
return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes))
}

// Calculate reward
Expand All @@ -139,6 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
reward = CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()

// Begin new db tx for everything
tx, err := sdi.dbWriter.db.Beginx()
if err != nil {
Expand All @@ -153,9 +148,14 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
}()
blockTx := &BlockTx{
dbtx: tx,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
dbtx: tx,
// handle transaction commit or rollback for any return case
Close: func(err error) error {
Close: func(self *BlockTx, err error) error {
close(self.quit)
close(self.iplds)
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
Expand All @@ -166,6 +166,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
shared.Rollback(tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
}
err = tx.Commit()
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
Expand All @@ -176,6 +182,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return err
},
}
go blockTx.cache()

tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)

Expand All @@ -184,7 +192,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip

// Publish and index header, collect headerID
var headerID int64
headerID, err = sdi.processHeader(tx, block.Header(), headerNode, reward, totalDifficulty)
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
if err != nil {
return nil, err
}
Expand All @@ -193,7 +201,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(tx, headerID, height, uncleNodes)
err = sdi.processUncles(blockTx, headerID, height, uncleNodes)
if err != nil {
return nil, err
}
Expand All @@ -202,7 +210,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(tx, processArgs{
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
headerID: headerID,
blockNumber: block.Number(),
receipts: receipts,
Expand Down Expand Up @@ -230,11 +238,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip

// processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
// publish header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
return 0, fmt.Errorf("error publishing header IPLD: %v", err)
}
func (sdi *StateDiffIndexer) processHeader(tx *BlockTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
tx.cacheIPLD(headerNode)

var baseFee *int64
if header.BaseFee != nil {
Expand All @@ -243,7 +248,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he
}

// index header
return sdi.dbWriter.upsertHeaderCID(tx, models.HeaderModel{
return sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(),
Expand All @@ -262,12 +267,10 @@ func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he
}

// processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
func (sdi *StateDiffIndexer) processUncles(tx *BlockTx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
return fmt.Errorf("error publishing uncle IPLD: %v", err)
}
tx.cacheIPLD(uncleNode)
var uncleReward *big.Int
// in PoA networks uncle reward is 0
if sdi.chainConfig.Clique != nil {
Expand All @@ -282,7 +285,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNum
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
}
if err := sdi.dbWriter.upsertUncleCID(tx, uncle, headerID); err != nil {
if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle, headerID); err != nil {
return err
}
}
Expand All @@ -305,28 +308,15 @@ type processArgs struct {
}

// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs) error {
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BlockTx, args processArgs) error {
// Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts {
// tx that corresponds with this receipt
trx := args.txs[i]
from, err := types.Sender(signer, trx)
if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err)
}

for _, trie := range args.logTrieNodes[i] {
if err = shared.PublishIPLD(tx, trie); err != nil {
return fmt.Errorf("error publishing log trie node IPLD: %w", err)
}
for _, logTrieNode := range args.logTrieNodes[i] {
tx.cacheIPLD(logTrieNode)
}

// publish the txs and receipts
txNode := args.txNodes[i]
if err := shared.PublishIPLD(tx, txNode); err != nil {
return fmt.Errorf("error publishing tx IPLD: %v", err)
}
tx.cacheIPLD(txNode)

// Indexing
// extract topic and contract data from the receipt for indexing
Expand All @@ -344,7 +334,6 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs

mappedContracts[l.Address.String()] = true
logDataSet[idx] = &models.LogsModel{
ID: 0,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
Expand All @@ -368,6 +357,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
}
// index tx first so that the receipt can reference it by FK
trx := args.txs[i]
// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
Expand All @@ -381,7 +376,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
if txType != types.LegacyTxType {
txModel.Type = &txType
}
txID, err := sdi.dbWriter.upsertTransactionCID(tx, txModel, args.headerID)
txID, err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel, args.headerID)
if err != nil {
return err
}
Expand All @@ -397,7 +392,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys,
}
if err := sdi.dbWriter.upsertAccessListElement(tx, accessListElementModel, txID); err != nil {
if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel, txID); err != nil {
return err
}
}
Expand All @@ -420,27 +415,20 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
}

receiptID, err := sdi.dbWriter.upsertReceiptCID(tx, rctModel, txID)
receiptID, err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel, txID)
if err != nil {
return err
}

if err = sdi.dbWriter.upsertLogCID(tx, logDataSet, receiptID); err != nil {
if err = sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet, receiptID); err != nil {
return err
}
}

// publish trie nodes, these aren't indexed directly
for _, n := range args.txTrieNodes {
if err := shared.PublishIPLD(tx, n); err != nil {
return fmt.Errorf("error publishing tx trie node IPLD: %w", err)
}
}

for _, n := range args.rctTrieNodes {
if err := shared.PublishIPLD(tx, n); err != nil {
return fmt.Errorf("error publishing rct trie node IPLD: %w", err)
}
for i, n := range args.txTrieNodes {
tx.cacheIPLD(n)
tx.cacheIPLD(args.rctTrieNodes[i])
}

return nil
Expand All @@ -462,9 +450,9 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
_, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
return err
}
stateCIDStr, stateMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error publishing state node IPLD: %v", err)
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel := models.StateNodeModel{
Path: stateNode.Path,
Expand Down Expand Up @@ -518,9 +506,9 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
}
continue
}
storageCIDStr, storageMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
storageCIDStr, storageMhKey, err := tx.cacheRaw(ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil {
return fmt.Errorf("error publishing storage node IPLD: %v", err)
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
Path: storageNode.Path,
Expand All @@ -544,8 +532,6 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd
if err != nil {
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
if err := shared.PublishDirect(tx.dbtx, mhKey, codeAndCodeHash.Code); err != nil {
return fmt.Errorf("error publishing code IPLD: %v", err)
}
tx.cacheDirect(mhKey, codeAndCodeHash.Code)
return nil
}
Loading

0 comments on commit db85afd

Please sign in to comment.