fix(offchain): Enforce entities list on offchain data sources (#4147)
leoyvens authored Nov 11, 2022
1 parent 2da697b commit a8d184c
Showing 10 changed files with 151 additions and 21 deletions.
6 changes: 6 additions & 0 deletions core/src/subgraph/runner.rs
@@ -683,6 +683,12 @@ where
"Attempted to create data source in offchain data source handler. This is not yet supported.",
);

// This propagates any deterministic error as a non-deterministic one, which might make
// sense considering offchain data sources are non-deterministic.
if let Some(err) = block_state.deterministic_errors.into_iter().next() {
return Err(anyhow!("{}", err.to_string()));
}

mods.extend(block_state.entity_cache.as_modifications()?.modifications);
processed_data_sources.extend(block_state.processed_data_sources);
}
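
A note on this hunk: wrapping the error in anyhow! erases its deterministic classification, which is why the integration test at the end of this diff expects `deterministic: false` on the resulting SubgraphError. A minimal sketch of that erasure under simplified types (the struct below is a stand-in, not the real SubgraphError):

use anyhow::anyhow;

// Stand-in for SubgraphError, reduced to the fields relevant here.
struct DeterministicError {
    message: String,
    deterministic: bool,
}

fn main() {
    let err = DeterministicError {
        message: "boom".to_string(),
        deterministic: true,
    };
    assert!(err.deterministic);
    // Only the rendered message survives the conversion; the flag is dropped,
    // so the failure is recorded as non-deterministic downstream.
    let wrapped: anyhow::Error = anyhow!("{}", err.message);
    assert_eq!(wrapped.to_string(), "boom");
}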
2 changes: 1 addition & 1 deletion graph/src/data/subgraph/schema.rs
@@ -215,7 +215,7 @@ impl SubgraphManifestEntity {
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SubgraphError {
pub subgraph_id: DeploymentHash,
pub message: String,
47 changes: 46 additions & 1 deletion graph/src/data_source/mod.rs
@@ -13,7 +13,7 @@ use crate::{
},
components::{
link_resolver::LinkResolver,
store::{BlockNumber, StoredDynamicDataSource},
store::{BlockNumber, EntityType, StoredDynamicDataSource},
},
data_source::offchain::OFFCHAIN_KINDS,
prelude::{CheapClone as _, DataSourceContext},
@@ -42,6 +42,42 @@ pub enum DataSourceCreationError {
Unknown(#[from] Error),
}

/// Which entity types a data source can read and write to.
///
/// Currently this is only enforced on offchain data sources and templates, based on the `entities`
/// key in the manifest. This informs which entity tables need an explicit `causality_region` column
/// and which will always have `causality_region == 0`.
///
/// Note that this is just an optimization and not sufficient for causality region isolation, since
/// generally the causality region is a property of the entity, not of the entity type.
///
/// See also: entity-type-access
pub enum EntityTypeAccess {
Any,
Restriced(Vec<EntityType>),
}

impl fmt::Display for EntityTypeAccess {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match self {
Self::Any => write!(f, "Any"),
Self::Restriced(entities) => {
let strings = entities.iter().map(|e| e.as_str()).collect::<Vec<_>>();
write!(f, "{}", strings.join(", "))
}
}
}
}

impl EntityTypeAccess {
pub fn allows(&self, entity_type: &EntityType) -> bool {
match self {
Self::Any => true,
Self::Restriced(types) => types.contains(entity_type),
}
}
}

impl<C: Blockchain> DataSource<C> {
pub fn as_onchain(&self) -> Option<&C::DataSource> {
match self {
@@ -106,6 +142,15 @@ impl<C: Blockchain> DataSource<C> {
}
}

pub fn entities(&self) -> EntityTypeAccess {
match self {
// Note: Onchain data sources have an `entities` field in the manifest, but it has never
// been enforced.
Self::Onchain(_) => EntityTypeAccess::Any,
Self::Offchain(ds) => EntityTypeAccess::Restriced(ds.mapping.entities.clone()),
}
}

pub fn match_and_decode(
&self,
trigger: &TriggerData<C>,
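
A minimal usage sketch of the new type (not part of the diff), assuming EntityType::new(String) as used in the host_exports.rs hunk below; the `Restriced` spelling is as committed:

use graph::components::store::EntityType;
use graph::data_source::EntityTypeAccess;

fn main() {
    // Offchain data sources get a restricted list built from the manifest's
    // `entities` key; onchain data sources keep `Any` (unenforced).
    let access = EntityTypeAccess::Restriced(vec![EntityType::new("IpfsFile".to_string())]);

    assert!(access.allows(&EntityType::new("IpfsFile".to_string())));
    assert!(!access.allows(&EntityType::new("IpfsFile1".to_string())));

    // Display joins the allowed types; this string feeds the hint in the
    // error message built by check_entity_type_access below.
    assert_eq!(access.to_string(), "IpfsFile");
}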
6 changes: 3 additions & 3 deletions graph/src/data_source/offchain.rs
@@ -3,7 +3,7 @@ use crate::{
blockchain::{BlockPtr, Blockchain},
components::{
link_resolver::LinkResolver,
store::{BlockNumber, StoredDynamicDataSource},
store::{BlockNumber, EntityType, StoredDynamicDataSource},
subgraph::DataSourceTemplateInfo,
},
data::store::scalar::Bytes,
@@ -245,7 +245,7 @@ pub enum Source {
pub struct Mapping {
pub language: String,
pub api_version: semver::Version,
pub entities: Vec<String>,
pub entities: Vec<EntityType>,
pub handler: String,
pub runtime: Arc<Vec<u8>>,
pub link: Link,
@@ -271,7 +271,7 @@ pub struct UnresolvedMapping {
pub language: String,
pub file: Link,
pub handler: String,
pub entities: Vec<String>,
pub entities: Vec<EntityType>,
}

impl UnresolvedDataSource {
24 changes: 22 additions & 2 deletions runtime/wasm/src/host_exports.rs
@@ -15,7 +15,7 @@ use graph::components::subgraph::{
PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
};
use graph::data::store;
use graph::data_source::{DataSource, DataSourceTemplate};
use graph::data_source::{DataSource, DataSourceTemplate, EntityTypeAccess};
use graph::ensure;
use graph::prelude::ethabi::param_type::Reader;
use graph::prelude::ethabi::{decode, encode, Token};
@@ -63,6 +63,7 @@ pub struct HostExports<C: Blockchain> {
data_source_address: Vec<u8>,
subgraph_network: String,
data_source_context: Arc<Option<DataSourceContext>>,
entity_type_access: EntityTypeAccess,

/// Some data sources have indeterminism or different notions of time. These
/// each need to be stored separately to separate causality between them,
@@ -89,6 +90,7 @@ impl<C: Blockchain> HostExports<C> {
data_source_name: data_source.name().to_owned(),
data_source_address: data_source.address().unwrap_or_default(),
data_source_context: data_source.context().cheap_clone(),
entity_type_access: data_source.entities(),
causality_region: PoICausalityRegion::from_network(&subgraph_network),
subgraph_network,
templates,
@@ -97,6 +99,21 @@
}
}

/// Enforce the entity type access restrictions. See also: entity-type-access
fn check_entity_type_access(&self, entity_type: &EntityType) -> Result<(), HostExportError> {
match self.entity_type_access.allows(entity_type) {
true => Ok(()),
false => Err(HostExportError::Deterministic(anyhow!(
"entity type `{}` is not on the 'entities' list for data source `{}`. \
Hint: Add `{}` to the 'entities' list, which currently is: `{}`.",
entity_type,
self.data_source_name,
entity_type,
self.entity_type_access
))),
}
}

pub(crate) fn abort(
&self,
message: Option<String>,
@@ -139,7 +156,7 @@ impl<C: Blockchain> HostExports<C> {
data: HashMap<String, Value>,
stopwatch: &StopwatchMetrics,
gas: &GasCounter,
) -> Result<(), anyhow::Error> {
) -> Result<(), HostExportError> {
let poi_section = stopwatch.start_section("host_export_store_set__proof_of_indexing");
write_poi_event(
proof_of_indexing,
@@ -157,6 +174,7 @@
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
};
self.check_entity_type_access(&key.entity_type)?;

gas.consume_host_fn(gas::STORE_SET.with_args(complexity::Linear, (&key, &data)))?;

@@ -188,6 +206,7 @@
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
};
self.check_entity_type_access(&key.entity_type)?;

gas.consume_host_fn(gas::STORE_REMOVE.with_args(complexity::Size, &key))?;

@@ -207,6 +226,7 @@
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
};
self.check_entity_type_access(&store_key.entity_type)?;

let result = state.entity_cache.get(&store_key)?;
gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?;
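
For reference, a self-contained sketch (not part of the commit) of the message that check_entity_type_access builds, using the values from the failing File2 case in the integration test at the end of this diff:

fn main() {
    let entity_type = "IpfsFile1";
    let data_source = "File2";
    let allowed = "IpfsFile"; // Display output of the data source's EntityTypeAccess

    // Mirrors the format string in check_entity_type_access above.
    let message = format!(
        "entity type `{}` is not on the 'entities' list for data source `{}`. \
         Hint: Add `{}` to the 'entities' list, which currently is: `{}`.",
        entity_type, data_source, entity_type, allowed
    );

    assert_eq!(
        message,
        "entity type `IpfsFile1` is not on the 'entities' list for data source `File2`. \
         Hint: Add `IpfsFile1` to the 'entities' list, which currently is: `IpfsFile`."
    );
}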
6 changes: 6 additions & 0 deletions store/postgres/src/subgraph_store.rs
@@ -831,6 +831,12 @@ impl SubgraphStoreInner {
Ok(())
}

#[cfg(debug_assertions)]
pub fn status_for_id(&self, id: graph::components::store::DeploymentId) -> status::Info {
let filter = status::Filter::DeploymentIds(vec![id]);
self.status(filter).unwrap().into_iter().next().unwrap()
}

pub(crate) fn status(&self, filter: status::Filter) -> Result<Vec<status::Info>, StoreError> {
let sites = match filter {
status::Filter::SubgraphName(name) => {
7 changes: 6 additions & 1 deletion tests/integration-tests/file-data-sources/src/mapping.ts
@@ -1,4 +1,4 @@
import { ethereum, dataSource, BigInt, Bytes } from '@graphprotocol/graph-ts'
import { ethereum, dataSource, BigInt, Bytes, DataSourceContext } from '@graphprotocol/graph-ts'
import { IpfsFile, IpfsFile1 } from '../generated/schema'

export function handleBlock(block: ethereum.Block): void {
@@ -21,6 +21,11 @@ export function handleBlock(block: ethereum.Block): void {
if (block.number == BigInt.fromI32(3)) {
dataSource.create("File1", ["QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ"])
}

// Will fail the subgraph when processed, due to a mismatch between the entity type and 'entities'.
if (block.number == BigInt.fromI32(5)) {
dataSource.create("File2", ["QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ"])
}
}

export function handleFile(data: Bytes): void {
15 changes: 14 additions & 1 deletion tests/integration-tests/file-data-sources/subgraph.yaml
@@ -28,7 +28,7 @@ templates:
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- Gravatar
- IpfsFile
abis:
- name: Contract
file: ./abis/Contract.abi
@@ -46,4 +46,17 @@
- name: Contract
file: ./abis/Contract.abi
handler: handleFile1
file: ./src/mapping.ts
- kind: file/ipfs
name: File2
mapping:
kind: ethereum/events
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- IpfsFile # will trigger an error, should be IpfsFile1
abis:
- name: Contract
file: ./abis/Contract.abi
handler: handleFile1
file: ./src/mapping.ts
41 changes: 30 additions & 11 deletions tests/src/fixture.rs
@@ -21,6 +21,7 @@ use graph::cheap_clone::CheapClone;
use graph::components::store::{BlockStore, DeploymentLocator};
use graph::data::graphql::effort::LoadManager;
use graph::data::query::{Query, QueryTarget};
use graph::data::subgraph::schema::SubgraphError;
use graph::env::EnvVars;
use graph::ipfs_client::IpfsClient;
use graph::prelude::ethabi::ethereum_types::H256;
@@ -157,9 +158,30 @@ impl TestContext {
.await
.expect("unable to start subgraph");

wait_for_sync(&self.logger, &self.store, &self.deployment.hash, stop_block)
wait_for_sync(
&self.logger,
&self.store,
&self.deployment.clone(),
stop_block,
)
.await
.unwrap();
}

pub async fn start_and_sync_to_error(&self, stop_block: BlockPtr) -> SubgraphError {
self.provider
.start(self.deployment.clone(), Some(stop_block.number))
.await
.unwrap();
.expect("unable to start subgraph");

wait_for_sync(
&self.logger,
&self.store,
&self.deployment.clone(),
stop_block,
)
.await
.unwrap_err()
}

pub async fn query(&self, query: &str) -> Result<Option<r::Value>, Vec<QueryError>> {
@@ -376,14 +398,14 @@ pub fn cleanup(
pub async fn wait_for_sync(
logger: &Logger,
store: &SubgraphStore,
hash: &DeploymentHash,
deployment: &DeploymentLocator,
stop_block: BlockPtr,
) -> Result<(), Error> {
) -> Result<(), SubgraphError> {
let mut err_count = 0;
while err_count < 10 {
tokio::time::sleep(Duration::from_millis(1000)).await;

let block_ptr = match store.least_block_ptr(&hash).await {
let block_ptr = match store.least_block_ptr(&deployment.hash).await {
Ok(Some(ptr)) => ptr,
res => {
info!(&logger, "{:?}", res);
@@ -397,15 +419,12 @@
break;
}

if !store.is_healthy(&hash).await.unwrap() {
return Err(anyhow::anyhow!("subgraph failed unexpectedly"));
let status = store.status_for_id(deployment.id);
if let Some(fatal_error) = status.fatal_error {
return Err(fatal_error);
}
}

if !store.is_healthy(&hash).await.unwrap() {
return Err(anyhow::anyhow!("subgraph failed unexpectedly"));
}

Ok(())
}

18 changes: 17 additions & 1 deletion tests/tests/runner.rs
@@ -5,6 +5,7 @@ use std::time::Duration;

use graph::blockchain::block_stream::BlockWithTriggers;
use graph::blockchain::{Block, BlockPtr, Blockchain};
use graph::data::subgraph::schema::SubgraphError;
use graph::data_source::CausalityRegion;
use graph::env::EnvVars;
use graph::object;
@@ -142,7 +143,8 @@ async fn file_data_sources() {
let block_2 = empty_block(block_1.ptr(), test_ptr(2));
let block_3 = empty_block(block_2.ptr(), test_ptr(3));
let block_4 = empty_block(block_3.ptr(), test_ptr(4));
vec![block_0, block_1, block_2, block_3, block_4]
let block_5 = empty_block(block_4.ptr(), test_ptr(5));
vec![block_0, block_1, block_2, block_3, block_4, block_5]
};
let stop_block = test_ptr(1);

@@ -204,6 +206,7 @@ async fn file_data_sources() {
ctx.provider.stop(ctx.deployment.clone()).await.unwrap();
let stop_block = test_ptr(4);
ctx.start_and_sync_to(stop_block).await;
ctx.provider.stop(ctx.deployment.clone()).await.unwrap();
let writable = ctx
.store
.clone()
@@ -219,6 +222,19 @@
assert!(data_source.causality_region == causality_region.next());
causality_region = causality_region.next();
}

let stop_block = test_ptr(5);
let err = ctx.start_and_sync_to_error(stop_block).await;
let message = "entity type `IpfsFile1` is not on the 'entities' list for data source `File2`. \
Hint: Add `IpfsFile1` to the 'entities' list, which currently is: `IpfsFile`.\twasm backtrace:\t 0: 0x33bf - <unknown>!src/mapping/handleFile1\t in handler `handleFile1` at block #5 ()".to_string();
let expected_err = SubgraphError {
subgraph_id: ctx.deployment.hash.clone(),
message,
block_ptr: Some(test_ptr(5)),
handler: None,
deterministic: false,
};
assert_eq!(err, expected_err);
}

#[tokio::test]
