Skip to content

Commit

Permalink
chain, core, graph, store: Filter call handlers from failed txns
Browse files Browse the repository at this point in the history
  • Loading branch information
tilacog committed Jul 19, 2021
1 parent 9e2e5e6 commit e37fec8
Show file tree
Hide file tree
Showing 18 changed files with 594 additions and 57 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ethabi = { git = "https://github.com/graphprotocol/ethabi.git", branch = "master

# We have a couple custom patches to web3.
web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "master" }
itertools = "0.10.0"

graph-runtime-wasm = { path = "../../runtime/wasm" }
graph-runtime-derive = { path = "../../runtime/derive" }
Expand Down
12 changes: 7 additions & 5 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::capabilities::NodeCapabilities;
use crate::{data_source::DataSource, Chain};

pub type EventSignature = H256;
pub type FunctionSelector = [u8; 4];

#[derive(Clone, Debug)]
pub struct EthereumContractCall {
Expand Down Expand Up @@ -268,7 +269,8 @@ impl EthereumLogFilter {
pub(crate) struct EthereumCallFilter {
// Each call filter has a map of filters keyed by address, each containing a tuple with
// start_block and the set of function signatures
pub contract_addresses_function_signatures: HashMap<Address, (BlockNumber, HashSet<[u8; 4]>)>,
pub contract_addresses_function_signatures:
HashMap<Address, (BlockNumber, HashSet<FunctionSelector>)>,
}

impl EthereumCallFilter {
Expand Down Expand Up @@ -353,12 +355,12 @@ impl EthereumCallFilter {
}
}

impl FromIterator<(BlockNumber, Address, [u8; 4])> for EthereumCallFilter {
impl FromIterator<(BlockNumber, Address, FunctionSelector)> for EthereumCallFilter {
fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = (BlockNumber, Address, [u8; 4])>,
I: IntoIterator<Item = (BlockNumber, Address, FunctionSelector)>,
{
let mut lookup: HashMap<Address, (BlockNumber, HashSet<[u8; 4]>)> = HashMap::new();
let mut lookup: HashMap<Address, (BlockNumber, HashSet<FunctionSelector>)> = HashMap::new();
iter.into_iter()
.for_each(|(start_block, address, function_signature)| {
if !lookup.contains_key(&address) {
Expand All @@ -385,7 +387,7 @@ impl From<EthereumBlockFilter> for EthereumCallFilter {
.contract_addresses
.into_iter()
.map(|(start_block_opt, address)| (address, (start_block_opt, HashSet::default())))
.collect::<HashMap<Address, (BlockNumber, HashSet<[u8; 4]>)>>(),
.collect::<HashMap<Address, (BlockNumber, HashSet<FunctionSelector>)>>(),
}
}
}
Expand Down
31 changes: 22 additions & 9 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::collections::HashSet;
use std::iter::FromIterator;
use std::sync::Arc;

use anyhow::{Context, Error};
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::prelude::{EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt};
use graph::prelude::{
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, StopwatchMetrics,
};
use graph::{
blockchain::{
block_stream::{
Expand All @@ -23,6 +21,9 @@ use graph::{
SubgraphStore,
},
};
use std::collections::HashSet;
use std::iter::FromIterator;
use std::sync::Arc;

use crate::data_source::DataSourceTemplate;
use crate::data_source::UnresolvedDataSourceTemplate;
Expand Down Expand Up @@ -140,6 +141,7 @@ impl Blockchain for Chain {
loc: &DeploymentLocator,
capabilities: &Self::NodeCapabilities,
unified_api_version: UnifiedMappingApiVersion,
stopwatch_metrics: StopwatchMetrics,
) -> Result<Arc<Self::TriggersAdapter>, Error> {
let eth_adapter = self.eth_adapters.cheapest_with(capabilities)?.clone();
let logger = self
Expand All @@ -152,8 +154,9 @@ impl Blockchain for Chain {
logger,
ethrpc_metrics,
eth_adapter,
stopwatch_metrics,
chain_store: self.chain_store.cheap_clone(),
_unified_api_version: unified_api_version,
unified_api_version,
};
Ok(Arc::new(adapter))
}
Expand All @@ -180,7 +183,12 @@ impl Blockchain for Chain {
let requirements = filter.node_capabilities();

let triggers_adapter = self
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
.triggers_adapter(
&deployment,
&requirements,
unified_api_version.clone(),
metrics.stopwatch.clone(),
)
.expect(&format!(
"no adapter for network {} with capabilities {}",
self.name, requirements
Expand Down Expand Up @@ -306,9 +314,10 @@ pub struct DummyDataSourceTemplate;
pub struct TriggersAdapter {
logger: Logger,
ethrpc_metrics: Arc<SubgraphEthRpcMetrics>,
stopwatch_metrics: StopwatchMetrics,
chain_store: Arc<dyn ChainStore>,
eth_adapter: Arc<EthereumAdapter>,
_unified_api_version: UnifiedMappingApiVersion,
unified_api_version: UnifiedMappingApiVersion,
}

#[async_trait]
Expand All @@ -324,9 +333,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
self.logger.clone(),
self.chain_store.clone(),
self.ethrpc_metrics.clone(),
self.stopwatch_metrics.clone(),
from,
to,
filter,
self.unified_api_version.clone(),
)
.await
}
Expand Down Expand Up @@ -354,9 +365,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
logger.clone(),
self.chain_store.clone(),
self.ethrpc_metrics.clone(),
self.stopwatch_metrics.clone(),
block_number,
block_number,
filter,
self.unified_api_version.clone(),
)
.await?;
assert!(blocks.len() == 1);
Expand All @@ -368,7 +381,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&filter.log,
&full_block.ethereum_block,
));
triggers.append(&mut parse_call_triggers(&filter.call, &full_block));
triggers.append(&mut parse_call_triggers(&filter.call, &full_block)?);
triggers.append(&mut parse_block_triggers(filter.block.clone(), &full_block));
Ok(BlockWithTriggers::new(block, triggers))
}
Expand Down
Loading

0 comments on commit e37fec8

Please sign in to comment.