From e1ae04ea8f8485e640558f732f3f65d87b3db570 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Mon, 13 Mar 2023 14:34:24 +0800 Subject: [PATCH] fix(streaming): hop executor handle watermark (#8498) --- src/stream/src/executor/hop_window.rs | 345 ++++++++++++++++++++++---- 1 file changed, 297 insertions(+), 48 deletions(-) diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index c987f2a78cba8..b8407d1e9480e 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -85,6 +85,23 @@ impl Executor for HopWindowExecutor { } impl HopWindowExecutor { + fn derive_watermarks( + input_len: usize, + time_col_idx: usize, + output_indices: &[usize], + ) -> Vec> { + let mut watermark_derivations = vec![vec![]; input_len]; + for (out_i, in_i) in output_indices.iter().enumerate() { + let in_i = *in_i; + if in_i >= input_len { + watermark_derivations[time_col_idx].push(out_i); + } else { + watermark_derivations[in_i].push(out_i); + } + } + watermark_derivations + } + #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(self: Box) { let Self { @@ -95,6 +112,7 @@ impl HopWindowExecutor { window_size, output_indices, info, + time_col_idx, .. } = *self; let units = window_size @@ -111,55 +129,64 @@ impl HopWindowExecutor { let window_start_col_index = input.schema().len(); let window_end_col_index = input.schema().len() + 1; + let watermark_derivations = + Self::derive_watermarks(input.schema().len(), time_col_idx, &output_indices); #[for_await] for msg in input.execute() { let msg = msg?; - if let Message::Chunk(chunk) = msg { - // TODO: compact may be not necessary here. - let chunk = chunk.compact(); - let (data_chunk, ops) = chunk.into_parts(); - // SAFETY: Already compacted. - assert!(matches!(data_chunk.vis(), Vis::Compact(_))); - let _len = data_chunk.cardinality(); - for i in 0..units { - let window_start_col = if output_indices.contains(&window_start_col_index) { - Some( - self.window_start_exprs[i].eval_infallible(&data_chunk, |err| { - ctx.on_compute_error(err, &info.identity) - }), - ) - } else { - None - }; - let window_end_col = if output_indices.contains(&window_end_col_index) { - Some( - self.window_end_exprs[i].eval_infallible(&data_chunk, |err| { - ctx.on_compute_error(err, &info.identity) - }), - ) - } else { - None - }; - let new_cols = output_indices - .iter() - .filter_map(|&idx| { - if idx < window_start_col_index { - Some(data_chunk.column_at(idx).clone()) - } else if idx == window_start_col_index { - Some(Column::new(window_start_col.clone().unwrap())) - } else if idx == window_end_col_index { - Some(Column::new(window_end_col.clone().unwrap())) - } else { - None - } - }) - .collect(); - let new_chunk = StreamChunk::new(ops.clone(), new_cols, None); - yield Message::Chunk(new_chunk); + match msg { + Message::Chunk(chunk) => { + // TODO: compact may be not necessary here. + let chunk = chunk.compact(); + let (data_chunk, ops) = chunk.into_parts(); + // SAFETY: Already compacted. + assert!(matches!(data_chunk.vis(), Vis::Compact(_))); + let _len = data_chunk.cardinality(); + for i in 0..units { + let window_start_col = if output_indices.contains(&window_start_col_index) { + Some( + self.window_start_exprs[i].eval_infallible(&data_chunk, |err| { + ctx.on_compute_error(err, &info.identity) + }), + ) + } else { + None + }; + let window_end_col = if output_indices.contains(&window_end_col_index) { + Some( + self.window_end_exprs[i].eval_infallible(&data_chunk, |err| { + ctx.on_compute_error(err, &info.identity) + }), + ) + } else { + None + }; + let new_cols = output_indices + .iter() + .filter_map(|&idx| { + if idx < window_start_col_index { + Some(data_chunk.column_at(idx).clone()) + } else if idx == window_start_col_index { + Some(Column::new(window_start_col.clone().unwrap())) + } else if idx == window_end_col_index { + Some(Column::new(window_end_col.clone().unwrap())) + } else { + None + } + }) + .collect(); + let new_chunk = StreamChunk::new(ops.clone(), new_cols, None); + yield Message::Chunk(new_chunk); + } + } + Message::Barrier(b) => { + yield Message::Barrier(b); + } + Message::Watermark(w) => { + for i in &watermark_derivations[w.col_idx] { + yield Message::Watermark(w.clone().with_idx(*i)); + } } - } else { - yield msg; - continue; }; } } @@ -174,9 +201,9 @@ mod tests { use risingwave_common::types::{DataType, IntervalUnit}; use risingwave_expr::expr::test_utils::make_hop_window_expression; - use crate::executor::test_utils::MockSource; - use crate::executor::{ActorContext, Executor, ExecutorInfo, StreamChunk}; - + use super::super::*; + use crate::executor::test_utils::{MessageSender, MockSource}; + use crate::executor::{ActorContext, Executor, ExecutorInfo, ScalarImpl, StreamChunk}; fn create_executor(output_indices: Vec) -> Box { let field1 = Field::unnamed(DataType::Int64); let field2 = Field::unnamed(DataType::Int64); @@ -305,4 +332,226 @@ mod tests { ) ); } + + fn create_executor2(output_indices: Vec) -> (MessageSender, Box) { + let field1 = Field::unnamed(DataType::Int64); + let field2 = Field::unnamed(DataType::Int64); + let field3 = Field::with_name(DataType::Timestamp, "created_at"); + let schema = Schema::new(vec![field1, field2, field3]); + let pk_indices = vec![0]; + let (tx, source) = MockSource::channel(schema.clone(), pk_indices.clone()); + + let window_slide = IntervalUnit::from_minutes(15); + let window_size = IntervalUnit::from_minutes(30); + let (window_start_exprs, window_end_exprs) = + make_hop_window_expression(DataType::Timestamp, 2, window_size, window_slide).unwrap(); + + ( + tx, + super::HopWindowExecutor::new( + ActorContext::create(123), + Box::new(source), + ExecutorInfo { + // TODO: the schema is incorrect, but it seems useless here. + schema, + pk_indices, + identity: "test".to_string(), + }, + 2, + window_slide, + window_size, + window_start_exprs, + window_end_exprs, + output_indices, + ) + .boxed(), + ) + } + + #[tokio::test] + async fn test_watermark_full_output() { + let (mut tx, hop) = create_executor2((0..5).collect()); + let mut hop = hop.execute(); + + // TODO: the datatype is incorrect, but it seems useless here. + tx.push_int64_watermark(0, 100); + tx.push_int64_watermark(1, 100); + tx.push_int64_watermark(2, 100); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 0, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 1, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 2, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 3, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 4, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + } + + #[tokio::test] + async fn test_watermark_output_indices1() { + let (mut tx, hop) = create_executor2(vec![4, 1, 0, 2]); + let mut hop = hop.execute(); + + // TODO: the datatype is incorrect, but it seems useless here. + tx.push_int64_watermark(0, 100); + tx.push_int64_watermark(1, 100); + tx.push_int64_watermark(2, 100); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 2, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 1, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 0, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 3, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + } + + #[tokio::test] + async fn test_watermark_output_indices2() { + let (mut tx, hop) = create_executor2(vec![4, 1, 5, 0, 2]); + let mut hop = hop.execute(); + + // TODO: the datatype is incorrect, but it seems useless here. + tx.push_int64_watermark(0, 100); + tx.push_int64_watermark(1, 100); + tx.push_int64_watermark(2, 100); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 3, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 1, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 0, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 2, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + + let w = hop.next().await.unwrap().unwrap(); + let w = w.as_watermark().unwrap(); + assert_eq!( + w, + &Watermark { + col_idx: 4, + data_type: DataType::Int64, + val: ScalarImpl::Int64(100) + } + ); + } }