From bfc4cb05a099e7fa236ffc9a5966d159c7c70d61 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 10 Jul 2024 10:43:54 -0700 Subject: [PATCH 1/2] fix(11397): do not surface errors for closed channels, and instead let the task join errors be surfaced --- .../src/datasource/file_format/parquet.rs | 23 ++++++------------- datafusion/core/tests/memory_limit/mod.rs | 4 ++-- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 694c94928537..0b02628325bb 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -893,12 +893,9 @@ async fn send_arrays_to_col_writers( let mut next_channel = 0; for (array, field) in rb.columns().iter().zip(schema.fields()) { for c in compute_leaves(field, array)? { - col_array_channels[next_channel] - .send(c) - .await - .map_err(|_| { - DataFusionError::Internal("Unable to send array to writer!".into()) - })?; + // Do not surface error from closed channel. + let _ = col_array_channels[next_channel].send(c).await; + next_channel += 1; } } @@ -984,11 +981,8 @@ fn spawn_parquet_parallel_serialization_task( &pool, ); - serialize_tx.send(finalize_rg_task).await.map_err(|_| { - DataFusionError::Internal( - "Unable to send closed RG to concat task!".into(), - ) - })?; + // Do not surface error from closed channel. + let _ = serialize_tx.send(finalize_rg_task).await; current_rg_rows = 0; rb = rb.slice(rows_left, rb.num_rows() - rows_left); @@ -1013,11 +1007,8 @@ fn spawn_parquet_parallel_serialization_task( &pool, ); - serialize_tx.send(finalize_rg_task).await.map_err(|_| { - DataFusionError::Internal( - "Unable to send closed RG to concat task!".into(), - ) - })?; + // Do not surface any error here due to closed channel. + let _ = serialize_tx.send(finalize_rg_task).await; } Ok(()) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index f7402357d1c7..7ef24609e238 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -340,8 +340,8 @@ async fn oom_parquet_sink() { path.to_string_lossy() )) .with_expected_errors(vec![ - // TODO: update error handling in ParquetSink - "Unable to send array to writer!", + "Failed to allocate additional", + "for ParquetSink(ArrowColumnWriter)", ]) .with_memory_limit(200_000) .run() From 346a95ba841c4f6fead1d42406258dafc2b3288a Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 10 Jul 2024 13:42:40 -0700 Subject: [PATCH 2/2] fix(11397): terminate early on channel send failure --- .../src/datasource/file_format/parquet.rs | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0b02628325bb..6271d8af3786 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -893,8 +893,11 @@ async fn send_arrays_to_col_writers( let mut next_channel = 0; for (array, field) in rb.columns().iter().zip(schema.fields()) { for c in compute_leaves(field, array)? { - // Do not surface error from closed channel. - let _ = col_array_channels[next_channel].send(c).await; + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). + if col_array_channels[next_channel].send(c).await.is_err() { + return Ok(()); + } next_channel += 1; } @@ -981,8 +984,11 @@ fn spawn_parquet_parallel_serialization_task( &pool, ); - // Do not surface error from closed channel. - let _ = serialize_tx.send(finalize_rg_task).await; + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). + if serialize_tx.send(finalize_rg_task).await.is_err() { + return Ok(()); + } current_rg_rows = 0; rb = rb.slice(rows_left, rb.num_rows() - rows_left); @@ -1007,8 +1013,11 @@ fn spawn_parquet_parallel_serialization_task( &pool, ); - // Do not surface any error here due to closed channel. - let _ = serialize_tx.send(finalize_rg_task).await; + // Do not surface error from closed channel (means something + // else hit an error, and the plan is shutting down). + if serialize_tx.send(finalize_rg_task).await.is_err() { + return Ok(()); + } } Ok(())