Skip to content

Commit

Permalink
chore: add metrics for native chunk (risingwavelabs#8997)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Apr 4, 2023
1 parent fadd752 commit ef22c5b
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
3 changes: 1 addition & 2 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ macro_rules! impl_common_split_reader_logic {
let data_stream = self.into_data_stream();

let data_stream = data_stream
.map_ok(move |data_batch| {
.inspect_ok(move |data_batch| {
metrics
.partition_input_count
.with_label_values(&[&actor_id, &source_id, &split_id])
Expand All @@ -286,7 +286,6 @@ macro_rules! impl_common_split_reader_logic {
.partition_input_bytes
.with_label_values(&[&actor_id, &source_id, &split_id])
.inc_by(sum_bytes);
data_batch
})
.boxed();
let parser =
Expand Down
18 changes: 16 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,22 @@ impl SplitReader for DatagenSplitReader {
// spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE).boxed()
match self.parser_config.specific {
SpecificParserConfig::Native => {
spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE)
.boxed()
let actor_id = self.source_ctx.source_info.actor_id.to_string();
let source_id = self.source_ctx.source_info.source_id.to_string();
let split_id = self.split_id.to_string();
let metrics = self.source_ctx.metrics.clone();
spawn_data_generation_stream(
self.generator
.into_native_stream()
.inspect_ok(move |chunk_with_states| {
metrics
.partition_input_count
.with_label_values(&[&actor_id, &source_id, &split_id])
.inc_by(chunk_with_states.chunk.cardinality() as u64);
}),
BUFFER_SIZE,
)
.boxed()
}
_ => self.into_chunk_stream(),
}
Expand Down
21 changes: 18 additions & 3 deletions src/connector/src/source/nexmark/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use maplit::hashmap;
use nexmark::config::NexmarkConfig;
Expand Down Expand Up @@ -105,15 +105,30 @@ impl SplitReader for NexmarkSplitReader {
}

fn into_stream(self) -> BoxSourceWithStateStream {
let actor_id = self.source_ctx.source_info.actor_id.to_string();
let source_id = self.source_ctx.source_info.source_id.to_string();
let split_id = self.split_id.clone();
let metrics = self.source_ctx.metrics.clone();

// Will buffer at most 4 event chunks.
const BUFFER_SIZE: usize = 4;
spawn_data_generation_stream(self.into_chunk_stream(), BUFFER_SIZE).boxed()
spawn_data_generation_stream(
self.into_native_stream()
.inspect_ok(move |chunk_with_states| {
metrics
.partition_input_count
.with_label_values(&[&actor_id, &source_id, &split_id])
.inc_by(chunk_with_states.chunk.cardinality() as u64);
}),
BUFFER_SIZE,
)
.boxed()
}
}

impl NexmarkSplitReader {
#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
async fn into_chunk_stream(mut self) {
async fn into_native_stream(mut self) {
let start_time = Instant::now();
let start_offset = self.generator.global_offset();
let start_ts = self.generator.timestamp();
Expand Down

0 comments on commit ef22c5b

Please sign in to comment.