diff --git a/Cargo.lock b/Cargo.lock index cf45e2f48e4..cd4739d3538 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1644,6 +1644,7 @@ dependencies = [ "graph-store-postgres", "hex", "http 0.1.21", + "itertools", "jsonrpc-core", "lazy_static", "mockall 0.9.1", diff --git a/chain/ethereum/Cargo.toml b/chain/ethereum/Cargo.toml index 9fac3b107d0..ab912f1d864 100644 --- a/chain/ethereum/Cargo.toml +++ b/chain/ethereum/Cargo.toml @@ -28,6 +28,7 @@ ethabi = { git = "https://github.com/graphprotocol/ethabi.git", branch = "master # We have a couple custom patches to web3. web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "master" } +itertools = "0.10.0" graph-runtime-wasm = { path = "../../runtime/wasm" } graph-runtime-derive = { path = "../../runtime/derive" } diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index f7a933d4a51..650b3852577 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -22,6 +22,7 @@ use crate::capabilities::NodeCapabilities; use crate::{data_source::DataSource, Chain}; pub type EventSignature = H256; +pub type FunctionSelector = [u8; 4]; #[derive(Clone, Debug)] pub struct EthereumContractCall { @@ -268,7 +269,8 @@ impl EthereumLogFilter { pub(crate) struct EthereumCallFilter { // Each call filter has a map of filters keyed by address, each containing a tuple with // start_block and the set of function signatures - pub contract_addresses_function_signatures: HashMap)>, + pub contract_addresses_function_signatures: + HashMap)>, } impl EthereumCallFilter { @@ -353,12 +355,12 @@ impl EthereumCallFilter { } } -impl FromIterator<(BlockNumber, Address, [u8; 4])> for EthereumCallFilter { +impl FromIterator<(BlockNumber, Address, FunctionSelector)> for EthereumCallFilter { fn from_iter(iter: I) -> Self where - I: IntoIterator, + I: IntoIterator, { - let mut lookup: HashMap)> = HashMap::new(); + let mut lookup: HashMap)> = HashMap::new(); iter.into_iter() .for_each(|(start_block, address, function_signature)| { if !lookup.contains_key(&address) { @@ -385,7 +387,7 @@ impl From for EthereumCallFilter { .contract_addresses .into_iter() .map(|(start_block_opt, address)| (address, (start_block_opt, HashSet::default()))) - .collect::)>>(), + .collect::)>>(), } } } diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 10d4d6f9170..c265f19a2d9 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -1,10 +1,8 @@ -use std::collections::HashSet; -use std::iter::FromIterator; -use std::sync::Arc; - use anyhow::{Context, Error}; use graph::data::subgraph::UnifiedMappingApiVersion; -use graph::prelude::{EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt}; +use graph::prelude::{ + EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, StopwatchMetrics, +}; use graph::{ blockchain::{ block_stream::{ @@ -23,6 +21,9 @@ use graph::{ SubgraphStore, }, }; +use std::collections::HashSet; +use std::iter::FromIterator; +use std::sync::Arc; use crate::data_source::DataSourceTemplate; use crate::data_source::UnresolvedDataSourceTemplate; @@ -140,6 +141,7 @@ impl Blockchain for Chain { loc: &DeploymentLocator, capabilities: &Self::NodeCapabilities, unified_api_version: UnifiedMappingApiVersion, + stopwatch_metrics: StopwatchMetrics, ) -> Result, Error> { let eth_adapter = self.eth_adapters.cheapest_with(capabilities)?.clone(); let logger = self @@ -152,8 +154,9 @@ impl Blockchain for Chain { logger, ethrpc_metrics, eth_adapter, + stopwatch_metrics, chain_store: self.chain_store.cheap_clone(), - _unified_api_version: unified_api_version, + unified_api_version, }; Ok(Arc::new(adapter)) } @@ -180,7 +183,12 @@ impl Blockchain for Chain { let requirements = filter.node_capabilities(); let triggers_adapter = self - .triggers_adapter(&deployment, &requirements, unified_api_version.clone()) + .triggers_adapter( + &deployment, + &requirements, + unified_api_version.clone(), + metrics.stopwatch.clone(), + ) .expect(&format!( "no adapter for network {} with capabilities {}", self.name, requirements @@ -306,9 +314,10 @@ pub struct DummyDataSourceTemplate; pub struct TriggersAdapter { logger: Logger, ethrpc_metrics: Arc, + stopwatch_metrics: StopwatchMetrics, chain_store: Arc, eth_adapter: Arc, - _unified_api_version: UnifiedMappingApiVersion, + unified_api_version: UnifiedMappingApiVersion, } #[async_trait] @@ -324,9 +333,11 @@ impl TriggersAdapterTrait for TriggersAdapter { self.logger.clone(), self.chain_store.clone(), self.ethrpc_metrics.clone(), + self.stopwatch_metrics.clone(), from, to, filter, + self.unified_api_version.clone(), ) .await } @@ -354,9 +365,11 @@ impl TriggersAdapterTrait for TriggersAdapter { logger.clone(), self.chain_store.clone(), self.ethrpc_metrics.clone(), + self.stopwatch_metrics.clone(), block_number, block_number, filter, + self.unified_api_version.clone(), ) .await?; assert!(blocks.len() == 1); @@ -368,7 +381,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &filter.log, &full_block.ethereum_block, )); - triggers.append(&mut parse_call_triggers(&filter.call, &full_block)); + triggers.append(&mut parse_call_triggers(&filter.call, &full_block)?); triggers.append(&mut parse_block_triggers(filter.block.clone(), &full_block)); Ok(BlockWithTriggers::new(block, triggers)) } diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index f7a94a39099..a2e72fd627c 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1,25 +1,22 @@ +use ethabi::ParamType; use ethabi::Token; use futures::future; use futures::prelude::*; -use lazy_static::lazy_static; -use std::collections::{HashMap, HashSet}; -use std::convert::TryFrom; -use std::iter::FromIterator; -use std::sync::Arc; -use std::time::Instant; - -use ethabi::ParamType; +use graph::components::transaction_receipt::LightTransactionReceipt; +use graph::data::subgraph::UnifiedMappingApiVersion; +use graph::prelude::StopwatchMetrics; use graph::{ blockchain::{block_stream::BlockWithTriggers, BlockPtr, IngestorError}, prelude::{ - anyhow, async_trait, debug, error, ethabi, + anyhow::{self, anyhow, bail}, + async_trait, debug, error, ethabi, futures03::{self, compat::Future01CompatExt, FutureExt, StreamExt, TryStreamExt}, - hex, retry, stream, tiny_keccak, trace, warn, + hex, info, retry, stream, tiny_keccak, trace, warn, web3::{ self, types::{ Address, Block, BlockId, BlockNumber as Web3BlockNumber, Bytes, CallRequest, - FilterBuilder, Log, H256, + Filter, FilterBuilder, Log, Transaction, TransactionReceipt, H256, }, }, BlockNumber, ChainStore, CheapClone, DynTryFuture, Error, EthereumCallCache, Logger, @@ -30,9 +27,15 @@ use graph::{ components::ethereum::*, prelude::web3::types::{Trace, TraceFilter, TraceFilterBuilder, H160}, }; +use itertools::Itertools; +use lazy_static::lazy_static; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::convert::TryFrom; +use std::iter::FromIterator; +use std::sync::Arc; +use std::time::Instant; use web3::api::Web3; use web3::transports::batch::Batch; -use web3::types::Filter; use crate::chain::BlockFinality; use crate::{ @@ -1378,9 +1381,11 @@ pub(crate) async fn blocks_with_triggers( logger: Logger, chain_store: Arc, subgraph_metrics: Arc, + stopwatch_metrics: StopwatchMetrics, from: BlockNumber, to: BlockNumber, filter: &TriggerFilter, + unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { // Each trigger filter needs to be queried for the same block range // and the blocks yielded need to be deduped. If any error occurs @@ -1484,8 +1489,8 @@ pub(crate) async fn blocks_with_triggers( block_hashes.insert(to_hash); triggers_by_block.entry(to).or_insert(Vec::new()); - let mut blocks = adapter - .load_blocks(logger1, chain_store, block_hashes) + let blocks = adapter + .load_blocks(logger1, chain_store.clone(), block_hashes) .and_then( move |block| match triggers_by_block.remove(&(block.number() as BlockNumber)) { Some(triggers) => Ok(BlockWithTriggers::new( @@ -1502,6 +1507,23 @@ pub(crate) async fn blocks_with_triggers( .compat() .await?; + // Filter out call triggers that come from unsuccessful transactions + + let mut blocks = if unified_api_version + .equal_or_greater_than(&graph::data::subgraph::API_VERSION_0_0_5) + { + let section = + stopwatch_metrics.start_section("filter_call_triggers_from_unsuccessful_transactions"); + let futures = blocks.into_iter().map(|block| { + filter_call_triggers_from_unsuccessful_transactions(block, ð, &chain_store, &logger) + }); + let blocks = futures03::future::try_join_all(futures).await?; + section.end(); + blocks + } else { + blocks + }; + blocks.sort_by_key(|block| block.ptr().number); // Sanity check that the returned blocks are in the correct range. @@ -1587,14 +1609,21 @@ pub(crate) fn parse_log_triggers( pub(crate) fn parse_call_triggers( call_filter: &EthereumCallFilter, block: &EthereumBlockWithCalls, -) -> Vec { +) -> anyhow::Result> { match &block.calls { Some(calls) => calls .iter() .filter(move |call| call_filter.matches(call)) - .map(move |call| EthereumTrigger::Call(Arc::new(call.clone()))) + .map( + move |call| match block.transaction_for_call_succeeded(call) { + Ok(true) => Ok(Some(EthereumTrigger::Call(Arc::new(call.clone())))), + Ok(false) => Ok(None), + Err(e) => Err(e), + }, + ) + .filter_map_ok(|some_trigger| some_trigger) .collect(), - None => vec![], + None => Ok(vec![]), } } @@ -1627,3 +1656,156 @@ pub(crate) fn parse_block_triggers( } triggers } + +async fn fetch_receipt_from_ethereum_client( + eth: &EthereumAdapter, + transaction_hash: &H256, +) -> anyhow::Result { + match eth + .web3 + .eth() + .transaction_receipt(*transaction_hash) + .compat() + .await + { + Ok(Some(receipt)) => Ok(receipt), + Ok(None) => bail!("Could not find transaction receipt"), + Err(error) => bail!("Failed to fetch transaction receipt: {}", error), + } +} + +async fn filter_call_triggers_from_unsuccessful_transactions( + mut block: BlockWithTriggers, + eth: &EthereumAdapter, + chain_store: &Arc, + logger: &Logger, +) -> anyhow::Result> { + // Return early if there is no trigger data + if block.trigger_data.is_empty() { + return Ok(block); + } + + let initial_number_of_triggers = block.trigger_data.len(); + + // Get the transaction hash from each call trigger + let transaction_hashes: BTreeSet = block + .trigger_data + .iter() + .filter_map(|trigger| match trigger { + EthereumTrigger::Call(call_trigger) => Some(call_trigger.transaction_hash), + _ => None, + }) + .collect::>>() + .ok_or(anyhow!( + "failed to obtain transaction hash from call triggers" + ))?; + + // And obtain all Transaction values for the calls in this block. + let transactions: Vec<&Transaction> = { + match &block.block { + BlockFinality::Final(ref block) => block + .transactions + .iter() + .filter(|transaction| transaction_hashes.contains(&transaction.hash)) + .collect(), + BlockFinality::NonFinal(_block_with_calls) => { + unreachable!( + "this function should not be called when dealing with non-final blocks" + ) + } + } + }; + + // Confidence check: Did we collect all transactions for the current call triggers? + if transactions.len() != transaction_hashes.len() { + bail!("failed to find transactions in block for the given call triggers") + } + + // Return early if there are no transactions to inspect + if transactions.is_empty() { + return Ok(block); + } + + // We'll also need the receipts for those transactions. In this step we collect all receipts + // we have in store for the current block. + let mut receipts = chain_store + .transaction_receipts_in_block(&block.ptr().hash_as_h256()) + .await? + .into_iter() + .map(|receipt| (receipt.transaction_hash.clone(), receipt)) + .collect::>(); + + // Do we have a receipt for each transaction under analysis? + let mut receipts_and_transactions: Vec<(&Transaction, LightTransactionReceipt)> = Vec::new(); + let mut transactions_without_receipt: Vec<&Transaction> = Vec::new(); + for transaction in transactions.iter() { + if let Some(receipt) = receipts.remove(&transaction.hash) { + receipts_and_transactions.push((transaction, receipt)); + } else { + transactions_without_receipt.push(transaction); + } + } + + // When some receipts are missing, we then try to fetch them from our client. + let futures = transactions_without_receipt + .iter() + .map(|transaction| async move { + fetch_receipt_from_ethereum_client(ð, &transaction.hash) + .await + .map(|receipt| (transaction, receipt)) + }); + futures03::future::try_join_all(futures) + .await? + .into_iter() + .for_each(|(transaction, receipt)| { + receipts_and_transactions.push((transaction, receipt.into())) + }); + + // TODO: We should persist those fresh transaction receipts into the store, so we don't incur + // additional Ethereum API calls for future scans on this block. + + // With all transactions and receipts in hand, we can evaluate the success of each transaction + let mut transaction_success: BTreeMap<&H256, bool> = BTreeMap::new(); + for (transaction, receipt) in receipts_and_transactions.into_iter() { + transaction_success.insert( + &transaction.hash, + evaluate_transaction_status(receipt.status), + ); + } + + // Confidence check: Did we inspect the status of all transactions? + if !transaction_hashes + .iter() + .all(|tx| transaction_success.contains_key(tx)) + { + bail!("Not all transactions status were inspected") + } + + // Filter call triggers from unsuccessful transactions + block.trigger_data.retain(|trigger| { + if let EthereumTrigger::Call(call_trigger) = trigger { + // Unwrap: We already checked that those values exist + transaction_success[&call_trigger.transaction_hash.unwrap()] + } else { + // We are not filtering other types of triggers + true + } + }); + + // Log if any call trigger was filtered out + let final_number_of_triggers = block.trigger_data.len(); + let number_of_filtered_triggers = initial_number_of_triggers - final_number_of_triggers; + if number_of_filtered_triggers != 0 { + let noun = { + if number_of_filtered_triggers == 1 { + "call trigger" + } else { + "call triggers" + } + }; + info!(&logger, + "Filtered {} {} from failed transactions", number_of_filtered_triggers, noun ; + "block_number" => block.ptr().block_number()); + } + Ok(block) +} diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 850ec39f30c..7b5c28fb488 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -341,13 +341,6 @@ where .with_context(|| format!("no chain configured for network {}", network))? .clone(); - let unified_mapping_api_version = manifest.unified_mapping_api_version()?; - let triggers_adapter = chain.triggers_adapter(&deployment, &required_capabilities, unified_mapping_api_version).map_err(|e| - anyhow!( - "expected triggers adapter that matches deployment {} with required capabilities: {}: {}", - &deployment, - &required_capabilities, e))?.clone(); - // Obtain filters from the manifest let filter = C::TriggerFilter::from_data_sources(manifest.data_sources.iter()); let start_blocks = manifest.start_blocks(); @@ -358,6 +351,14 @@ where // ownership of the manifest and host builder into the new instance let stopwatch_metrics = StopwatchMetrics::new(logger.clone(), deployment.hash.clone(), registry.clone()); + + let unified_mapping_api_version = manifest.unified_mapping_api_version()?; + let triggers_adapter = chain.triggers_adapter(&deployment, &required_capabilities, unified_mapping_api_version ,stopwatch_metrics.clone()).map_err(|e| + anyhow!( + "expected triggers adapter that matches deployment {} with required capabilities: {}: {}", + &deployment, + &required_capabilities, e))?.clone(); + let subgraph_metrics = Arc::new(SubgraphInstanceMetrics::new( registry.clone(), deployment.hash.as_str(), @@ -374,6 +375,7 @@ where manifest.network_name(), stopwatch_metrics, )); + // Initialize deployment_head with current deployment head. Any sort of trouble in // getting the deployment head ptr leads to initializing with 0 let deployment_head = store diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index f28cd0fec0f..b86403b271c 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -9,7 +9,10 @@ mod types; // Try to reexport most of the necessary types use crate::{ cheap_clone::CheapClone, - components::store::{DeploymentLocator, StoredDynamicDataSource}, + components::{ + metrics::stopwatch::StopwatchMetrics, + store::{DeploymentLocator, StoredDynamicDataSource}, + }, data::subgraph::{Mapping, Source, UnifiedMappingApiVersion}, prelude::DataSourceContext, runtime::{AscHeap, AscPtr, DeterministicHostError, HostExportError}, @@ -91,6 +94,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + 'static { loc: &DeploymentLocator, capabilities: &Self::NodeCapabilities, unified_api_version: UnifiedMappingApiVersion, + stopwatch_metrics: StopwatchMetrics, ) -> Result, Error>; fn new_block_stream( diff --git a/graph/src/components/ethereum/mod.rs b/graph/src/components/ethereum/mod.rs index 2f24d4d718c..697c4f60637 100644 --- a/graph/src/components/ethereum/mod.rs +++ b/graph/src/components/ethereum/mod.rs @@ -3,7 +3,8 @@ mod types; use web3::types::H256; pub use self::types::{ - EthereumBlock, EthereumBlockWithCalls, EthereumCall, LightEthereumBlock, LightEthereumBlockExt, + evaluate_transaction_status, EthereumBlock, EthereumBlockWithCalls, EthereumCall, + LightEthereumBlock, LightEthereumBlockExt, }; #[derive(Clone, Debug, PartialEq, Eq, Hash)] diff --git a/graph/src/components/ethereum/types.rs b/graph/src/components/ethereum/types.rs index 19aa32e2bcb..a763825d88b 100644 --- a/graph/src/components/ethereum/types.rs +++ b/graph/src/components/ethereum/types.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use std::{convert::TryFrom, sync::Arc}; use web3::types::{ Action, Address, Block, Bytes, Log, Res, Trace, Transaction, TransactionReceipt, H256, U256, + U64, }; use crate::{ @@ -67,6 +68,36 @@ pub struct EthereumBlockWithCalls { pub calls: Option>, } +impl EthereumBlockWithCalls { + /// Given an `EthereumCall`, check within receipts if that transaction was successful. + pub fn transaction_for_call_succeeded(&self, call: &EthereumCall) -> anyhow::Result { + let call_transaction_hash = call.transaction_hash.ok_or(anyhow::anyhow!( + "failed to find a transaction for this call" + ))?; + + let receipt = self + .ethereum_block + .transaction_receipts + .iter() + .find(|txn| txn.transaction_hash == call_transaction_hash) + .ok_or(anyhow::anyhow!( + "failed to find the receipt for this transaction" + ))?; + + Ok(evaluate_transaction_status(receipt.status)) + } +} + +/// Evaluates if a given transaction was successful. +/// +/// Returns `true` on success and `false` on failure. +/// If a receipt does not have a status value (EIP-658), assume the transaction was successful. +pub fn evaluate_transaction_status(receipt_status: Option) -> bool { + receipt_status + .map(|status| !status.is_zero()) + .unwrap_or(true) +} + #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] pub struct EthereumBlock { pub block: Arc, diff --git a/graph/src/components/mod.rs b/graph/src/components/mod.rs index 5cad3ad93bf..157bfa06680 100644 --- a/graph/src/components/mod.rs +++ b/graph/src/components/mod.rs @@ -76,3 +76,5 @@ pub trait EventProducer { /// Avoid calling directly, prefer helpers such as `forward`. fn take_event_stream(&mut self) -> Option + Send>>; } + +pub mod transaction_receipt; diff --git a/graph/src/components/store.rs b/graph/src/components/store.rs index 5d66ed10ab4..b9029d427e6 100644 --- a/graph/src/components/store.rs +++ b/graph/src/components/store.rs @@ -19,6 +19,8 @@ use thiserror::Error; use web3::types::{Address, H256}; use crate::blockchain::Blockchain; +use crate::components::server::index_node::VersionInfo; +use crate::components::transaction_receipt; use crate::data::subgraph::status; use crate::data::{store::*, subgraph::Source}; use crate::prelude::*; @@ -28,8 +30,6 @@ use crate::{ data::{query::QueryTarget, subgraph::schema::*}, }; -use crate::components::server::index_node::VersionInfo; - lazy_static! { pub static ref SUBSCRIPTION_THROTTLE_INTERVAL: Duration = env::var("SUBSCRIPTION_THROTTLE_INTERVAL") @@ -1298,6 +1298,12 @@ pub trait ChainStore: Send + Sync + 'static { /// Find the block with `block_hash` and return the network name and number fn block_number(&self, block_hash: H256) -> Result, StoreError>; + + /// Tries to retrieve all transactions receipts for a given block. + async fn transaction_receipts_in_block( + &self, + block_ptr: &H256, + ) -> Result, StoreError>; } pub trait EthereumCallCache: Send + Sync + 'static { diff --git a/graph/src/components/transaction_receipt.rs b/graph/src/components/transaction_receipt.rs new file mode 100644 index 00000000000..ce9e5eb3fa0 --- /dev/null +++ b/graph/src/components/transaction_receipt.rs @@ -0,0 +1,206 @@ +//! Code for retrieving transaction receipts from the database. +//! +//! This module exposes: +//! 1. the [`find_transaction_receipts_in_block`] function, that queries the database and returns +//! transaction receipts present in a given block. +//! 2. the [`LightTransactionReceipt`] type, which holds basic information about the retrieved +//! transaction receipts. + +use diesel::{ + pg::{Pg, PgConnection}, + prelude::*, + query_builder::{Query, QueryFragment}, + sql_types::{Binary, Nullable}, +}; +use diesel_derives::{Queryable, QueryableByName}; +use itertools::Itertools; +use std::convert::TryFrom; +use web3::types::*; + +/// Parameters for querying for all transaction receipts of a given block. +struct TransactionReceiptQuery<'a> { + block_hash: &'a [u8], + schema_name: &'a str, +} + +impl<'a> diesel::query_builder::QueryId for TransactionReceiptQuery<'a> { + type QueryId = (); + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a> QueryFragment for TransactionReceiptQuery<'a> { + /// Writes the following SQL: + /// + /// ```sql + /// select + /// ethereum_hex_to_bytea(receipt ->> 'transactionHash') as transaction_hash, + /// ethereum_hex_to_bytea(receipt ->> 'transactionIndex') as transaction_index, + /// ethereum_hex_to_bytea(receipt ->> 'blockHash') as block_hash, + /// ethereum_hex_to_bytea(receipt ->> 'blockNumber') as block_number, + /// ethereum_hex_to_bytea(receipt ->> 'gasUsed') as gas_used, + /// ethereum_hex_to_bytea(receipt ->> 'status') as status + /// from ( + /// select + /// jsonb_array_elements(data -> 'transaction_receipts') as receipt + /// from + /// $CHAIN_SCHEMA.blocks + /// where hash = $BLOCK_HASH) as temp; + ///``` + fn walk_ast(&self, mut out: diesel::query_builder::AstPass) -> QueryResult<()> { + out.push_sql( + r#" +select + ethereum_hex_to_bytea(receipt ->> 'transactionHash') as transaction_hash, + ethereum_hex_to_bytea(receipt ->> 'transactionIndex') as transaction_index, + ethereum_hex_to_bytea(receipt ->> 'blockHash') as block_hash, + ethereum_hex_to_bytea(receipt ->> 'blockNumber') as block_number, + ethereum_hex_to_bytea(receipt ->> 'gasUsed') as gas_used, + ethereum_hex_to_bytea(receipt ->> 'status') as status +from ( + select jsonb_array_elements(data -> 'transaction_receipts') as receipt + from"#, + ); + out.push_identifier(&self.schema_name)?; + out.push_sql("."); + out.push_identifier("blocks")?; + out.push_sql(" where hash = "); + out.push_bind_param::(&self.block_hash)?; + out.push_sql(") as temp;"); + Ok(()) + } +} + +impl<'a> Query for TransactionReceiptQuery<'a> { + type SqlType = ( + Binary, + Binary, + Nullable, + Nullable, + Nullable, + Nullable, + ); +} + +impl<'a> RunQueryDsl for TransactionReceiptQuery<'a> {} + +/// Type that comes straight out of a SQL query +#[derive(QueryableByName, Queryable)] +struct RawTransactionReceipt { + #[sql_type = "Binary"] + transaction_hash: Vec, + #[sql_type = "Binary"] + transaction_index: Vec, + #[sql_type = "Nullable"] + block_hash: Option>, + #[sql_type = "Nullable"] + block_number: Option>, + #[sql_type = "Nullable"] + gas_used: Option>, + #[sql_type = "Nullable"] + status: Option>, +} + +/// Like web3::types::Receipt, but with fewer fields. +pub struct LightTransactionReceipt { + pub transaction_hash: H256, + pub transaction_index: U64, + pub block_hash: Option, + pub block_number: Option, + pub gas_used: Option, + pub status: Option, +} + +/// Converts Vec to [u8; N], where N is the vector's expected lenght. +/// Fails if input size is larger than output size. +pub(crate) fn drain_vector(input: Vec) -> Result<[u8; N], anyhow::Error> { + anyhow::ensure!(input.len() <= N, "source is larger than output"); + let mut output = [0u8; N]; + let start = output.len() - input.len(); + output[start..].iter_mut().set_from(input); + Ok(output) +} + +#[test] +fn test_drain_vector() { + let input = vec![191, 153, 17]; + let expected_output = [0, 0, 0, 0, 0, 191, 153, 17]; + let result = drain_vector(input).expect("failed to drain vector into array"); + assert_eq!(result, expected_output); +} + +impl TryFrom for LightTransactionReceipt { + type Error = anyhow::Error; + + fn try_from(value: RawTransactionReceipt) -> Result { + let RawTransactionReceipt { + transaction_hash, + transaction_index, + block_hash, + block_number, + gas_used, + status, + } = value; + + let transaction_hash = drain_vector(transaction_hash)?; + let transaction_index = drain_vector(transaction_index)?; + let block_hash = block_hash.map(drain_vector).transpose()?; + let block_number = block_number.map(drain_vector).transpose()?; + let gas_used = gas_used.map(drain_vector).transpose()?; + let status = status.map(drain_vector).transpose()?; + + Ok(LightTransactionReceipt { + transaction_hash: transaction_hash.into(), + transaction_index: transaction_index.into(), + block_hash: block_hash.map(Into::into), + block_number: block_number.map(Into::into), + gas_used: gas_used.map(Into::into), + status: status.map(Into::into), + }) + } +} + +/// Queries the database for all the transaction receipts in a given block range. +pub fn find_transaction_receipts_in_block( + conn: &PgConnection, + schema_name: &str, + block_hash: &H256, +) -> anyhow::Result> { + let query = TransactionReceiptQuery { + schema_name, + block_hash: block_hash.as_bytes(), + }; + + query + .get_results::(conn) + .or_else(|error| { + Err(anyhow::anyhow!( + "Error fetching transaction receipt from database: {}", + error + )) + })? + .into_iter() + .map(LightTransactionReceipt::try_from) + .collect() +} + +impl From for LightTransactionReceipt { + fn from(receipt: TransactionReceipt) -> Self { + let TransactionReceipt { + transaction_hash, + transaction_index, + block_hash, + block_number, + gas_used, + status, + .. + } = receipt; + LightTransactionReceipt { + transaction_hash, + transaction_index, + block_hash, + block_number, + gas_used, + status, + } + } +} diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 928bf96ac68..6981c104377 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -653,7 +653,7 @@ impl UnresolvedMapping { pub struct UnifiedMappingApiVersion(Option); impl UnifiedMappingApiVersion { - pub fn equal_or_greater_than(&self, other_version: &'static Version) -> bool { + pub fn equal_or_greater_than(&self, other_version: &Version) -> bool { assert!( other_version >= &API_VERSION_0_0_5, "api versions before 0.0.5 should not be used for comparison" diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 16a8beaa019..f20ee6a2a47 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -123,7 +123,7 @@ pub mod prelude { SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar, SubgraphVersionSwitchingMode, }; - pub use crate::components::{EventConsumer, EventProducer}; + pub use crate::components::{transaction_receipt, EventConsumer, EventProducer}; pub use crate::cheap_clone::CheapClone; pub use crate::data::graphql::{ diff --git a/mock/src/store.rs b/mock/src/store.rs index 2f75f15d88e..3695739597d 100644 --- a/mock/src/store.rs +++ b/mock/src/store.rs @@ -42,6 +42,9 @@ mock! { fn confirm_block_hash(&self, number: BlockNumber, hash: &H256) -> Result; fn block_number(&self, block_hash: H256) -> Result, StoreError>; + + async fn transaction_receipts_in_block(&self, block_hash: &H256) -> Result, StoreError>; + } } diff --git a/store/postgres/migrations/2021-06-14-201635_add_ethereum_hex_to_bytea_function/down.sql b/store/postgres/migrations/2021-06-14-201635_add_ethereum_hex_to_bytea_function/down.sql new file mode 100644 index 00000000000..6f4151cef7e --- /dev/null +++ b/store/postgres/migrations/2021-06-14-201635_add_ethereum_hex_to_bytea_function/down.sql @@ -0,0 +1,2 @@ +drop function if exists ethereum_hex_to_bytea (text); +drop function if exists raise_exception_bytea (text); diff --git a/store/postgres/migrations/2021-06-14-201635_add_ethereum_hex_to_bytea_function/up.sql b/store/postgres/migrations/2021-06-14-201635_add_ethereum_hex_to_bytea_function/up.sql new file mode 100644 index 00000000000..fa1e6ef2e78 --- /dev/null +++ b/store/postgres/migrations/2021-06-14-201635_add_ethereum_hex_to_bytea_function/up.sql @@ -0,0 +1,47 @@ +/* +Since we deal with a lot of hex data encoded in a very specific string pattern [see link #1], we +need a function to convert that data into byte arrays (the "bytea" PostgreSQL type). Using byte +arrays is also useful because the former occupies double the space of the latter. + +The idea is to have a deliberately generic database function to send bytes over so we can re-encode +them into concrete types, such as U64 and H256 and booleans. + +Examples: +1. ethereum_hex_to_bytea(null) -> null +2. ethereum_hex_to_bytea("0x1") -> \x01 +3. ethereum_hex_to_bytea("0x0") -> \x00 +4. ethereum_hex_to_bytea("0xdeadbeef") -> \xdeadbeef +5. ethereum_hex_to_bytea("0x") -> ERROR: Can't decode an empty hexadecimal string. +6. ethereum_hex_to_bytea("") -> ERROR: Input must start with '0x'. + + +[1: https://openethereum.github.io/JSONRPC#types-in-the-jsonrpc] + */ +create or replace function raise_exception_bytea (text) + returns bytea + as $$ +begin + raise exception '%', $1; +end; +$$ +language plpgsql +volatile; + +create or replace function ethereum_hex_to_bytea (eth_hex text) + returns bytea + as $$ + select + case when $1 is null then + null + when not starts_with (eth_hex, '0x') then + raise_exception_bytea('Input must start with ''0x''.') + when length(eth_hex) = 2 then + raise_exception_bytea('Can''t decode an empty hexadecimal string.') + when length($1) % 2 = 0 then + decode(right ($1, -2), 'hex') + else + decode(replace(eth_hex, 'x', ''), 'hex') + end as return +$$ +language sql +immutable strict; diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index d25fe7f264a..5645535bcfa 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1,3 +1,9 @@ +use diesel::pg::PgConnection; +use diesel::prelude::*; +use diesel::r2d2::{ConnectionManager, PooledConnection}; +use diesel::sql_types::Text; +use diesel::{insert_into, update}; +use graph::prelude::web3::types::H256; use graph::{ constraint_violation, prelude::{ @@ -6,19 +12,17 @@ use graph::{ }, }; -use diesel::pg::PgConnection; -use diesel::r2d2::{ConnectionManager, PooledConnection}; -use diesel::sql_types::Text; -use diesel::{delete, prelude::*}; -use diesel::{insert_into, update}; - use graph::ensure; -use std::{collections::HashMap, convert::TryFrom, sync::Arc}; -use std::{convert::TryInto, iter::FromIterator}; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, + iter::FromIterator, + sync::Arc, +}; use graph::prelude::{ - web3::types::H256, BlockNumber, BlockPtr, Error, EthereumBlock, EthereumNetworkIdentifier, - LightEthereumBlock, + transaction_receipt::LightTransactionReceipt, BlockNumber, BlockPtr, Error, EthereumBlock, + EthereumNetworkIdentifier, LightEthereumBlock, }; use crate::{ @@ -44,7 +48,6 @@ pub use data::Storage; /// Encapuslate access to the blocks table for a chain. mod data { - use graph::{constraint_violation, prelude::StoreError}; use diesel::{connection::SimpleConnection, insert_into}; use diesel::{delete, prelude::*, sql_query}; @@ -60,6 +63,13 @@ mod data { update, }; use diesel_dynamic_schema as dds; + use graph::{ + constraint_violation, + prelude::{ + transaction_receipt::{find_transaction_receipts_in_block, LightTransactionReceipt}, + StoreError, + }, + }; use std::fmt; use std::iter::FromIterator; @@ -311,9 +321,7 @@ mod data { Ok(Self::Private(Schema::new(s))) } - } - impl Storage { /// Create dedicated database tables for this chain if it uses /// `Storage::Private`. If it uses `Storage::Shared`, do nothing since /// a regular migration will already have created the `ethereum_blocks` @@ -1079,6 +1087,16 @@ mod data { .execute(conn) .unwrap(); } + + /// Delegates to [`transaction_receipt::find_transaction_receipts_in_block`]. + pub(crate) fn find_transaction_receipts_in_block( + &self, + conn: &PgConnection, + schema_name: &str, + block_hash: &H256, + ) -> anyhow::Result> { + find_transaction_receipts_in_block(conn, schema_name, block_hash) + } } } @@ -1144,6 +1162,7 @@ impl ChainStore { } pub(crate) fn drop_chain(&self) -> Result<(), Error> { + use diesel::dsl::delete; use public::ethereum_networks as n; let conn = self.get_conn()?; @@ -1413,6 +1432,21 @@ impl ChainStoreTrait for ChainStore { .block_number(&conn, hash)? .map(|number| (self.chain.clone(), number))) } + + async fn transaction_receipts_in_block( + &self, + block_hash: &H256, + ) -> Result, StoreError> { + let pool = self.pool.clone(); + let storage = self.storage.clone(); + let block_hash = block_hash.clone(); + pool.with_conn(move |conn, _| { + storage + .find_transaction_receipts_in_block(&conn, &storage.to_string(), &block_hash) + .map_err(|e| StoreError::from(e).into()) + }) + .await + } } impl EthereumCallCache for ChainStore {