Skip to content

Commit

Permalink
enable firehose eth block ingestor
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Nov 28, 2022
1 parent 1be8557 commit bf2f3a5
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 11 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 43 additions & 1 deletion chain/ethereum/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod pbcodec;

use anyhow::format_err;
use graph::{
blockchain::{Block as BlockchainBlock, BlockPtr},
blockchain::{Block as BlockchainBlock, BlockPtr, ChainStoreBlock, ChainStoreData},
prelude::{
web3,
web3::types::{Bytes, H160, H2048, H256, H64, U256, U64},
Expand Down Expand Up @@ -441,6 +441,17 @@ impl BlockchainBlock for Block {
fn parent_ptr(&self) -> Option<BlockPtr> {
self.parent_ptr()
}

fn data(&self) -> Result<jsonrpc_core::serde_json::Value, jsonrpc_core::serde_json::Error> {
let chain_store_data = ChainStoreData {
block: ChainStoreBlock::new(
self.header().timestamp.as_ref().unwrap().seconds,
jsonrpc_core::Value::Null,
),
};

jsonrpc_core::to_value(chain_store_data)
}
}

impl HeaderOnlyBlock {
Expand Down Expand Up @@ -468,3 +479,34 @@ impl BlockchainBlock for HeaderOnlyBlock {
self.header().parent_ptr()
}
}

#[cfg(test)]
mod test {
use graph::{blockchain::Block as _, prelude::chrono::Utc};
use prost_types::Timestamp;

use crate::codec::BlockHeader;

use super::Block;

#[test]
fn ensure_block_serialization() {
let now = Utc::now().timestamp();
let mut block = Block::default();
let mut header = BlockHeader::default();
header.timestamp = Some(Timestamp {
seconds: now,
nanos: 0,
});

block.header = Some(header);

let str_block = block.data().unwrap().to_string();

assert_eq!(
str_block,
// if you're confused when reading this, format needs {{ to escape {
format!(r#"{{"block":{{"data":null,"timestamp":"{}"}}}}"#, now)
);
}
}
44 changes: 44 additions & 0 deletions graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,55 @@ pub trait Block: Send + Sync {
}

/// The data that should be stored for this block in the `ChainStore`
/// TODO: Return ChainStoreData once it is available for all chains
fn data(&self) -> Result<serde_json::Value, serde_json::Error> {
Ok(serde_json::Value::Null)
}
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// This is the root data for the chain store. This stucture provides backwards
/// compatibility with existing data for ethereum.
pub struct ChainStoreData {
pub block: ChainStoreBlock,
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// ChainStoreBlock is intended to standardize the information stored in the data
/// field of the ChainStore. All the chains should eventually return this type
/// on the data() implementation for block. This will ensure that any part of the
/// structured data can be relied upon for all chains.
pub struct ChainStoreBlock {
/// Unix timestamp (seconds since epoch), can be stored as hex or decimal.
timestamp: String,
data: serde_json::Value,
}

impl ChainStoreBlock {
pub fn new(unix_timestamp: i64, data: serde_json::Value) -> Self {
Self {
timestamp: unix_timestamp.to_string(),
data,
}
}

pub fn timestamp_str(&self) -> &str {
&self.timestamp
}

pub fn timestamp(&self) -> i64 {
let (rdx, i) = if self.timestamp.starts_with("0x") {
(16, 2)
} else {
(10, 0)
};

i64::from_str_radix(&self.timestamp[i..], rdx).unwrap_or(0)
}
}

#[async_trait]
// This is only `Debug` because some tests require that
pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
Expand Down
37 changes: 31 additions & 6 deletions node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::Parser as _;
use ethereum::chain::{EthereumAdapterSelector, EthereumBlockRefetcher, EthereumStreamBuilder};
use ethereum::codec::Block;
use ethereum::{
BlockIngestor as EthereumBlockIngestor, EthereumAdapterTrait, EthereumNetworks, RuntimeAdapter,
};
Expand Down Expand Up @@ -373,13 +374,37 @@ async fn main() {
if !opt.disable_block_ingestor {
if ethereum_chains.len() > 0 {
let block_polling_interval = Duration::from_millis(opt.ethereum_polling_interval);
let (firehose, polling): (HashMap<_, _>, HashMap<_, _>) = ethereum_chains
.into_iter()
.partition(|(_, chain)| chain.is_firehose_supported());

start_block_ingestor(
&logger,
&logger_factory,
block_polling_interval,
ethereum_chains,
);
start_block_ingestor(&logger, &logger_factory, block_polling_interval, polling);

firehose_networks_by_kind
.get(&BlockchainKind::Ethereum)
.map(|eth_firehose_endpoints| {
start_firehose_block_ingestor::<_, Block>(
&logger,
&network_store,
firehose
.into_iter()
.map(|(name, chain)| {
let firehose_endpoints = eth_firehose_endpoints
.networks
.get(&name)
.expect(&format!("chain {} to have endpoints", name))
.clone();
(
name,
FirehoseChain {
chain,
firehose_endpoints,
},
)
})
.collect(),
)
});
}

start_firehose_block_ingestor::<_, ArweaveBlock>(
Expand Down
33 changes: 33 additions & 0 deletions store/postgres/tests/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,39 @@ fn parse_timestamp() {
})
}

#[test]
fn parse_timestamp_firehose() {
const EXPECTED_TS: u64 = 1657712166;

run_test(|store, _, _| async move {
use block_store::*;
// The test subgraph is at block 2. Since we don't ever delete
// the genesis block, the only block eligible for cleanup is BLOCK_ONE
// and the first retained block is block 2.
block_store::set_chain(
vec![
&*GENESIS_BLOCK,
&*BLOCK_ONE,
&*BLOCK_TWO,
&*BLOCK_THREE_TIMESTAMP_FIREHOSE,
],
NETWORK_NAME,
);
let chain_store = store
.block_store()
.chain_store(NETWORK_NAME)
.expect("fake chain store");

let (_network, number, timestamp) = chain_store
.block_number(&BLOCK_THREE_TIMESTAMP_FIREHOSE.block_hash())
.await
.expect("block_number to return correct number and timestamp")
.unwrap();
assert_eq!(number, 3);
assert_eq!(timestamp.unwrap(), EXPECTED_TS);
})
}

#[test]
/// checks if retrieving the timestamp from the data blob works.
/// on ethereum, the block has timestamp as U256 so it will always have a value
Expand Down
2 changes: 2 additions & 0 deletions store/test-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ graph-mock = { path = "../../mock" }
graph-node = { path = "../../node" }
graph = { path = "../../graph" }
graph-store-postgres = { path = "../postgres" }
graph-chain-ethereum= { path = "../../chain/ethereum" }
lazy_static = "1.1"
hex-literal = "0.3"
diesel = { version = "1.4.8", features = ["postgres", "serde_json", "numeric", "r2d2"] }
serde = "1.0"
prost-types = "0.10.1"
35 changes: 31 additions & 4 deletions store/test-store/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use lazy_static::lazy_static;

use graph::components::store::BlockStore;
use graph::{
blockchain::Block,
blockchain::Block as BlockchainBlock,
prelude::{
serde_json, web3::types::H256, web3::types::U256, BlockHash, BlockNumber, BlockPtr,
EthereumBlock, LightEthereumBlock,
},
};
use graph_chain_ethereum::codec::{Block, BlockHeader};
use prost_types::Timestamp;

lazy_static! {
// Genesis block
Expand All @@ -33,6 +35,7 @@ lazy_static! {
pub static ref BLOCK_THREE: FakeBlock = BLOCK_TWO.make_child("7347afe69254df06729e123610b00b8b11f15cfae3241f9366fb113aec07489c", None);
pub static ref BLOCK_THREE_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(3, "fa9ebe3f74de4c56908b49f5c4044e85825f7350f3fa08a19151de82a82a7313");
pub static ref BLOCK_THREE_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", Some(U256::from(1657712166)));
pub static ref BLOCK_THREE_TIMESTAMP_FIREHOSE: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986f", Some(U256::from(1657712166)));
// This block is special and serializes in a slightly different way, this is needed to simulate non-ethereum behaviour at the store level. If you're not sure
// what you are doing, don't use this block for other tests.
pub static ref BLOCK_THREE_NO_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", None);
Expand Down Expand Up @@ -96,9 +99,25 @@ impl FakeBlock {
transaction_receipts: Vec::new(),
}
}

pub fn as_firehose_block(&self) -> Block {
let mut block = Block::default();
block.hash = self.hash.clone().into_bytes();
block.number = self.number as u64;

let mut header = BlockHeader::default();
header.parent_hash = self.parent_hash.clone().into_bytes();
header.timestamp = self.timestamp.map(|ts| Timestamp {
seconds: i64::from_str_radix(&ts.to_string(), 10).unwrap(),
nanos: 0,
});
block.header = Some(header);

block
}
}

impl Block for FakeBlock {
impl BlockchainBlock for FakeBlock {
fn ptr(&self) -> BlockPtr {
self.block_ptr()
}
Expand All @@ -115,7 +134,12 @@ impl Block for FakeBlock {
}

fn data(&self) -> Result<serde_json::Value, serde_json::Error> {
let mut value: serde_json::Value = serde_json::to_value(self.as_ethereum_block())?;
let mut value: serde_json::Value = if self.eq(&BLOCK_THREE_TIMESTAMP_FIREHOSE) {
self.as_firehose_block().data().unwrap()
} else {
serde_json::to_value(self.as_ethereum_block())?
};

if !self.eq(&BLOCK_THREE_NO_TIMESTAMP) {
return Ok(value);
};
Expand Down Expand Up @@ -145,6 +169,9 @@ pub fn set_chain(chain: FakeBlockList, network: &str) {
.block_store()
.chain_store(network)
.unwrap();
let chain: Vec<&dyn Block> = chain.iter().map(|block| *block as &dyn Block).collect();
let chain: Vec<&dyn BlockchainBlock> = chain
.iter()
.map(|block| *block as &dyn BlockchainBlock)
.collect();
store.set_chain(&GENESIS_BLOCK.hash, chain);
}

0 comments on commit bf2f3a5

Please sign in to comment.