Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable firehose eth block ingestor #4204

Merged
merged 1 commit into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 67 additions & 1 deletion chain/ethereum/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod pbcodec;

use anyhow::format_err;
use graph::{
blockchain::{Block as BlockchainBlock, BlockPtr},
blockchain::{Block as BlockchainBlock, BlockPtr, ChainStoreBlock, ChainStoreData},
prelude::{
web3,
web3::types::{Bytes, H160, H2048, H256, H64, U256, U64},
Expand Down Expand Up @@ -441,6 +441,14 @@ impl BlockchainBlock for Block {
fn parent_ptr(&self) -> Option<BlockPtr> {
self.parent_ptr()
}

// This implementation provides the timestamp so that it works with block _meta's timestamp.
// However, the firehose types will not populate the transaction receipts so switching back
// from firehose ingestor to the firehose ingestor will prevent non final block from being
// processed using the block stored by firehose.
fn data(&self) -> Result<jsonrpc_core::serde_json::Value, jsonrpc_core::serde_json::Error> {
self.header().to_json()
}
}

impl HeaderOnlyBlock {
Expand All @@ -449,6 +457,25 @@ impl HeaderOnlyBlock {
}
}

impl Into<ChainStoreData> for &BlockHeader {
fn into(self) -> ChainStoreData {
ChainStoreData {
block: ChainStoreBlock::new(
self.timestamp.as_ref().unwrap().seconds,
jsonrpc_core::Value::Null,
),
}
}
}

impl BlockHeader {
fn to_json(&self) -> Result<jsonrpc_core::serde_json::Value, jsonrpc_core::serde_json::Error> {
let chain_store_data: ChainStoreData = self.into();

jsonrpc_core::to_value(chain_store_data)
}
}

impl<'a> From<&'a HeaderOnlyBlock> for BlockPtr {
fn from(b: &'a HeaderOnlyBlock) -> BlockPtr {
BlockPtr::from(b.header())
Expand All @@ -467,4 +494,43 @@ impl BlockchainBlock for HeaderOnlyBlock {
fn parent_ptr(&self) -> Option<BlockPtr> {
self.header().parent_ptr()
}

// This implementation provides the timestamp so that it works with block _meta's timestamp.
// However, the firehose types will not populate the transaction receipts so switching back
// from firehose ingestor to the firehose ingestor will prevent non final block from being
// processed using the block stored by firehose.
fn data(&self) -> Result<jsonrpc_core::serde_json::Value, jsonrpc_core::serde_json::Error> {
self.header().to_json()
}
}

#[cfg(test)]
mod test {
use graph::{blockchain::Block as _, prelude::chrono::Utc};
use prost_types::Timestamp;

use crate::codec::BlockHeader;

use super::Block;

#[test]
fn ensure_block_serialization() {
let now = Utc::now().timestamp();
let mut block = Block::default();
let mut header = BlockHeader::default();
header.timestamp = Some(Timestamp {
seconds: now,
nanos: 0,
});

block.header = Some(header);

let str_block = block.data().unwrap().to_string();

assert_eq!(
str_block,
// if you're confused when reading this, format needs {{ to escape {
format!(r#"{{"block":{{"data":null,"timestamp":"{}"}}}}"#, now)
);
}
}
44 changes: 44 additions & 0 deletions graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,55 @@ pub trait Block: Send + Sync {
}

/// The data that should be stored for this block in the `ChainStore`
/// TODO: Return ChainStoreData once it is available for all chains
fn data(&self) -> Result<serde_json::Value, serde_json::Error> {
Ok(serde_json::Value::Null)
}
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// This is the root data for the chain store. This stucture provides backwards
/// compatibility with existing data for ethereum.
pub struct ChainStoreData {
pub block: ChainStoreBlock,
}

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
/// ChainStoreBlock is intended to standardize the information stored in the data
/// field of the ChainStore. All the chains should eventually return this type
/// on the data() implementation for block. This will ensure that any part of the
/// structured data can be relied upon for all chains.
pub struct ChainStoreBlock {
/// Unix timestamp (seconds since epoch), can be stored as hex or decimal.
timestamp: String,
data: serde_json::Value,
}

impl ChainStoreBlock {
pub fn new(unix_timestamp: i64, data: serde_json::Value) -> Self {
Self {
timestamp: unix_timestamp.to_string(),
data,
}
}

pub fn timestamp_str(&self) -> &str {
&self.timestamp
}

pub fn timestamp(&self) -> i64 {
let (rdx, i) = if self.timestamp.starts_with("0x") {
(16, 2)
} else {
(10, 0)
};

i64::from_str_radix(&self.timestamp[i..], rdx).unwrap_or(0)
}
}

#[async_trait]
// This is only `Debug` because some tests require that
pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
Expand Down
37 changes: 36 additions & 1 deletion node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::Parser as _;
use ethereum::chain::{EthereumAdapterSelector, EthereumBlockRefetcher, EthereumStreamBuilder};
use ethereum::codec::HeaderOnlyBlock;
use ethereum::{
BlockIngestor as EthereumBlockIngestor, EthereumAdapterTrait, EthereumNetworks, RuntimeAdapter,
};
Expand Down Expand Up @@ -373,13 +374,47 @@ async fn main() {
if !opt.disable_block_ingestor {
if ethereum_chains.len() > 0 {
let block_polling_interval = Duration::from_millis(opt.ethereum_polling_interval);
// Each chain contains both the rpc and firehose endpoints so provided
// IS_FIREHOSE_PREFERRED is set to true, a chain will use firehose if it has
// endpoints set but chains are essentially guaranteed to use EITHER firehose or RPC
// but will never start both.
let (firehose_eth_chains, polling_eth_chains): (HashMap<_, _>, HashMap<_, _>) =
ethereum_chains
.into_iter()
.partition(|(_, chain)| chain.is_firehose_supported());

start_block_ingestor(
&logger,
&logger_factory,
block_polling_interval,
ethereum_chains,
polling_eth_chains,
);

firehose_networks_by_kind
.get(&BlockchainKind::Ethereum)
.map(|eth_firehose_endpoints| {
start_firehose_block_ingestor::<_, HeaderOnlyBlock>(
&logger,
&network_store,
firehose_eth_chains
.into_iter()
.map(|(name, chain)| {
let firehose_endpoints = eth_firehose_endpoints
.networks
.get(&name)
.expect(&format!("chain {} to have endpoints", name))
.clone();
(
name,
FirehoseChain {
chain,
firehose_endpoints,
},
)
})
.collect(),
)
});
}

start_firehose_block_ingestor::<_, ArweaveBlock>(
Expand Down
33 changes: 33 additions & 0 deletions store/postgres/tests/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,39 @@ fn parse_timestamp() {
})
}

#[test]
fn parse_timestamp_firehose() {
const EXPECTED_TS: u64 = 1657712166;

run_test(|store, _, _| async move {
use block_store::*;
// The test subgraph is at block 2. Since we don't ever delete
// the genesis block, the only block eligible for cleanup is BLOCK_ONE
// and the first retained block is block 2.
block_store::set_chain(
vec![
&*GENESIS_BLOCK,
&*BLOCK_ONE,
&*BLOCK_TWO,
&*BLOCK_THREE_TIMESTAMP_FIREHOSE,
],
NETWORK_NAME,
);
let chain_store = store
.block_store()
.chain_store(NETWORK_NAME)
.expect("fake chain store");

let (_network, number, timestamp) = chain_store
.block_number(&BLOCK_THREE_TIMESTAMP_FIREHOSE.block_hash())
.await
.expect("block_number to return correct number and timestamp")
.unwrap();
assert_eq!(number, 3);
assert_eq!(timestamp.unwrap(), EXPECTED_TS);
})
}

#[test]
/// checks if retrieving the timestamp from the data blob works.
/// on ethereum, the block has timestamp as U256 so it will always have a value
Expand Down
2 changes: 2 additions & 0 deletions store/test-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ graph-mock = { path = "../../mock" }
graph-node = { path = "../../node" }
graph = { path = "../../graph" }
graph-store-postgres = { path = "../postgres" }
graph-chain-ethereum= { path = "../../chain/ethereum" }
lazy_static = "1.1"
hex-literal = "0.3"
diesel = { version = "1.4.8", features = ["postgres", "serde_json", "numeric", "r2d2"] }
serde = "1.0"
prost-types = "0.10.1"
35 changes: 31 additions & 4 deletions store/test-store/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use lazy_static::lazy_static;

use graph::components::store::BlockStore;
use graph::{
blockchain::Block,
blockchain::Block as BlockchainBlock,
prelude::{
serde_json, web3::types::H256, web3::types::U256, BlockHash, BlockNumber, BlockPtr,
EthereumBlock, LightEthereumBlock,
},
};
use graph_chain_ethereum::codec::{Block, BlockHeader};
use prost_types::Timestamp;

lazy_static! {
// Genesis block
Expand All @@ -33,6 +35,7 @@ lazy_static! {
pub static ref BLOCK_THREE: FakeBlock = BLOCK_TWO.make_child("7347afe69254df06729e123610b00b8b11f15cfae3241f9366fb113aec07489c", None);
pub static ref BLOCK_THREE_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(3, "fa9ebe3f74de4c56908b49f5c4044e85825f7350f3fa08a19151de82a82a7313");
pub static ref BLOCK_THREE_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", Some(U256::from(1657712166)));
pub static ref BLOCK_THREE_TIMESTAMP_FIREHOSE: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986f", Some(U256::from(1657712166)));
// This block is special and serializes in a slightly different way, this is needed to simulate non-ethereum behaviour at the store level. If you're not sure
// what you are doing, don't use this block for other tests.
pub static ref BLOCK_THREE_NO_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", None);
Expand Down Expand Up @@ -96,9 +99,25 @@ impl FakeBlock {
transaction_receipts: Vec::new(),
}
}

pub fn as_firehose_block(&self) -> Block {
let mut block = Block::default();
block.hash = self.hash.clone().into_bytes();
block.number = self.number as u64;

let mut header = BlockHeader::default();
header.parent_hash = self.parent_hash.clone().into_bytes();
header.timestamp = self.timestamp.map(|ts| Timestamp {
seconds: i64::from_str_radix(&ts.to_string(), 10).unwrap(),
nanos: 0,
});
block.header = Some(header);

block
}
}

impl Block for FakeBlock {
impl BlockchainBlock for FakeBlock {
fn ptr(&self) -> BlockPtr {
self.block_ptr()
}
Expand All @@ -115,7 +134,12 @@ impl Block for FakeBlock {
}

fn data(&self) -> Result<serde_json::Value, serde_json::Error> {
let mut value: serde_json::Value = serde_json::to_value(self.as_ethereum_block())?;
let mut value: serde_json::Value = if self.eq(&BLOCK_THREE_TIMESTAMP_FIREHOSE) {
self.as_firehose_block().data().unwrap()
} else {
serde_json::to_value(self.as_ethereum_block())?
};

if !self.eq(&BLOCK_THREE_NO_TIMESTAMP) {
return Ok(value);
};
Expand Down Expand Up @@ -145,6 +169,9 @@ pub fn set_chain(chain: FakeBlockList, network: &str) {
.block_store()
.chain_store(network)
.unwrap();
let chain: Vec<&dyn Block> = chain.iter().map(|block| *block as &dyn Block).collect();
let chain: Vec<&dyn BlockchainBlock> = chain
.iter()
.map(|block| *block as &dyn BlockchainBlock)
.collect();
store.set_chain(&GENESIS_BLOCK.hash, chain);
}