Skip to content

Commit

Permalink
feat: index genesis
Browse files Browse the repository at this point in the history
Signed-off-by: Norman Meier <[email protected]>
  • Loading branch information
n0izn0iz committed Apr 15, 2024
1 parent 2622efe commit 19c991f
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 48 deletions.
9 changes: 9 additions & 0 deletions client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func (c *Client) GetBlock(blockNum uint64) (*core_types.ResultBlock, error) {
return block, nil
}

func (c *Client) GetGenesisBlock() (*core_types.ResultGenesis, error) {
genesis, err := c.client.Genesis()
if err != nil {
return nil, fmt.Errorf("unable to get genesis block, %w", err)
}

return genesis, nil
}

func (c *Client) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
bn := int64(blockNum)

Expand Down
172 changes: 124 additions & 48 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
queue "github.com/madz-lab/insertion-queue"
"go.uber.org/zap"

"github.com/gnolang/gno/gno.land/pkg/gnoland"
"github.com/gnolang/gno/tm2/pkg/amino"
bftTypes "github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/tx-indexer/storage"
storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
Expand Down Expand Up @@ -70,6 +73,10 @@ func New(
// FetchChainData starts the fetching process that indexes
// blockchain data
func (f *Fetcher) FetchChainData(ctx context.Context) error {
if err := f.maybeFetchGenesis(); err != nil {
return fmt.Errorf("unable to index genesis block, %w", err)
}

collectorCh := make(chan *workerResponse, DefaultMaxSlots)

// attemptRangeFetch compares local and remote state
Expand Down Expand Up @@ -178,68 +185,137 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
// Pop the next chunk
f.chunkBuffer.PopFront()

wb := f.storage.WriteBatch()
if err := f.processSlot(item); err != nil {
return fmt.Errorf("unable to process slot, %w", err)
}
}
}
}
}

// Save the fetched data
for blockIndex, block := range item.chunk.blocks {
if saveErr := wb.SetBlock(block); saveErr != nil {
// This is a design choice that really highlights the strain
// of keeping legacy testnets running. Current TM2 testnets
// have blocks / transactions that are no longer compatible
// with latest "master" changes for Amino, so these blocks / txs are ignored,
// as opposed to this error being a show-stopper for the fetcher
f.logger.Error("unable to save block", zap.String("err", saveErr.Error()))
func (f *Fetcher) maybeFetchGenesis() error {
// Check if genesis block has already been indexed (latest height is set)
_, err := f.storage.GetLatestHeight()
if err == nil {
return nil
} else if !errors.Is(err, storageErrors.ErrNotFound) {
return fmt.Errorf("unable to fetch latest block height, %w", err)
}

continue
}
// Fetch genesis
iGenesisBlock, err := f.client.GetGenesisBlock()
if err != nil {
return fmt.Errorf("unable to fetch genesis block, %w", err)
}

f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height))
genesisState, ok := iGenesisBlock.Genesis.AppState.(gnoland.GnoGenesisState)
if !ok {
return errors.New("unable to cast genesis block to GnoGenesisState")
}

// Get block results
txResults := item.chunk.results[blockIndex]
// Convert genesis to normal block
bftTxs := make([]bftTypes.Tx, len(genesisState.Txs))
for i, tx := range genesisState.Txs {
bftTxs[i], err = amino.Marshal(tx)
if err != nil {
return fmt.Errorf("unable to marshal tx, %w", err)
}
}

// Save the fetched transaction results
for _, txResult := range txResults {
if err := wb.SetTx(txResult); err != nil {
f.logger.Error("unable to save tx", zap.String("err", err.Error()))
block := &bftTypes.Block{
Header: bftTypes.Header{
AppHash: iGenesisBlock.Genesis.AppHash,
ChainID: iGenesisBlock.Genesis.ChainID,
Time: iGenesisBlock.Genesis.GenesisTime,
Height: 0,
NumTxs: int64(len(bftTxs)),
TotalTxs: int64(len(bftTxs)),
},
Data: bftTypes.Data{
Txs: bftTxs,
},
}

continue
}
slot := &slot{
chunk: &chunk{
blocks: []*bftTypes.Block{block},
results: make([][]*bftTypes.TxResult, 1),
},
chunkRange: chunkRange{
from: 0, // should be -1, but we're using 0 to avoid underflow
to: 0,
},
}

f.logger.Debug(
"Added tx to batch",
zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
)
}
if err := f.processSlot(slot); err != nil {
return fmt.Errorf("unable to process genesis slot, %w", err)
}

// Alert any listeners of a new saved block
event := &types.NewBlock{
Block: block,
Results: txResults,
}
return nil
}

f.events.SignalEvent(event)
}
func (f *Fetcher) processSlot(slot *slot) error {
wb := f.storage.WriteBatch()

f.logger.Info(
"Added to batch block and tx data for range",
zap.Uint64("from", item.chunkRange.from),
zap.Uint64("to", item.chunkRange.to),
)
// Save the fetched data
for blockIndex, block := range slot.chunk.blocks {
if saveErr := wb.SetBlock(block); saveErr != nil {
// This is a design choice that really highlights the strain
// of keeping legacy testnets running. Current TM2 testnets
// have blocks / transactions that are no longer compatible
// with latest "master" changes for Amino, so these blocks / txs are ignored,
// as opposed to this error being a show-stopper for the fetcher
f.logger.Error("unable to save block", zap.String("err", saveErr.Error()))

// Save the latest height data
if err := wb.SetLatestHeight(item.chunkRange.to); err != nil {
if rErr := wb.Rollback(); rErr != nil {
return fmt.Errorf("unable to save latest height info, %w, %w", err, rErr)
}
continue
}

return fmt.Errorf("unable to save latest height info, %w", err)
}
f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height))

if err := wb.Commit(); err != nil {
return fmt.Errorf("error persisting block information into storage, %w", err)
}
// Get block results
txResults := slot.chunk.results[blockIndex]

// Save the fetched transaction results
for _, txResult := range txResults {
if err := wb.SetTx(txResult); err != nil {
f.logger.Error("unable to save tx", zap.String("err", err.Error()))

continue
}

f.logger.Debug(
"Added tx to batch",
zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
)
}

// Alert any listeners of a new saved block
event := &types.NewBlock{
Block: block,
Results: txResults,
}

f.events.SignalEvent(event)
}

f.logger.Info(
"Added to batch block and tx data for range",
zap.Uint64("from", slot.chunkRange.from),
zap.Uint64("to", slot.chunkRange.to),
)

// Save the latest height data
if err := wb.SetLatestHeight(slot.chunkRange.to); err != nil {
if rErr := wb.Rollback(); rErr != nil {
return fmt.Errorf("unable to save latest height info, %w, %w", err, rErr)
}

return fmt.Errorf("unable to save latest height info, %w", err)
}

if err := wb.Commit(); err != nil {
return fmt.Errorf("error persisting block information into storage, %w", err)
}

return nil
}
4 changes: 4 additions & 0 deletions fetch/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (m *mockClient) GetBlock(blockNum uint64) (*core_types.ResultBlock, error)
return nil, nil
}

func (m *mockClient) GetGenesisBlock() (*core_types.ResultGenesis, error) {
return nil, nil
}

func (m *mockClient) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
if m.getBlockResultsFn != nil {
return m.getBlockResultsFn(blockNum)
Expand Down
3 changes: 3 additions & 0 deletions fetch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type Client interface {
// GetBlock returns specified block
GetBlock(uint64) (*core_types.ResultBlock, error)

// GetGenesisBlock returns the genesis block
GetGenesisBlock() (*core_types.ResultGenesis, error)

// GetBlockResults returns the results of executing the transactions
// for the specified block
GetBlockResults(uint64) (*core_types.ResultBlockResults, error)
Expand Down

0 comments on commit 19c991f

Please sign in to comment.