diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 694c949285374..6271d8af37862 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -893,12 +893,12 @@ async fn send_arrays_to_col_writers( let mut next_channel = 0; for (array, field) in rb.columns().iter().zip(schema.fields()) { for c in compute_leaves(field, array)? { - col_array_channels[next_channel] - .send(c) - .await - .map_err(|_| { - DataFusionError::Internal("Unable to send array to writer!".into()) - })?; + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). + if col_array_channels[next_channel].send(c).await.is_err() { + return Ok(()); + } + next_channel += 1; } } @@ -984,11 +984,11 @@ fn spawn_parquet_parallel_serialization_task( &pool, ); - serialize_tx.send(finalize_rg_task).await.map_err(|_| { - DataFusionError::Internal( - "Unable to send closed RG to concat task!".into(), - ) - })?; + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). + if serialize_tx.send(finalize_rg_task).await.is_err() { + return Ok(()); + } current_rg_rows = 0; rb = rb.slice(rows_left, rb.num_rows() - rows_left); @@ -1013,11 +1013,11 @@ fn spawn_parquet_parallel_serialization_task( &pool, ); - serialize_tx.send(finalize_rg_task).await.map_err(|_| { - DataFusionError::Internal( - "Unable to send closed RG to concat task!".into(), - ) - })?; + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). + if serialize_tx.send(finalize_rg_task).await.is_err() { + return Ok(()); + } } Ok(()) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index f7402357d1c76..7ef24609e238d 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -340,8 +340,8 @@ async fn oom_parquet_sink() { path.to_string_lossy() )) .with_expected_errors(vec![ - // TODO: update error handling in ParquetSink - "Unable to send array to writer!", + "Failed to allocate additional", + "for ParquetSink(ArrowColumnWriter)", ]) .with_memory_limit(200_000) .run()