fix(offchain): Enforce entities list on offchain data sources #4147

Merged: 2 commits, Nov 11, 2022
6 changes: 6 additions & 0 deletions core/src/subgraph/runner.rs
@@ -683,6 +683,12 @@ where
"Attempted to create data source in offchain data source handler. This is not yet supported.",
);

// This propagates any deterministic error as a non-deterministic one, which might make
// sense considering offchain data sources are non-deterministic.
if let Some(err) = block_state.deterministic_errors.into_iter().next() {
return Err(anyhow!("{}", err.to_string()));
}

mods.extend(block_state.entity_cache.as_modifications()?.modifications);
processed_data_sources.extend(block_state.processed_data_sources);
}
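For illustration, a minimal standalone sketch of what that comment describes (it assumes only the `anyhow` crate and a hypothetical stand-in error type, not graph-node's actual types): re-wrapping through `anyhow!("{}", err)` keeps the message but drops the concrete error type, so a "deterministic" classification is no longer recoverable downstream.

use anyhow::anyhow;
use std::fmt;

// Hypothetical stand-in for a deterministic subgraph error (not a graph-node type).
#[derive(Debug)]
struct DeterministicError(String);

impl fmt::Display for DeterministicError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

fn main() {
    let det = DeterministicError("entity type `Foo` is not on the 'entities' list".to_string());

    // Re-wrapping via `anyhow!` keeps only the formatted message; the original type,
    // and with it the deterministic classification, is gone.
    let propagated: anyhow::Error = anyhow!("{}", det.to_string());

    assert!(propagated.downcast_ref::<DeterministicError>().is_none());
    println!("propagated as a plain, non-deterministic error: {propagated}");
}
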
2 changes: 1 addition & 1 deletion graph/src/data/subgraph/schema.rs
@@ -215,7 +215,7 @@ impl SubgraphManifestEntity
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SubgraphError {
pub subgraph_id: DeploymentHash,
pub message: String,
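The new `PartialEq, Eq` derives support the `assert_eq!` comparison on a whole `SubgraphError` in the runner test added at the end of this PR. A simplified sketch of that pattern, with the struct reduced to plain fields rather than the real graph types:

// Simplified stand-in for SubgraphError; deriving PartialEq/Eq is what makes
// whole-value comparison with assert_eq! possible in tests.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SubgraphError {
    subgraph_id: String,
    message: String,
    deterministic: bool,
}

fn main() {
    let err = SubgraphError {
        subgraph_id: "QmSubgraph".to_string(),
        message: "entity type `IpfsFile1` is not on the 'entities' list".to_string(),
        deterministic: false,
    };
    let expected = err.clone();
    assert_eq!(err, expected); // would not compile without the PartialEq derive
    println!("errors compare equal");
}
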
47 changes: 46 additions & 1 deletion graph/src/data_source/mod.rs
@@ -13,7 +13,7 @@ use crate::{
},
components::{
link_resolver::LinkResolver,
store::{BlockNumber, StoredDynamicDataSource},
store::{BlockNumber, EntityType, StoredDynamicDataSource},
},
data_source::offchain::OFFCHAIN_KINDS,
prelude::{CheapClone as _, DataSourceContext},
@@ -42,6 +42,42 @@ pub enum DataSourceCreationError {
Unknown(#[from] Error),
}

/// Which entity types a data source can read and write to.
///
/// Currently this is only enforced on offchain data sources and templates, based on the `entities`
/// key in the manifest. This informs which entity tables need an explicit `causality_region` column
/// and which will always have `causality_region == 0`.
///
/// Note that this is just an optimization and not sufficient for causality region isolation, since
/// generally the causality region is a property of the entity, not of the entity type.
///
/// See also: entity-type-access
pub enum EntityTypeAccess {
[Review comment from Contributor]: Might be worth adding a bit more here about how this interacts with causality region based isolation
[Reply from Collaborator (author)]: I've written a more detailed comment.

Any,
Restriced(Vec<EntityType>),
}

impl fmt::Display for EntityTypeAccess {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match self {
Self::Any => write!(f, "Any"),
Self::Restriced(entities) => {
let strings = entities.iter().map(|e| e.as_str()).collect::<Vec<_>>();
write!(f, "{}", strings.join(", "))
}
}
}
}

impl EntityTypeAccess {
pub fn allows(&self, entity_type: &EntityType) -> bool {
match self {
Self::Any => true,
Self::Restriced(types) => types.contains(entity_type),
}
}
}

impl<C: Blockchain> DataSource<C> {
pub fn as_onchain(&self) -> Option<&C::DataSource> {
match self {
@@ -106,6 +142,15 @@ impl<C: Blockchain> DataSource<C> {
}
}

pub fn entities(&self) -> EntityTypeAccess {
match self {
// Note: Onchain data sources have an `entities` field in the manifest, but it has never
// been enforced.
Self::Onchain(_) => EntityTypeAccess::Any,
Self::Offchain(ds) => EntityTypeAccess::Restriced(ds.mapping.entities.clone()),
}
}

pub fn match_and_decode(
&self,
trigger: &TriggerData<C>,
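As a rough, self-contained sketch of the rule this enum encodes (names mirror the PR, including the `Restriced` spelling, but `EntityType` is reduced to a plain string so the example runs on its own): onchain data sources get `Any`, offchain data sources get their manifest `entities` list, and `allows` is the check the host exports consult before writes.

#[derive(Debug)]
enum EntityTypeAccess {
    Any,                    // onchain data sources: unrestricted
    Restriced(Vec<String>), // offchain data sources: the manifest's `entities` list
}

impl EntityTypeAccess {
    fn allows(&self, entity_type: &str) -> bool {
        match self {
            Self::Any => true,
            Self::Restriced(types) => types.iter().any(|t| t == entity_type),
        }
    }
}

fn main() {
    let offchain = EntityTypeAccess::Restriced(vec!["IpfsFile".to_string()]);
    assert!(offchain.allows("IpfsFile"));
    assert!(!offchain.allows("IpfsFile1")); // the case the new deterministic error reports
    assert!(EntityTypeAccess::Any.allows("IpfsFile1"));
    println!("entities-list check behaves as expected");
}
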
6 changes: 3 additions & 3 deletions graph/src/data_source/offchain.rs
@@ -3,7 +3,7 @@ use crate::{
blockchain::{BlockPtr, Blockchain},
components::{
link_resolver::LinkResolver,
store::{BlockNumber, StoredDynamicDataSource},
store::{BlockNumber, EntityType, StoredDynamicDataSource},
subgraph::DataSourceTemplateInfo,
},
data::store::scalar::Bytes,
@@ -245,7 +245,7 @@ pub enum Source {
pub struct Mapping {
pub language: String,
pub api_version: semver::Version,
pub entities: Vec<String>,
pub entities: Vec<EntityType>,
pub handler: String,
pub runtime: Arc<Vec<u8>>,
pub link: Link,
@@ -271,7 +271,7 @@ pub struct UnresolvedMapping {
pub language: String,
pub file: Link,
pub handler: String,
pub entities: Vec<String>,
pub entities: Vec<EntityType>,
}

impl UnresolvedDataSource {
24 changes: 22 additions & 2 deletions runtime/wasm/src/host_exports.rs
@@ -15,7 +15,7 @@ use graph::components::subgraph::{
PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing,
};
use graph::data::store;
use graph::data_source::{DataSource, DataSourceTemplate};
use graph::data_source::{DataSource, DataSourceTemplate, EntityTypeAccess};
use graph::ensure;
use graph::prelude::ethabi::param_type::Reader;
use graph::prelude::ethabi::{decode, encode, Token};
@@ -63,6 +63,7 @@ pub struct HostExports<C: Blockchain> {
data_source_address: Vec<u8>,
subgraph_network: String,
data_source_context: Arc<Option<DataSourceContext>>,
entity_type_access: EntityTypeAccess,

/// Some data sources have indeterminism or different notions of time. These
/// need to be each be stored separately to separate causality between them,
@@ -89,6 +90,7 @@ impl<C: Blockchain> HostExports<C> {
data_source_name: data_source.name().to_owned(),
data_source_address: data_source.address().unwrap_or_default(),
data_source_context: data_source.context().cheap_clone(),
entity_type_access: data_source.entities(),
causality_region: PoICausalityRegion::from_network(&subgraph_network),
subgraph_network,
templates,
@@ -97,6 +99,21 @@ impl<C: Blockchain> HostExports<C> {
}
}

/// Enforce the entity type access restrictions. See also: entity-type-access
fn check_entity_type_access(&self, entity_type: &EntityType) -> Result<(), HostExportError> {
[Review comment from Contributor]: Even though this is not exported, I think we should add a bit of context here as a comment.
[Reply from Collaborator (author)]: I've written a short comment and linked it to the one on enum EntityTypeAccess.

match self.entity_type_access.allows(entity_type) {
true => Ok(()),
false => Err(HostExportError::Deterministic(anyhow!(
[Review comment from Contributor]: Could this error be added as a typed error? It seems quite specific to be matched on as a string, so it could be useful to have an IsolationError subtype.
[Reply from Collaborator (author)]: It could, but given that for now we don't need to match on it, I think it's fine to just use the Deterministic variant.

"entity type `{}` is not on the 'entities' list for data source `{}`. \
Hint: Add `{}` to the 'entities' list, which currently is: `{}`.",
entity_type,
self.data_source_name,
entity_type,
self.entity_type_access
))),
}
}

pub(crate) fn abort(
&self,
message: Option<String>,
@@ -139,7 +156,7 @@ impl<C: Blockchain> HostExports<C> {
data: HashMap<String, Value>,
stopwatch: &StopwatchMetrics,
gas: &GasCounter,
) -> Result<(), anyhow::Error> {
) -> Result<(), HostExportError> {
let poi_section = stopwatch.start_section("host_export_store_set__proof_of_indexing");
write_poi_event(
proof_of_indexing,
@@ -157,6 +174,7 @@ impl<C: Blockchain> HostExports<C> {
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
};
self.check_entity_type_access(&key.entity_type)?;

gas.consume_host_fn(gas::STORE_SET.with_args(complexity::Linear, (&key, &data)))?;

Expand Down Expand Up @@ -188,6 +206,7 @@ impl<C: Blockchain> HostExports<C> {
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
};
self.check_entity_type_access(&key.entity_type)?;

gas.consume_host_fn(gas::STORE_REMOVE.with_args(complexity::Size, &key))?;

@@ -207,6 +226,7 @@ impl<C: Blockchain> HostExports<C> {
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
};
self.check_entity_type_access(&store_key.entity_type)?;

let result = state.entity_cache.get(&store_key)?;
gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?;
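To show where the new guard sits in the write path, here is a hedged, self-contained sketch of the check-before-write pattern (simplified names and types; the real host exports take many more parameters, use `EntityTypeAccess`, and return `HostExportError`):

// Simplified write path: store_set first asks the data source's entities list
// whether the entity type is allowed, and rejects the write deterministically if not.
enum AccessError {
    Deterministic(String),
}

struct HostExports {
    data_source_name: String,
    allowed_entities: Vec<String>, // stand-in for EntityTypeAccess::Restriced(..)
}

impl HostExports {
    fn check_entity_type_access(&self, entity_type: &str) -> Result<(), AccessError> {
        if self.allowed_entities.iter().any(|e| e == entity_type) {
            Ok(())
        } else {
            Err(AccessError::Deterministic(format!(
                "entity type `{}` is not on the 'entities' list for data source `{}`",
                entity_type, self.data_source_name
            )))
        }
    }

    fn store_set(&self, entity_type: &str, entity_id: &str) -> Result<(), AccessError> {
        // The guard runs before any gas accounting or entity-cache write.
        self.check_entity_type_access(entity_type)?;
        println!("would write {entity_type}/{entity_id} to the entity cache");
        Ok(())
    }
}

fn main() {
    let host = HostExports {
        data_source_name: "File2".to_string(),
        allowed_entities: vec!["IpfsFile".to_string()],
    };
    assert!(host.store_set("IpfsFile", "1").is_ok());
    match host.store_set("IpfsFile1", "1") {
        Err(AccessError::Deterministic(msg)) => println!("rejected: {msg}"),
        Ok(()) => unreachable!("write to IpfsFile1 should have been rejected"),
    }
}
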
6 changes: 6 additions & 0 deletions store/postgres/src/subgraph_store.rs
@@ -831,6 +831,12 @@ impl SubgraphStoreInner {
Ok(())
}

#[cfg(debug_assertions)]
pub fn status_for_id(&self, id: graph::components::store::DeploymentId) -> status::Info {
let filter = status::Filter::DeploymentIds(vec![id]);
self.status(filter).unwrap().into_iter().next().unwrap()
}

pub(crate) fn status(&self, filter: status::Filter) -> Result<Vec<status::Info>, StoreError> {
let sites = match filter {
status::Filter::SubgraphName(name) => {
7 changes: 6 additions & 1 deletion tests/integration-tests/file-data-sources/src/mapping.ts
@@ -1,4 +1,4 @@
import { ethereum, dataSource, BigInt, Bytes } from '@graphprotocol/graph-ts'
import { ethereum, dataSource, BigInt, Bytes, DataSourceContext } from '@graphprotocol/graph-ts'
import { IpfsFile, IpfsFile1 } from '../generated/schema'

export function handleBlock(block: ethereum.Block): void {
@@ -21,6 +21,11 @@ export function handleBlock(block: ethereum.Block): void {
if (block.number == BigInt.fromI32(3)) {
dataSource.create("File1", ["QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ"])
}

// Will fail the subgraph when processed due to mismatch in the entity type and 'entities'.
if (block.number == BigInt.fromI32(5)) {
dataSource.create("File2", ["QmVkvoPGi9jvvuxsHDVJDgzPEzagBaWSZRYoRDzU244HjZ"])
}
}

export function handleFile(data: Bytes): void {
15 changes: 14 additions & 1 deletion tests/integration-tests/file-data-sources/subgraph.yaml
@@ -28,7 +28,7 @@ templates:
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- Gravatar
- IpfsFile
abis:
- name: Contract
file: ./abis/Contract.abi
@@ -46,4 +46,17 @@ templates:
- name: Contract
file: ./abis/Contract.abi
handler: handleFile1
file: ./src/mapping.ts
- kind: file/ipfs
name: File2
mapping:
kind: ethereum/events
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- IpfsFile # will trigger an error, should be IpfsFile1
abis:
- name: Contract
file: ./abis/Contract.abi
handler: handleFile1
file: ./src/mapping.ts
41 changes: 30 additions & 11 deletions tests/src/fixture.rs
@@ -21,6 +21,7 @@ use graph::cheap_clone::CheapClone;
use graph::components::store::{BlockStore, DeploymentLocator};
use graph::data::graphql::effort::LoadManager;
use graph::data::query::{Query, QueryTarget};
use graph::data::subgraph::schema::SubgraphError;
use graph::env::EnvVars;
use graph::ipfs_client::IpfsClient;
use graph::prelude::ethabi::ethereum_types::H256;
@@ -157,9 +158,30 @@ impl TestContext {
.await
.expect("unable to start subgraph");

wait_for_sync(&self.logger, &self.store, &self.deployment.hash, stop_block)
wait_for_sync(
&self.logger,
&self.store,
&self.deployment.clone(),
stop_block,
)
.await
.unwrap();
}

pub async fn start_and_sync_to_error(&self, stop_block: BlockPtr) -> SubgraphError {
self.provider
.start(self.deployment.clone(), Some(stop_block.number))
.await
.unwrap();
.expect("unable to start subgraph");

wait_for_sync(
&self.logger,
&self.store,
&self.deployment.clone(),
stop_block,
)
.await
.unwrap_err()
}

pub async fn query(&self, query: &str) -> Result<Option<r::Value>, Vec<QueryError>> {
@@ -376,14 +398,14 @@ pub fn cleanup(
pub async fn wait_for_sync(
logger: &Logger,
store: &SubgraphStore,
hash: &DeploymentHash,
deployment: &DeploymentLocator,
stop_block: BlockPtr,
) -> Result<(), Error> {
) -> Result<(), SubgraphError> {
let mut err_count = 0;
while err_count < 10 {
tokio::time::sleep(Duration::from_millis(1000)).await;

let block_ptr = match store.least_block_ptr(&hash).await {
let block_ptr = match store.least_block_ptr(&deployment.hash).await {
Ok(Some(ptr)) => ptr,
res => {
info!(&logger, "{:?}", res);
@@ -397,15 +419,12 @@ pub async fn wait_for_sync(
break;
}

if !store.is_healthy(&hash).await.unwrap() {
return Err(anyhow::anyhow!("subgraph failed unexpectedly"));
let status = store.status_for_id(deployment.id);
if let Some(fatal_error) = status.fatal_error {
return Err(fatal_error);
}
}

if !store.is_healthy(&hash).await.unwrap() {
return Err(anyhow::anyhow!("subgraph failed unexpectedly"));
}

Ok(())
}

18 changes: 17 additions & 1 deletion tests/tests/runner.rs
@@ -5,6 +5,7 @@ use std::time::Duration;

use graph::blockchain::block_stream::BlockWithTriggers;
use graph::blockchain::{Block, BlockPtr, Blockchain};
use graph::data::subgraph::schema::SubgraphError;
use graph::data_source::CausalityRegion;
use graph::env::EnvVars;
use graph::object;
@@ -142,7 +143,8 @@ async fn file_data_sources() {
let block_2 = empty_block(block_1.ptr(), test_ptr(2));
let block_3 = empty_block(block_2.ptr(), test_ptr(3));
let block_4 = empty_block(block_3.ptr(), test_ptr(4));
vec![block_0, block_1, block_2, block_3, block_4]
let block_5 = empty_block(block_4.ptr(), test_ptr(5));
vec![block_0, block_1, block_2, block_3, block_4, block_5]
};
let stop_block = test_ptr(1);

@@ -204,6 +206,7 @@ async fn file_data_sources() {
ctx.provider.stop(ctx.deployment.clone()).await.unwrap();
let stop_block = test_ptr(4);
ctx.start_and_sync_to(stop_block).await;
ctx.provider.stop(ctx.deployment.clone()).await.unwrap();
let writable = ctx
.store
.clone()
@@ -219,6 +222,19 @@ async fn file_data_sources() {
assert!(data_source.causality_region == causality_region.next());
causality_region = causality_region.next();
}

let stop_block = test_ptr(5);
let err = ctx.start_and_sync_to_error(stop_block).await;
let message = "entity type `IpfsFile1` is not on the 'entities' list for data source `File2`. \
Copy link
Contributor

@mangas mangas Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be enough to check if this is an isolation error somehow (left a suggestion about subtype but that might be lost). Maybe we could match an important part of the match like IpfsFile1 or somnething rather than the full message which also includes a backtrace. As it is it will be potentially broken if this function changes.

Copy link
Collaborator Author

@leoyvens leoyvens Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the test breaks when functionality changes, that's good! Gives us a chance to check that the change was intended. Even if its for the backtrace which is not directly related to what's being tested.

Hint: Add `IpfsFile1` to the 'entities' list, which currently is: `IpfsFile`.\twasm backtrace:\t 0: 0x33bf - <unknown>!src/mapping/handleFile1\t in handler `handleFile1` at block #5 ()".to_string();
let expected_err = SubgraphError {
subgraph_id: ctx.deployment.hash.clone(),
message,
block_ptr: Some(test_ptr(5)),
handler: None,
deterministic: false,
};
assert_eq!(err, expected_err);
}

#[tokio::test]
Expand Down