Skip to content

Commit

Permalink
fix(streaming): hop executor handle watermark (#8498)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Mar 13, 2023
1 parent c0aa78b commit e1ae04e
Showing 1 changed file with 297 additions and 48 deletions.
345 changes: 297 additions & 48 deletions src/stream/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ impl Executor for HopWindowExecutor {
}

impl HopWindowExecutor {
fn derive_watermarks(
input_len: usize,
time_col_idx: usize,
output_indices: &[usize],
) -> Vec<Vec<usize>> {
let mut watermark_derivations = vec![vec![]; input_len];
for (out_i, in_i) in output_indices.iter().enumerate() {
let in_i = *in_i;
if in_i >= input_len {
watermark_derivations[time_col_idx].push(out_i);
} else {
watermark_derivations[in_i].push(out_i);
}
}
watermark_derivations
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(self: Box<Self>) {
let Self {
Expand All @@ -95,6 +112,7 @@ impl HopWindowExecutor {
window_size,
output_indices,
info,
time_col_idx,
..
} = *self;
let units = window_size
Expand All @@ -111,55 +129,64 @@ impl HopWindowExecutor {

let window_start_col_index = input.schema().len();
let window_end_col_index = input.schema().len() + 1;
let watermark_derivations =
Self::derive_watermarks(input.schema().len(), time_col_idx, &output_indices);
#[for_await]
for msg in input.execute() {
let msg = msg?;
if let Message::Chunk(chunk) = msg {
// TODO: compact may be not necessary here.
let chunk = chunk.compact();
let (data_chunk, ops) = chunk.into_parts();
// SAFETY: Already compacted.
assert!(matches!(data_chunk.vis(), Vis::Compact(_)));
let _len = data_chunk.cardinality();
for i in 0..units {
let window_start_col = if output_indices.contains(&window_start_col_index) {
Some(
self.window_start_exprs[i].eval_infallible(&data_chunk, |err| {
ctx.on_compute_error(err, &info.identity)
}),
)
} else {
None
};
let window_end_col = if output_indices.contains(&window_end_col_index) {
Some(
self.window_end_exprs[i].eval_infallible(&data_chunk, |err| {
ctx.on_compute_error(err, &info.identity)
}),
)
} else {
None
};
let new_cols = output_indices
.iter()
.filter_map(|&idx| {
if idx < window_start_col_index {
Some(data_chunk.column_at(idx).clone())
} else if idx == window_start_col_index {
Some(Column::new(window_start_col.clone().unwrap()))
} else if idx == window_end_col_index {
Some(Column::new(window_end_col.clone().unwrap()))
} else {
None
}
})
.collect();
let new_chunk = StreamChunk::new(ops.clone(), new_cols, None);
yield Message::Chunk(new_chunk);
match msg {
Message::Chunk(chunk) => {
// TODO: compact may be not necessary here.
let chunk = chunk.compact();
let (data_chunk, ops) = chunk.into_parts();
// SAFETY: Already compacted.
assert!(matches!(data_chunk.vis(), Vis::Compact(_)));
let _len = data_chunk.cardinality();
for i in 0..units {
let window_start_col = if output_indices.contains(&window_start_col_index) {
Some(
self.window_start_exprs[i].eval_infallible(&data_chunk, |err| {
ctx.on_compute_error(err, &info.identity)
}),
)
} else {
None
};
let window_end_col = if output_indices.contains(&window_end_col_index) {
Some(
self.window_end_exprs[i].eval_infallible(&data_chunk, |err| {
ctx.on_compute_error(err, &info.identity)
}),
)
} else {
None
};
let new_cols = output_indices
.iter()
.filter_map(|&idx| {
if idx < window_start_col_index {
Some(data_chunk.column_at(idx).clone())
} else if idx == window_start_col_index {
Some(Column::new(window_start_col.clone().unwrap()))
} else if idx == window_end_col_index {
Some(Column::new(window_end_col.clone().unwrap()))
} else {
None
}
})
.collect();
let new_chunk = StreamChunk::new(ops.clone(), new_cols, None);
yield Message::Chunk(new_chunk);
}
}
Message::Barrier(b) => {
yield Message::Barrier(b);
}
Message::Watermark(w) => {
for i in &watermark_derivations[w.col_idx] {
yield Message::Watermark(w.clone().with_idx(*i));
}
}
} else {
yield msg;
continue;
};
}
}
Expand All @@ -174,9 +201,9 @@ mod tests {
use risingwave_common::types::{DataType, IntervalUnit};
use risingwave_expr::expr::test_utils::make_hop_window_expression;

use crate::executor::test_utils::MockSource;
use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk};

use super::super::*;
use crate::executor::test_utils::{MessageSender, MockSource};
use crate::executor::{ActorContext, Executor, ExecutorInfo, ScalarImpl, StreamChunk};
fn create_executor(output_indices: Vec<usize>) -> Box<dyn Executor> {
let field1 = Field::unnamed(DataType::Int64);
let field2 = Field::unnamed(DataType::Int64);
Expand Down Expand Up @@ -305,4 +332,226 @@ mod tests {
)
);
}

fn create_executor2(output_indices: Vec<usize>) -> (MessageSender, Box<dyn Executor>) {
let field1 = Field::unnamed(DataType::Int64);
let field2 = Field::unnamed(DataType::Int64);
let field3 = Field::with_name(DataType::Timestamp, "created_at");
let schema = Schema::new(vec![field1, field2, field3]);
let pk_indices = vec![0];
let (tx, source) = MockSource::channel(schema.clone(), pk_indices.clone());

let window_slide = IntervalUnit::from_minutes(15);
let window_size = IntervalUnit::from_minutes(30);
let (window_start_exprs, window_end_exprs) =
make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap();

(
tx,
super::HopWindowExecutor::new(
ActorContext::create(123),
Box::new(source),
ExecutorInfo {
// TODO: the schema is incorrect, but it seems useless here.
schema,
pk_indices,
identity: "test".to_string(),
},
2,
window_slide,
window_size,
window_start_exprs,
window_end_exprs,
output_indices,
)
.boxed(),
)
}

#[tokio::test]
async fn test_watermark_full_output() {
let (mut tx, hop) = create_executor2((0..5).collect());
let mut hop = hop.execute();

// TODO: the datatype is incorrect, but it seems useless here.
tx.push_int64_watermark(0, 100);
tx.push_int64_watermark(1, 100);
tx.push_int64_watermark(2, 100);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 0,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 1,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 2,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 3,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 4,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);
}

#[tokio::test]
async fn test_watermark_output_indices1() {
let (mut tx, hop) = create_executor2(vec![4, 1, 0, 2]);
let mut hop = hop.execute();

// TODO: the datatype is incorrect, but it seems useless here.
tx.push_int64_watermark(0, 100);
tx.push_int64_watermark(1, 100);
tx.push_int64_watermark(2, 100);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 2,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 1,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 0,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 3,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);
}

#[tokio::test]
async fn test_watermark_output_indices2() {
let (mut tx, hop) = create_executor2(vec![4, 1, 5, 0, 2]);
let mut hop = hop.execute();

// TODO: the datatype is incorrect, but it seems useless here.
tx.push_int64_watermark(0, 100);
tx.push_int64_watermark(1, 100);
tx.push_int64_watermark(2, 100);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 3,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 1,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 0,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 2,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);

let w = hop.next().await.unwrap().unwrap();
let w = w.as_watermark().unwrap();
assert_eq!(
w,
&Watermark {
col_idx: 4,
data_type: DataType::Int64,
val: ScalarImpl::Int64(100)
}
);
}
}

0 comments on commit e1ae04e

Please sign in to comment.