From a8d184c87555967769e71c7a8c2bfb6f0e070b91 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 11 Nov 2022 15:09:36 +0000 Subject: [PATCH] fix(offchain): Enforce entities list on offchain data sources (#4147) --- core/src/subgraph/runner.rs | 6 +++ graph/src/data/subgraph/schema.rs | 2 +- graph/src/data_source/mod.rs | 47 ++++++++++++++++++- graph/src/data_source/offchain.rs | 6 +-- runtime/wasm/src/host_exports.rs | 24 +++++++++- store/postgres/src/subgraph_store.rs | 6 +++ .../file-data-sources/src/mapping.ts | 7 ++- .../file-data-sources/subgraph.yaml | 15 +++++- tests/src/fixture.rs | 41 +++++++++++----- tests/tests/runner.rs | 18 ++++++- 10 files changed, 151 insertions(+), 21 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index a8038777f5a..944fab8689f 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -683,6 +683,12 @@ where "Attempted to create data source in offchain data source handler. This is not yet supported.", ); + // This propagates any deterministic error as a non-deterministic one. Which might make + // sense considering offchain data sources are non-deterministic. + if let Some(err) = block_state.deterministic_errors.into_iter().next() { + return Err(anyhow!("{}", err.to_string())); + } + mods.extend(block_state.entity_cache.as_modifications()?.modifications); processed_data_sources.extend(block_state.processed_data_sources); } diff --git a/graph/src/data/subgraph/schema.rs b/graph/src/data/subgraph/schema.rs index 26b37b124f5..4d08d5d592a 100644 --- a/graph/src/data/subgraph/schema.rs +++ b/graph/src/data/subgraph/schema.rs @@ -215,7 +215,7 @@ impl SubgraphManifestEntity { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct SubgraphError { pub subgraph_id: DeploymentHash, pub message: String, diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index ccbc707101d..a2b5d5b97c2 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -13,7 +13,7 @@ use crate::{ }, components::{ link_resolver::LinkResolver, - store::{BlockNumber, StoredDynamicDataSource}, + store::{BlockNumber, EntityType, StoredDynamicDataSource}, }, data_source::offchain::OFFCHAIN_KINDS, prelude::{CheapClone as _, DataSourceContext}, @@ -42,6 +42,42 @@ pub enum DataSourceCreationError { Unknown(#[from] Error), } +/// Which entity types a data source can read and write to. +/// +/// Currently this is only enforced on offchain data sources and templates, based on the `entities` +/// key in the manifest. This informs which entity tables need an explicit `causality_region` column +/// and which will always have `causality_region == 0`. +/// +/// Note that this is just an optimization and not sufficient for causality region isolation, since +/// generally the causality region is a property of the entity, not of the entity type. +/// +/// See also: entity-type-access +pub enum EntityTypeAccess { + Any, + Restriced(Vec), +} + +impl fmt::Display for EntityTypeAccess { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match self { + Self::Any => write!(f, "Any"), + Self::Restriced(entities) => { + let strings = entities.iter().map(|e| e.as_str()).collect::>(); + write!(f, "{}", strings.join(", ")) + } + } + } +} + +impl EntityTypeAccess { + pub fn allows(&self, entity_type: &EntityType) -> bool { + match self { + Self::Any => true, + Self::Restriced(types) => types.contains(entity_type), + } + } +} + impl DataSource { pub fn as_onchain(&self) -> Option<&C::DataSource> { match self { @@ -106,6 +142,15 @@ impl DataSource { } } + pub fn entities(&self) -> EntityTypeAccess { + match self { + // Note: Onchain data sources have an `entities` field in the manifest, but it has never + // been enforced. + Self::Onchain(_) => EntityTypeAccess::Any, + Self::Offchain(ds) => EntityTypeAccess::Restriced(ds.mapping.entities.clone()), + } + } + pub fn match_and_decode( &self, trigger: &TriggerData, diff --git a/graph/src/data_source/offchain.rs b/graph/src/data_source/offchain.rs index 235124f20d0..b0ea8ddd365 100644 --- a/graph/src/data_source/offchain.rs +++ b/graph/src/data_source/offchain.rs @@ -3,7 +3,7 @@ use crate::{ blockchain::{BlockPtr, Blockchain}, components::{ link_resolver::LinkResolver, - store::{BlockNumber, StoredDynamicDataSource}, + store::{BlockNumber, EntityType, StoredDynamicDataSource}, subgraph::DataSourceTemplateInfo, }, data::store::scalar::Bytes, @@ -245,7 +245,7 @@ pub enum Source { pub struct Mapping { pub language: String, pub api_version: semver::Version, - pub entities: Vec, + pub entities: Vec, pub handler: String, pub runtime: Arc>, pub link: Link, @@ -271,7 +271,7 @@ pub struct UnresolvedMapping { pub language: String, pub file: Link, pub handler: String, - pub entities: Vec, + pub entities: Vec, } impl UnresolvedDataSource { diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index af507c6d9fa..a412260fd32 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -15,7 +15,7 @@ use graph::components::subgraph::{ PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing, }; use graph::data::store; -use graph::data_source::{DataSource, DataSourceTemplate}; +use graph::data_source::{DataSource, DataSourceTemplate, EntityTypeAccess}; use graph::ensure; use graph::prelude::ethabi::param_type::Reader; use graph::prelude::ethabi::{decode, encode, Token}; @@ -63,6 +63,7 @@ pub struct HostExports { data_source_address: Vec, subgraph_network: String, data_source_context: Arc>, + entity_type_access: EntityTypeAccess, /// Some data sources have indeterminism or different notions of time. These /// need to be each be stored separately to separate causality between them, @@ -89,6 +90,7 @@ impl HostExports { data_source_name: data_source.name().to_owned(), data_source_address: data_source.address().unwrap_or_default(), data_source_context: data_source.context().cheap_clone(), + entity_type_access: data_source.entities(), causality_region: PoICausalityRegion::from_network(&subgraph_network), subgraph_network, templates, @@ -97,6 +99,21 @@ impl HostExports { } } + /// Enfore the entity type access restrictions. See also: entity-type-access + fn check_entity_type_access(&self, entity_type: &EntityType) -> Result<(), HostExportError> { + match self.entity_type_access.allows(entity_type) { + true => Ok(()), + false => Err(HostExportError::Deterministic(anyhow!( + "entity type `{}` is not on the 'entities' list for data source `{}`. \ + Hint: Add `{}` to the 'entities' list, which currently is: `{}`.", + entity_type, + self.data_source_name, + entity_type, + self.entity_type_access + ))), + } + } + pub(crate) fn abort( &self, message: Option, @@ -139,7 +156,7 @@ impl HostExports { data: HashMap, stopwatch: &StopwatchMetrics, gas: &GasCounter, - ) -> Result<(), anyhow::Error> { + ) -> Result<(), HostExportError> { let poi_section = stopwatch.start_section("host_export_store_set__proof_of_indexing"); write_poi_event( proof_of_indexing, @@ -157,6 +174,7 @@ impl HostExports { entity_type: EntityType::new(entity_type), entity_id: entity_id.into(), }; + self.check_entity_type_access(&key.entity_type)?; gas.consume_host_fn(gas::STORE_SET.with_args(complexity::Linear, (&key, &data)))?; @@ -188,6 +206,7 @@ impl HostExports { entity_type: EntityType::new(entity_type), entity_id: entity_id.into(), }; + self.check_entity_type_access(&key.entity_type)?; gas.consume_host_fn(gas::STORE_REMOVE.with_args(complexity::Size, &key))?; @@ -207,6 +226,7 @@ impl HostExports { entity_type: EntityType::new(entity_type), entity_id: entity_id.into(), }; + self.check_entity_type_access(&store_key.entity_type)?; let result = state.entity_cache.get(&store_key)?; gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?; diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 5668ff3c606..44a0baf92e3 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -831,6 +831,12 @@ impl SubgraphStoreInner { Ok(()) } + #[cfg(debug_assertions)] + pub fn status_for_id(&self, id: graph::components::store::DeploymentId) -> status::Info { + let filter = status::Filter::DeploymentIds(vec![id]); + self.status(filter).unwrap().into_iter().next().unwrap() + } + pub(crate) fn status(&self, filter: status::Filter) -> Result, StoreError> { let sites = match filter { status::Filter::SubgraphName(name) => { diff --git a/tests/integration-tests/file-data-sources/src/mapping.ts b/tests/integration-tests/file-data-sources/src/mapping.ts index 8107b015ce9..f928b1ba419 100644 --- a/tests/integration-tests/file-data-sources/src/mapping.ts +++ b/tests/integration-tests/file-data-sources/src/mapping.ts @@ -1,4 +1,4 @@ -import { ethereum, dataSource, BigInt, Bytes } from '@graphprotocol/graph-ts' +import { ethereum, dataSource, BigInt, Bytes, DataSourceContext } from '@graphprotocol/graph-ts' import { IpfsFile, IpfsFile1 } from '../generated/schema' export function handleBlock(block: ethereum.Block): void { @@ -21,6 +21,11 @@ export function handleBlock(block: ethereum.Block): void { if (block.number == BigInt.fromI32(3)) { dataSource.create("File1", ["QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ"]) } + + // Will fail the subgraph when processed due to mismatch in the entity type and 'entities'. + if (block.number == BigInt.fromI32(5)) { + dataSource.create("File2", ["QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ"]) + } } export function handleFile(data: Bytes): void { diff --git a/tests/integration-tests/file-data-sources/subgraph.yaml b/tests/integration-tests/file-data-sources/subgraph.yaml index 41b37b8d63d..ee9d3aac689 100644 --- a/tests/integration-tests/file-data-sources/subgraph.yaml +++ b/tests/integration-tests/file-data-sources/subgraph.yaml @@ -28,7 +28,7 @@ templates: apiVersion: 0.0.7 language: wasm/assemblyscript entities: - - Gravatar + - IpfsFile abis: - name: Contract file: ./abis/Contract.abi @@ -46,4 +46,17 @@ templates: - name: Contract file: ./abis/Contract.abi handler: handleFile1 + file: ./src/mapping.ts + - kind: file/ipfs + name: File2 + mapping: + kind: ethereum/events + apiVersion: 0.0.7 + language: wasm/assemblyscript + entities: + - IpfsFile # will trigger an error, should be IpfsFile1 + abis: + - name: Contract + file: ./abis/Contract.abi + handler: handleFile1 file: ./src/mapping.ts \ No newline at end of file diff --git a/tests/src/fixture.rs b/tests/src/fixture.rs index 0ee406de36d..2b4d7289d31 100644 --- a/tests/src/fixture.rs +++ b/tests/src/fixture.rs @@ -21,6 +21,7 @@ use graph::cheap_clone::CheapClone; use graph::components::store::{BlockStore, DeploymentLocator}; use graph::data::graphql::effort::LoadManager; use graph::data::query::{Query, QueryTarget}; +use graph::data::subgraph::schema::SubgraphError; use graph::env::EnvVars; use graph::ipfs_client::IpfsClient; use graph::prelude::ethabi::ethereum_types::H256; @@ -157,9 +158,30 @@ impl TestContext { .await .expect("unable to start subgraph"); - wait_for_sync(&self.logger, &self.store, &self.deployment.hash, stop_block) + wait_for_sync( + &self.logger, + &self.store, + &self.deployment.clone(), + stop_block, + ) + .await + .unwrap(); + } + + pub async fn start_and_sync_to_error(&self, stop_block: BlockPtr) -> SubgraphError { + self.provider + .start(self.deployment.clone(), Some(stop_block.number)) .await - .unwrap(); + .expect("unable to start subgraph"); + + wait_for_sync( + &self.logger, + &self.store, + &self.deployment.clone(), + stop_block, + ) + .await + .unwrap_err() } pub async fn query(&self, query: &str) -> Result, Vec> { @@ -376,14 +398,14 @@ pub fn cleanup( pub async fn wait_for_sync( logger: &Logger, store: &SubgraphStore, - hash: &DeploymentHash, + deployment: &DeploymentLocator, stop_block: BlockPtr, -) -> Result<(), Error> { +) -> Result<(), SubgraphError> { let mut err_count = 0; while err_count < 10 { tokio::time::sleep(Duration::from_millis(1000)).await; - let block_ptr = match store.least_block_ptr(&hash).await { + let block_ptr = match store.least_block_ptr(&deployment.hash).await { Ok(Some(ptr)) => ptr, res => { info!(&logger, "{:?}", res); @@ -397,15 +419,12 @@ pub async fn wait_for_sync( break; } - if !store.is_healthy(&hash).await.unwrap() { - return Err(anyhow::anyhow!("subgraph failed unexpectedly")); + let status = store.status_for_id(deployment.id); + if let Some(fatal_error) = status.fatal_error { + return Err(fatal_error); } } - if !store.is_healthy(&hash).await.unwrap() { - return Err(anyhow::anyhow!("subgraph failed unexpectedly")); - } - Ok(()) } diff --git a/tests/tests/runner.rs b/tests/tests/runner.rs index 343bbbe8120..0b13c9b962a 100644 --- a/tests/tests/runner.rs +++ b/tests/tests/runner.rs @@ -5,6 +5,7 @@ use std::time::Duration; use graph::blockchain::block_stream::BlockWithTriggers; use graph::blockchain::{Block, BlockPtr, Blockchain}; +use graph::data::subgraph::schema::SubgraphError; use graph::data_source::CausalityRegion; use graph::env::EnvVars; use graph::object; @@ -142,7 +143,8 @@ async fn file_data_sources() { let block_2 = empty_block(block_1.ptr(), test_ptr(2)); let block_3 = empty_block(block_2.ptr(), test_ptr(3)); let block_4 = empty_block(block_3.ptr(), test_ptr(4)); - vec![block_0, block_1, block_2, block_3, block_4] + let block_5 = empty_block(block_4.ptr(), test_ptr(5)); + vec![block_0, block_1, block_2, block_3, block_4, block_5] }; let stop_block = test_ptr(1); @@ -204,6 +206,7 @@ async fn file_data_sources() { ctx.provider.stop(ctx.deployment.clone()).await.unwrap(); let stop_block = test_ptr(4); ctx.start_and_sync_to(stop_block).await; + ctx.provider.stop(ctx.deployment.clone()).await.unwrap(); let writable = ctx .store .clone() @@ -219,6 +222,19 @@ async fn file_data_sources() { assert!(data_source.causality_region == causality_region.next()); causality_region = causality_region.next(); } + + let stop_block = test_ptr(5); + let err = ctx.start_and_sync_to_error(stop_block).await; + let message = "entity type `IpfsFile1` is not on the 'entities' list for data source `File2`. \ + Hint: Add `IpfsFile1` to the 'entities' list, which currently is: `IpfsFile`.\twasm backtrace:\t 0: 0x33bf - !src/mapping/handleFile1\t in handler `handleFile1` at block #5 ()".to_string(); + let expected_err = SubgraphError { + subgraph_id: ctx.deployment.hash.clone(), + message, + block_ptr: Some(test_ptr(5)), + handler: None, + deterministic: false, + }; + assert_eq!(err, expected_err); } #[tokio::test]