Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmos: Add the message handler #3975

Merged
merged 12 commits into from
Sep 29, 2022
2 changes: 1 addition & 1 deletion chain/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use protobuf::UnknownValueRef;
use std::convert::From;
use std::path::Path;

const REQUIRED_ID: u32 = 66001;
const REQUIRED_ID: u32 = 77001;

#[derive(Debug, Clone)]
pub struct Field {
Expand Down
8 changes: 5 additions & 3 deletions chain/common/tests/resources/firehose/annotations.proto
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
syntax = "proto3";

package firehose;

import "google/protobuf/descriptor.proto";
option go_package = "github.com/streamingfast/pbgo/sf/firehose/v1;pbfirehose";

// 66K range is arbitrary picked number, might be conflict
import "google/protobuf/descriptor.proto";

extend google.protobuf.FieldOptions {
optional bool required = 66001;
optional bool required = 77001;
}
8 changes: 5 additions & 3 deletions chain/cosmos/proto/firehose/annotations.proto
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
syntax = "proto3";

package firehose;

import "google/protobuf/descriptor.proto";
option go_package = "github.com/streamingfast/pbgo/sf/firehose/v1;pbfirehose";

// 66K range is arbitrary picked number, might be conflict
import "google/protobuf/descriptor.proto";

extend google.protobuf.FieldOptions {
optional bool required = 66001;
optional bool required = 77001;
}
15 changes: 15 additions & 0 deletions chain/cosmos/proto/type.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,28 @@ message HeaderOnlyBlock {
message EventData {
Event event = 1 [(firehose.required) = true];
HeaderOnlyBlock block = 2 [(firehose.required) = true];
TransactionContext tx = 3;
}

message TransactionData {
TxResult tx = 1 [(firehose.required) = true];
HeaderOnlyBlock block = 2 [(firehose.required) = true];
}

message MessageData {
google.protobuf.Any message = 1 [(firehose.required) = true];
HeaderOnlyBlock block = 2 [(firehose.required) = true];
TransactionContext tx = 3 [(firehose.required) = true];
}

message TransactionContext {
bytes hash = 1;
uint32 index = 2;
uint32 code = 3;
int64 gas_wanted = 4;
int64 gas_used = 5;
}

message Header {
Consensus version = 1 [(gogoproto.nullable) = false];
string chain_id = 2 [(gogoproto.customname) = "ChainID"];
Expand Down
92 changes: 74 additions & 18 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ pub struct TriggersAdapter {}

#[async_trait]
impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn ancestor_block(
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

async fn scan_triggers(
&self,
_from: BlockNumber,
Expand All @@ -204,10 +212,30 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
// block. This is not currently possible because EventData is automatically
// generated.
.filter_map(|event| {
filter_event_trigger(filter, event, &header_only_block, EventOrigin::BeginBlock)
filter_event_trigger(
filter,
event,
&header_only_block,
None,
EventOrigin::BeginBlock,
)
})
.chain(shared_block.tx_events()?.cloned().filter_map(|event| {
filter_event_trigger(filter, event, &header_only_block, EventOrigin::DeliverTx)
.chain(shared_block.transactions().flat_map(|tx| {
tx.result
.as_ref()
.unwrap()
.events
.iter()
.filter_map(|e| {
filter_event_trigger(
filter,
e.clone(),
&header_only_block,
Some(build_tx_context(tx)),
EventOrigin::DeliverTx,
)
})
.collect::<Vec<_>>()
}))
.chain(
shared_block
Expand All @@ -218,18 +246,32 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
filter,
event,
&header_only_block,
None,
EventOrigin::EndBlock,
)
}),
)
.collect();

triggers.extend(
shared_block
.transactions()
.cloned()
.map(|tx| CosmosTrigger::with_transaction(tx, header_only_block.clone())),
);
triggers.extend(shared_block.transactions().cloned().flat_map(|tx_result| {
let mut triggers: Vec<_> = Vec::new();
if let Some(tx) = tx_result.tx.clone() {
if let Some(tx_body) = tx.body {
triggers.extend(tx_body.messages.into_iter().map(|message| {
CosmosTrigger::with_message(
message,
header_only_block.clone(),
build_tx_context(&tx_result),
)
}));
}
}
triggers.push(CosmosTrigger::with_transaction(
tx_result,
header_only_block.clone(),
));
triggers
}));

if filter.block_filter.trigger_every_block {
triggers.push(CosmosTrigger::Block(shared_block.cheap_clone()));
Expand All @@ -242,14 +284,6 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
panic!("Should never be called since not used by FirehoseBlockStream")
}

async fn ancestor_block(
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

/// Panics if `block` is genesis.
/// But that's ok since this is only called when reverting `block`.
async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
Expand All @@ -265,15 +299,31 @@ fn filter_event_trigger(
filter: &TriggerFilter,
event: codec::Event,
block: &codec::HeaderOnlyBlock,
tx_context: Option<codec::TransactionContext>,
origin: EventOrigin,
) -> Option<CosmosTrigger> {
if filter.event_type_filter.matches(&event.event_type) {
Some(CosmosTrigger::with_event(event, block.clone(), origin))
Some(CosmosTrigger::with_event(
event,
block.clone(),
tx_context,
origin,
))
} else {
None
}
}

fn build_tx_context(tx: &codec::TxResult) -> codec::TransactionContext {
codec::TransactionContext {
hash: tx.hash.clone(),
index: tx.index,
code: tx.result.as_ref().unwrap().code,
gas_wanted: tx.result.as_ref().unwrap().gas_wanted,
gas_used: tx.result.as_ref().unwrap().gas_used,
}
}

pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}
Expand Down Expand Up @@ -408,16 +458,19 @@ mod test {
CosmosTrigger::with_event(
Event::test_with_type("begin_event_3"),
header_only_block.clone(),
None,
EventOrigin::BeginBlock,
),
CosmosTrigger::with_event(
Event::test_with_type("tx_event_3"),
header_only_block.clone(),
Some(build_tx_context(&block_with_events.transactions[2])),
EventOrigin::DeliverTx,
),
CosmosTrigger::with_event(
Event::test_with_type("end_event_3"),
header_only_block.clone(),
None,
EventOrigin::EndBlock,
),
CosmosTrigger::with_transaction(
Expand All @@ -442,16 +495,19 @@ mod test {
CosmosTrigger::with_event(
Event::test_with_type("begin_event_3"),
header_only_block.clone(),
None,
EventOrigin::BeginBlock,
),
CosmosTrigger::with_event(
Event::test_with_type("tx_event_2"),
header_only_block.clone(),
Some(build_tx_context(&block_with_events.transactions[1])),
EventOrigin::DeliverTx,
),
CosmosTrigger::with_event(
Event::test_with_type("end_event_1"),
header_only_block.clone(),
None,
EventOrigin::EndBlock,
),
CosmosTrigger::with_transaction(
Expand Down
39 changes: 14 additions & 25 deletions chain/cosmos/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ impl Block {
.ok_or_else(|| anyhow!("block data missing header field"))
}

pub fn events(&self) -> Result<impl Iterator<Item = &Event>, Error> {
let events = self
.begin_block_events()?
.chain(self.tx_events()?)
.chain(self.end_block_events()?);

Ok(events)
}

pub fn begin_block_events(&self) -> Result<impl Iterator<Item = &Event>, Error> {
let events = self
.result_begin_block
Expand All @@ -35,22 +26,6 @@ impl Block {
Ok(events)
}

pub fn tx_events(&self) -> Result<impl Iterator<Item = &Event>, Error> {
if self.transactions.iter().any(|tx| tx.result.is_none()) {
return Err(anyhow!("block data transaction missing result field"));
}

let events = self.transactions.iter().flat_map(|tx| {
tx.result
.as_ref()
.map(|b| b.events.iter())
.into_iter()
.flatten()
});

Ok(events)
}

pub fn end_block_events(&self) -> Result<impl Iterator<Item = &Event>, Error> {
let events = self
.result_end_block
Expand Down Expand Up @@ -197,3 +172,17 @@ impl TransactionData {
.ok_or_else(|| anyhow!("transaction data missing block field"))
}
}

impl MessageData {
pub fn message(&self) -> Result<&prost_types::Any, Error> {
self.message
.as_ref()
.ok_or_else(|| anyhow!("message data missing message field"))
}

pub fn block(&self) -> Result<&HeaderOnlyBlock, Error> {
self.block
.as_ref()
.ok_or_else(|| anyhow!("message data missing block field"))
}
}
Loading