Skip to content

Commit

Permalink
fix: propagate connector reader error to the upper layer (risingwavel…
Browse files Browse the repository at this point in the history
…abs#8920)

Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Mar 31, 2023
1 parent 74b935e commit e51b240
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 6 deletions.
1 change: 0 additions & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use risingwave_connector::source::{
pub struct ConnectorSource {
pub config: ConnectorProperties,
pub columns: Vec<SourceColumnDesc>,
// pub parser: Arc<SourceParserImpl>,
pub parser_config: SpecificParserConfig,
pub connector_message_buffer_size: usize,
}
Expand Down
6 changes: 1 addition & 5 deletions src/stream/src/executor/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::pin::Pin;
use std::task::Poll;

use await_tree::InstrumentAwait;
use either::Either;
use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy};
use futures::{Stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -58,10 +57,7 @@ impl<const BIASED: bool> StreamReaderWithPause<BIASED> {
match chunk {
Ok(chunk) => yield chunk,
Err(err) => {
error!("hang up stream reader due to polling error: {}", err);
futures::future::pending()
.instrument_await("source_error")
.await
return Err(StreamExecutorError::connector_error(err));
}
}
}
Expand Down

0 comments on commit e51b240

Please sign in to comment.