From e51b24084148d5d2a2a98725b6e3071633a48ec3 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Fri, 31 Mar 2023 17:45:44 +0800 Subject: [PATCH] fix: propagate connector reader error to the upper layer (#8920) Signed-off-by: tabVersion --- src/source/src/connector_source.rs | 1 - src/stream/src/executor/stream_reader.rs | 6 +----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 979e905b0e596..6752580a6cb6c 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -32,7 +32,6 @@ use risingwave_connector::source::{ pub struct ConnectorSource { pub config: ConnectorProperties, pub columns: Vec, - // pub parser: Arc, pub parser_config: SpecificParserConfig, pub connector_message_buffer_size: usize, } diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index 02018eac8981f..7846609aff2a3 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -15,7 +15,6 @@ use std::pin::Pin; use std::task::Poll; -use await_tree::InstrumentAwait; use either::Either; use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy}; use futures::{Stream, StreamExt, TryStreamExt}; @@ -58,10 +57,7 @@ impl StreamReaderWithPause { match chunk { Ok(chunk) => yield chunk, Err(err) => { - error!("hang up stream reader due to polling error: {}", err); - futures::future::pending() - .instrument_await("source_error") - .await + return Err(StreamExecutorError::connector_error(err)); } } }