Skip to content

Commit

Permalink
fix(11397): surface proper errors in ParquetSink (#11399)
Browse files Browse the repository at this point in the history
* fix(11397): do not surface errors for closed channels, and instead let the task join errors be surfaced

* fix(11397): terminate early on channel send failure
  • Loading branch information
wiedld authored Jul 12, 2024
1 parent d542cbd commit 1dfac86
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
32 changes: 16 additions & 16 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,12 +893,12 @@ async fn send_arrays_to_col_writers(
let mut next_channel = 0;
for (array, field) in rb.columns().iter().zip(schema.fields()) {
for c in compute_leaves(field, array)? {
col_array_channels[next_channel]
.send(c)
.await
.map_err(|_| {
DataFusionError::Internal("Unable to send array to writer!".into())
})?;
// Do not surface error from closed channel (means something
// else hit an error, and the plan is shutting down).
if col_array_channels[next_channel].send(c).await.is_err() {
return Ok(());
}

next_channel += 1;
}
}
Expand Down Expand Up @@ -984,11 +984,11 @@ fn spawn_parquet_parallel_serialization_task(
&pool,
);

serialize_tx.send(finalize_rg_task).await.map_err(|_| {
DataFusionError::Internal(
"Unable to send closed RG to concat task!".into(),
)
})?;
// Do not surface error from closed channel (means something
// else hit an error, and the plan is shutting down).
if serialize_tx.send(finalize_rg_task).await.is_err() {
return Ok(());
}

current_rg_rows = 0;
rb = rb.slice(rows_left, rb.num_rows() - rows_left);
Expand All @@ -1013,11 +1013,11 @@ fn spawn_parquet_parallel_serialization_task(
&pool,
);

serialize_tx.send(finalize_rg_task).await.map_err(|_| {
DataFusionError::Internal(
"Unable to send closed RG to concat task!".into(),
)
})?;
// Do not surface error from closed channel (means something
// else hit an error, and the plan is shutting down).
if serialize_tx.send(finalize_rg_task).await.is_err() {
return Ok(());
}
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ async fn oom_parquet_sink() {
path.to_string_lossy()
))
.with_expected_errors(vec![
// TODO: update error handling in ParquetSink
"Unable to send array to writer!",
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
])
.with_memory_limit(200_000)
.run()
Expand Down

0 comments on commit 1dfac86

Please sign in to comment.