Skip to content

Commit

Permalink
fix(streaming): also enable schema check in release profile (risingwa…
Browse files Browse the repository at this point in the history
…velabs#8711)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Mar 23, 2023
1 parent ad61a71 commit a1d084d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/stream/src/executor/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ impl WrapperExecutor {
let stream =
trace::instrument_await_tree(info.clone(), extra.actor_id, extra.executor_id, stream);

// Schema check
let stream = schema_check::schema_check(info.clone(), stream);
// Epoch check
let stream = epoch_check::epoch_check(info, stream);

Expand Down
20 changes: 16 additions & 4 deletions src/stream/src/executor/wrapper/schema_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,25 @@ pub async fn schema_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
for message in input {
let message = message?;

if let Message::Chunk(chunk) = &message {
risingwave_common::util::schema_check::schema_check(
match &message {
Message::Chunk(chunk) => risingwave_common::util::schema_check::schema_check(
info.schema.fields().iter().map(|f| &f.data_type),
chunk.columns(),
)
.unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e));
),
Message::Watermark(watermark) => {
let expected = info.schema.fields()[watermark.col_idx].data_type();
let found = &watermark.data_type;
if &expected != found {
Err(format!(
"watermark type mismatched: expected {expected}, found {found}"
))
} else {
Ok(())
}
}
Message::Barrier(_) => Ok(()),
}
.unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e));

yield message;
}
Expand Down

0 comments on commit a1d084d

Please sign in to comment.