Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disassociate block number from the indexer object #248

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:

env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '382aca8e42bc5e33f301f77cdd2e09cc80602fc3'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || '48eb594ea95763bda8e51590f105f7a2657ac6d4' }}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || '65b7bee7a6757c1fc527c8bfdc4f99ab915fcf36' }}
GOPATH: /tmp/go

jobs:
Expand Down
17 changes: 10 additions & 7 deletions statediff/indexer/database/dump/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber uint64
BlockNumber string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
Expand Down Expand Up @@ -68,15 +68,17 @@ func (tx *BatchTx) cache() {

func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
Key: key,
Data: value,
BlockNumber: tx.BlockNumber,
Key: key,
Data: value,
}
}

func (tx *BatchTx) cacheIPLD(i node.Node) {
tx.iplds <- models.IPLDModel{
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
BlockNumber: tx.BlockNumber,
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
}
}

Expand All @@ -87,8 +89,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- models.IPLDModel{
Key: prefixedKey,
Data: raw,
BlockNumber: tx.BlockNumber,
Key: prefixedKey,
Data: raw,
}
return c.String(), prefixedKey, err
}
126 changes: 68 additions & 58 deletions statediff/indexer/database/dump/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()

blockTx := &BatchTx{
BlockNumber: height,
BlockNumber: block.Number().String(),
dump: sdi.dump,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
Expand Down Expand Up @@ -146,7 +146,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(blockTx, headerID, height, uncleNodes)
err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
}

// processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode)
Expand All @@ -215,15 +215,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0)
} else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
}
uncle := models.UncleModel{
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
BlockNumber: blockNumber.String(),
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil {
return err
Expand Down Expand Up @@ -274,16 +275,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
TxHash: trxID,
Index: int64(i),
Data: trx.Data(),
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
TxHash: trxID,
Index: int64(i),
Data: trx.Data(),
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err
Expand All @@ -296,6 +298,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex()
}
accessListElementModel := models.AccessListElementModel{
BlockNumber: args.blockNumber.String(),
TxID: trxID,
Index: int64(j),
Address: accessListElement.Address.Hex(),
Expand All @@ -319,6 +322,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}

rctModel := &models.ReceiptModel{
BlockNumber: args.blockNumber.String(),
TxID: trxID,
Contract: contract,
ContractHash: contractHash,
Expand Down Expand Up @@ -348,16 +352,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}

logDataSet[idx] = &models.LogsModel{
ReceiptID: trxID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
BlockNumber: args.blockNumber.String(),
ReceiptID: trxID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
}
}

Expand All @@ -379,33 +384,35 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// publish the state node
var stateModel models.StateNodeModel
if stateNode.NodeType == sdtypes.Removed {
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel = models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
}
} else {
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel = models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
}
}

Expand All @@ -428,6 +435,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
}
accountModel := models.StateAccountModel{
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Balance: account.Balance.String(),
Expand All @@ -446,13 +454,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err
Expand All @@ -464,13 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
BlockNumber: tx.BlockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err
Expand All @@ -484,7 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// codec doesn't matter since db key is multihash-based
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
Expand Down
Loading