diff --git a/client/http.go b/client/http.go
index 561d29f8..959a13f7 100644
--- a/client/http.go
+++ b/client/http.go
@@ -48,6 +48,15 @@ func (c *Client) GetBlock(blockNum uint64) (*core_types.ResultBlock, error) {
 	return block, nil
 }
 
+func (c *Client) GetGenesisBlock() (*core_types.ResultGenesis, error) {
+	genesis, err := c.client.Genesis()
+	if err != nil {
+		return nil, fmt.Errorf("unable to get genesis block, %w", err)
+	}
+
+	return genesis, nil
+}
+
 func (c *Client) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
 	bn := int64(blockNum)
 
diff --git a/fetch/fetch.go b/fetch/fetch.go
index 16024f78..f1d566df 100644
--- a/fetch/fetch.go
+++ b/fetch/fetch.go
@@ -11,6 +11,9 @@ import (
 	queue "github.com/madz-lab/insertion-queue"
 	"go.uber.org/zap"
 
+	"github.com/gnolang/gno/gno.land/pkg/gnoland"
+	"github.com/gnolang/gno/tm2/pkg/amino"
+	bftTypes "github.com/gnolang/gno/tm2/pkg/bft/types"
 	"github.com/gnolang/tx-indexer/storage"
 	storageErrors "github.com/gnolang/tx-indexer/storage/errors"
 	"github.com/gnolang/tx-indexer/types"
@@ -70,6 +73,10 @@ func New(
 // FetchChainData starts the fetching process that indexes
 // blockchain data
 func (f *Fetcher) FetchChainData(ctx context.Context) error {
+	if err := f.maybeFetchGenesis(); err != nil {
+		return fmt.Errorf("unable to index genesis block, %w", err)
+	}
+
 	collectorCh := make(chan *workerResponse, DefaultMaxSlots)
 
 	// attemptRangeFetch compares local and remote state
@@ -178,68 +185,137 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
 			// Pop the next chunk
 			f.chunkBuffer.PopFront()
 
-			wb := f.storage.WriteBatch()
-
-			// Save the fetched data
-			for blockIndex, block := range item.chunk.blocks {
-				if saveErr := wb.SetBlock(block); saveErr != nil {
-					// This is a design choice that really highlights the strain
-					// of keeping legacy testnets running. Current TM2 testnets
-					// have blocks / transactions that are no longer compatible
-					// with latest "master" changes for Amino, so these blocks / txs are ignored,
-					// as opposed to this error being a show-stopper for the fetcher
-					f.logger.Error("unable to save block", zap.String("err", saveErr.Error()))
-
-					continue
-				}
-
-				f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height))
-
-				// Get block results
-				txResults := item.chunk.results[blockIndex]
-
-				// Save the fetched transaction results
-				for _, txResult := range txResults {
-					if err := wb.SetTx(txResult); err != nil {
-						f.logger.Error("unable to save tx", zap.String("err", err.Error()))
-
-						continue
-					}
-
-					f.logger.Debug(
-						"Added tx to batch",
-						zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
-					)
-				}
-
-				// Alert any listeners of a new saved block
-				event := &types.NewBlock{
-					Block:   block,
-					Results: txResults,
-				}
-
-				f.events.SignalEvent(event)
-			}
-
-			f.logger.Info(
-				"Added to batch block and tx data for range",
-				zap.Uint64("from", item.chunkRange.from),
-				zap.Uint64("to", item.chunkRange.to),
-			)
-
-			// Save the latest height data
-			if err := wb.SetLatestHeight(item.chunkRange.to); err != nil {
-				if rErr := wb.Rollback(); rErr != nil {
-					return fmt.Errorf("unable to save latest height info, %w, %w", err, rErr)
-				}
-
-				return fmt.Errorf("unable to save latest height info, %w", err)
-			}
-
-			if err := wb.Commit(); err != nil {
-				return fmt.Errorf("error persisting block information into storage, %w", err)
-			}
-		}
-	}
-}
+			if err := f.processSlot(item); err != nil {
+				return fmt.Errorf("unable to process slot, %w", err)
+			}
+		}
+	}
+}
+
+func (f *Fetcher) maybeFetchGenesis() error {
+	// Check if genesis block has already been indexed (latest height is set)
+	_, err := f.storage.GetLatestHeight()
+	if err == nil {
+		return nil
+	} else if !errors.Is(err, storageErrors.ErrNotFound) {
+		return fmt.Errorf("unable to fetch latest block height, %w", err)
+	}
+
+	// Fetch genesis
+	iGenesisBlock, err := f.client.GetGenesisBlock()
+	if err != nil {
+		return fmt.Errorf("unable to fetch genesis block, %w", err)
+	}
+
+	genesisState, ok := iGenesisBlock.Genesis.AppState.(gnoland.GnoGenesisState)
+	if !ok {
+		return errors.New("unable to cast genesis block to GnoGenesisState")
+	}
+
+	// Convert genesis to normal block
+	bftTxs := make([]bftTypes.Tx, len(genesisState.Txs))
+	for i, tx := range genesisState.Txs {
+		bftTxs[i], err = amino.Marshal(tx)
+		if err != nil {
+			return fmt.Errorf("unable to marshal tx, %w", err)
+		}
+	}
+
+	block := &bftTypes.Block{
+		Header: bftTypes.Header{
+			AppHash:  iGenesisBlock.Genesis.AppHash,
+			ChainID:  iGenesisBlock.Genesis.ChainID,
+			Time:     iGenesisBlock.Genesis.GenesisTime,
+			Height:   0,
+			NumTxs:   int64(len(bftTxs)),
+			TotalTxs: int64(len(bftTxs)),
+		},
+		Data: bftTypes.Data{
+			Txs: bftTxs,
+		},
+	}
+
+	slot := &slot{
+		chunk: &chunk{
+			blocks:  []*bftTypes.Block{block},
+			results: make([][]*bftTypes.TxResult, 1),
+		},
+		chunkRange: chunkRange{
+			from: 0, // should be -1, but we're using 0 to avoid underflow
+			to:   0,
+		},
+	}
+
+	if err := f.processSlot(slot); err != nil {
+		return fmt.Errorf("unable to process genesis slot, %w", err)
+	}
+
+	return nil
+}
+
+func (f *Fetcher) processSlot(slot *slot) error {
+	wb := f.storage.WriteBatch()
+
+	// Save the fetched data
+	for blockIndex, block := range slot.chunk.blocks {
+		if saveErr := wb.SetBlock(block); saveErr != nil {
+			// This is a design choice that really highlights the strain
+			// of keeping legacy testnets running. Current TM2 testnets
+			// have blocks / transactions that are no longer compatible
+			// with latest "master" changes for Amino, so these blocks / txs are ignored,
+			// as opposed to this error being a show-stopper for the fetcher
+			f.logger.Error("unable to save block", zap.String("err", saveErr.Error()))
+
+			continue
+		}
+
+		f.logger.Debug("Added block data to batch", zap.Int64("number", block.Height))
+
+		// Get block results
+		txResults := slot.chunk.results[blockIndex]
+
+		// Save the fetched transaction results
+		for _, txResult := range txResults {
+			if err := wb.SetTx(txResult); err != nil {
+				f.logger.Error("unable to save tx", zap.String("err", err.Error()))
+
+				continue
+			}
+
+			f.logger.Debug(
+				"Added tx to batch",
+				zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
+			)
+		}
+
+		// Alert any listeners of a new saved block
+		event := &types.NewBlock{
+			Block:   block,
+			Results: txResults,
+		}
+
+		f.events.SignalEvent(event)
+	}
+
+	f.logger.Info(
+		"Added to batch block and tx data for range",
+		zap.Uint64("from", slot.chunkRange.from),
+		zap.Uint64("to", slot.chunkRange.to),
+	)
+
+	// Save the latest height data
+	if err := wb.SetLatestHeight(slot.chunkRange.to); err != nil {
+		if rErr := wb.Rollback(); rErr != nil {
+			return fmt.Errorf("unable to save latest height info, %w, %w", err, rErr)
+		}
+
+		return fmt.Errorf("unable to save latest height info, %w", err)
+	}
+
+	if err := wb.Commit(); err != nil {
+		return fmt.Errorf("error persisting block information into storage, %w", err)
+	}
+
+	return nil
+}
diff --git a/fetch/mocks_test.go b/fetch/mocks_test.go
index 91f9a1e9..99edb878 100644
--- a/fetch/mocks_test.go
+++ b/fetch/mocks_test.go
@@ -39,6 +39,10 @@ func (m *mockClient) GetBlock(blockNum uint64) (*core_types.ResultBlock, error)
 	return nil, nil
 }
 
+func (m *mockClient) GetGenesisBlock() (*core_types.ResultGenesis, error) {
+	return nil, nil
+}
+
 func (m *mockClient) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
 	if m.getBlockResultsFn != nil {
 		return m.getBlockResultsFn(blockNum)
diff --git a/fetch/types.go b/fetch/types.go
index cb75e844..5f585105 100644
--- a/fetch/types.go
+++ b/fetch/types.go
@@ -15,6 +15,9 @@ type Client interface {
 	// GetBlock returns specified block
 	GetBlock(uint64) (*core_types.ResultBlock, error)
 
+	// GetGenesisBlock returns the genesis block
+	GetGenesisBlock() (*core_types.ResultGenesis, error)
+
 	// GetBlockResults returns the results of executing the transactions
 	// for the specified block
 	GetBlockResults(uint64) (*core_types.ResultBlockResults, error)