From 1fc776981496c0f0648ece8069992c65a7021c83 Mon Sep 17 00:00:00 2001 From: Arttu Date: Wed, 18 Dec 2024 17:05:23 +0100 Subject: [PATCH] fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema (#13750) * fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema * clippy * fix csv and json tests * add testing for parquet * cleanup * fix parquet tests * document describe_partition, add back repartition options to one of the csv empty files tests --- .../core/src/datasource/file_format/csv.rs | 38 ++---- .../core/src/datasource/file_format/json.rs | 8 +- .../src/datasource/file_format/parquet.rs | 57 ++++++++- .../core/src/datasource/listing/helpers.rs | 112 +++++++++++++++++- .../core/src/datasource/listing/table.rs | 2 + 5 files changed, 180 insertions(+), 37 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 9c96c682865f..e9a93475d3ce 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1259,18 +1259,13 @@ mod tests { Ok(()) } - /// Read a single empty csv file in parallel + /// Read a single empty csv file /// /// empty_0_byte.csv: /// (file is empty) - #[rstest(n_partitions, case(1), case(2), case(3), case(4))] #[tokio::test] - async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> { - let config = SessionConfig::new() - .with_repartition_file_scans(true) - .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); - let ctx = SessionContext::new_with_config(config); + async fn test_csv_empty_file() -> Result<()> { + let ctx = SessionContext::new(); ctx.register_csv( "empty", "tests/data/empty_0_byte.csv", @@ -1278,32 +1273,24 @@ mod tests { ) .await?; - // Require a predicate to enable repartition for the optimizer let query = "select * from empty where random() > 0.5;"; let query_result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_query_csv_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = ["++", "++"]; assert_batches_eq!(expected, &query_result); - assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty Ok(()) } - /// Read a single empty csv file with header in parallel + /// Read a single empty csv file with header /// /// empty.csv: /// c1,c2,c3 - #[rstest(n_partitions, case(1), case(2), case(3))] #[tokio::test] - async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> { - let config = SessionConfig::new() - .with_repartition_file_scans(true) - .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); - let ctx = SessionContext::new_with_config(config); + async fn test_csv_empty_with_header() -> Result<()> { + let ctx = SessionContext::new(); ctx.register_csv( "empty", "tests/data/empty.csv", @@ -1311,21 +1298,18 @@ mod tests { ) .await?; - // Require a predicate to enable repartition for the optimizer let query = "select * from empty where random() > 0.5;"; let query_result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_query_csv_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = ["++", "++"]; assert_batches_eq!(expected, &query_result); - assert_eq!(n_partitions, actual_partitions); Ok(()) } - /// Read multiple empty csv files in parallel + /// Read multiple empty csv files /// /// all_empty /// ├── empty0.csv 
@@ -1334,13 +1318,13 @@ mod tests { /// /// empty0.csv/empty1.csv/empty2.csv: /// (file is empty) - #[rstest(n_partitions, case(1), case(2), case(3), case(4))] #[tokio::test] - async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> { + async fn test_csv_multiple_empty_files() -> Result<()> { + // Testing that partitioning doesn't break with empty files let config = SessionConfig::new() .with_repartition_file_scans(true) .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); + .with_target_partitions(4); let ctx = SessionContext::new_with_config(config); let file_format = Arc::new(CsvFormat::default().with_has_header(false)); let listing_options = ListingOptions::new(file_format.clone()) @@ -1358,13 +1342,11 @@ mod tests { // Require a predicate to enable repartition for the optimizer let query = "select * from empty where random() > 0.5;"; let query_result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_query_csv_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = ["++", "++"]; assert_batches_eq!(expected, &query_result); - assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty Ok(()) } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index e97853e9e7d7..4bdf336881c9 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -619,13 +619,11 @@ mod tests { Ok(()) } - #[rstest(n_partitions, case(1), case(2), case(3), case(4))] #[tokio::test] - async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> { + async fn it_can_read_empty_ndjson() -> Result<()> { let config = SessionConfig::new() .with_repartition_file_scans(true) - .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); + .with_repartition_file_min_size(0); let ctx = SessionContext::new_with_config(config); @@ -638,7 +636,6 @@ mod tests { let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;"; let result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_num_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = [ @@ -647,7 +644,6 @@ mod tests { ]; assert_batches_eq!(expected, &result); - assert_eq!(1, actual_partitions); Ok(()) } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 1d08de172273..383fd6575234 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1312,7 +1312,7 @@ mod tests { use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::physical_plan::metrics::MetricValue; - use crate::prelude::{SessionConfig, SessionContext}; + use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::{Array, ArrayRef, StringArray}; use arrow_array::types::Int32Type; use arrow_array::{DictionaryArray, Int32Array, Int64Array}; @@ -1323,8 +1323,8 @@ mod tests { as_float64_array, as_int32_array, as_timestamp_nanosecond_array, }; use datafusion_common::config::ParquetOptions; - use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::Utf8; + use datafusion_common::{assert_batches_eq, ScalarValue}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -2251,6 +2251,59 @@ 
mod tests { scan_format(state, &*format, &testdata, file_name, projection, limit).await } + /// Test that 0-byte files don't break while reading + #[tokio::test] + async fn test_read_empty_parquet() -> Result<()> { + let tmp_dir = tempfile::TempDir::new().unwrap(); + let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy()); + File::create(&path).await?; + + let ctx = SessionContext::new(); + + let df = ctx + .read_parquet(&path, ParquetReadOptions::default()) + .await + .expect("read_parquet should succeed"); + + let result = df.collect().await?; + #[rustfmt::skip] + let expected = ["++", + "++"]; + assert_batches_eq!(expected, &result); + + Ok(()) + } + + /// Test that 0-byte files don't break while reading + #[tokio::test] + async fn test_read_partitioned_empty_parquet() -> Result<()> { + let tmp_dir = tempfile::TempDir::new().unwrap(); + let partition_dir = tmp_dir.path().join("col1=a"); + std::fs::create_dir(&partition_dir).unwrap(); + File::create(partition_dir.join("empty.parquet")) + .await + .unwrap(); + + let ctx = SessionContext::new(); + + let df = ctx + .read_parquet( + tmp_dir.path().to_str().unwrap(), + ParquetReadOptions::new() + .table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]), + ) + .await + .expect("read_parquet should succeed"); + + let result = df.collect().await?; + #[rustfmt::skip] + let expected = ["++", + "++"]; + assert_batches_eq!(expected, &result); + + Ok(()) + } + fn build_ctx(store_url: &url::Url) -> Arc { let tmp_dir = tempfile::TempDir::new().unwrap(); let local = Arc::new( diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index a601aec32f16..228b9a4e9f6b 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -171,7 +171,13 @@ impl Partition { trace!("Listing partition {}", self.path); let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty()); let result = store.list_with_delimiter(prefix).await?; - self.files = Some(result.objects); + self.files = Some( + result + .objects + .into_iter() + .filter(|object_meta| object_meta.size > 0) + .collect(), + ); Ok((self, result.common_prefixes)) } } @@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>( table_path .list_all_files(ctx, store, file_extension) .await? 
+ .try_filter(|object_meta| futures::future::ready(object_meta.size > 0)) .map_ok(|object_meta| object_meta.into()), )); } @@ -566,6 +573,7 @@ mod tests { async fn test_pruned_partition_list_empty() { let (store, state) = make_test_store_and_state(&[ ("tablepath/mypartition=val1/notparquetfile", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/file.parquet", 100), ]); let filter = Expr::eq(col("mypartition"), lit("val1")); @@ -590,6 +598,7 @@ mod tests { let (store, state) = make_test_store_and_state(&[ ("tablepath/mypartition=val1/file.parquet", 100), ("tablepath/mypartition=val2/file.parquet", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/mypartition=val1/other=val3/file.parquet", 100), ]); let filter = Expr::eq(col("mypartition"), lit("val1")); @@ -671,6 +680,107 @@ mod tests { ); } + /// Describe a partition as a (path, depth, files) tuple for easier assertions + fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { + ( + partition.path.as_ref(), + partition.depth, + partition + .files + .as_ref() + .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) + .unwrap_or_default(), + ) + } + + #[tokio::test] + async fn test_list_partition() { + let (store, _) = make_test_store_and_state(&[ + ("tablepath/part1=p1v1/file.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), + ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0), + ]); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 0, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::<Vec<_>>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec![]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 1, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::<Vec<_>>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 2, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::<Vec<_>>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ( + "tablepath/part1=p1v2/part2=p2v1", + 2, + vec!["file1.parquet", "file2.parquet"] + ), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]), + ] + ); + } + #[test] fn test_parse_partitions_for_path() { assert_eq!( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index b12f37ed7531..791b15704d09 100644 ---
a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -470,6 +470,8 @@ impl ListingOptions { let files: Vec<_> = table_path .list_all_files(state, store.as_ref(), &self.file_extension) .await? + // Empty files cannot affect the schema, but may error when read for schema inference + .try_filter(|object_meta| future::ready(object_meta.size > 0)) .try_collect() .await?;
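
For reviewers who want to see the effect end to end, here is a small standalone sketch (not part of the patch) that mirrors the new parquet tests outside the test harness: a ListingTable scan over a directory that mixes a real Parquet file with a zero-byte one now succeeds, silently ignoring the empty file instead of failing on its missing footer. It assumes a DataFusion build that includes this fix plus the tokio and tempfile crates; the directory layout and the SELECT used to create the sample file are invented for illustration.

    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Hypothetical table directory containing one real file and one empty file.
        let dir = tempfile::tempdir()?;
        let table_dir = dir.path().to_str().unwrap().to_string();

        // Write a real Parquet file into the directory...
        let ctx = SessionContext::new();
        ctx.sql("SELECT 1 AS c1, 'a' AS c2")
            .await?
            .write_parquet(&table_dir, DataFrameWriteOptions::new(), None)
            .await?;
        // ...alongside a zero-byte file that has no Parquet footer at all.
        std::fs::write(dir.path().join("empty.parquet"), "")?;

        // ListingTable now filters out zero-length objects both when listing
        // files and when inferring schema, so this scan ignores empty.parquet.
        let df = ctx
            .read_parquet(&table_dir, ParquetReadOptions::default())
            .await?;
        df.show().await?;
        Ok(())
    }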