diff --git a/src/stream/src/executor/wrapper.rs b/src/stream/src/executor/wrapper.rs index 158508d3dc5c6..b2e574b095599 100644 --- a/src/stream/src/executor/wrapper.rs +++ b/src/stream/src/executor/wrapper.rs @@ -118,6 +118,8 @@ impl WrapperExecutor { let stream = trace::instrument_await_tree(info.clone(), extra.actor_id, extra.executor_id, stream); + // Schema check + let stream = schema_check::schema_check(info.clone(), stream); // Epoch check let stream = epoch_check::epoch_check(info, stream); diff --git a/src/stream/src/executor/wrapper/schema_check.rs b/src/stream/src/executor/wrapper/schema_check.rs index 4bc99d3b3c8bb..d23eca2b455c6 100644 --- a/src/stream/src/executor/wrapper/schema_check.rs +++ b/src/stream/src/executor/wrapper/schema_check.rs @@ -27,13 +27,25 @@ pub async fn schema_check(info: Arc, input: impl MessageStream) { for message in input { let message = message?; - if let Message::Chunk(chunk) = &message { - risingwave_common::util::schema_check::schema_check( + match &message { + Message::Chunk(chunk) => risingwave_common::util::schema_check::schema_check( info.schema.fields().iter().map(|f| &f.data_type), chunk.columns(), - ) - .unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e)); + ), + Message::Watermark(watermark) => { + let expected = info.schema.fields()[watermark.col_idx].data_type(); + let found = &watermark.data_type; + if &expected != found { + Err(format!( + "watermark type mismatched: expected {expected}, found {found}" + )) + } else { + Ok(()) + } + } + Message::Barrier(_) => Ok(()), } + .unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e)); yield message; }