Skip to content

Commit

Permalink
fix: Ignore empty files in ListingTable when listing files with or wi…
Browse files Browse the repository at this point in the history
…thout partition filters, as well as when inferring schema (#13750)

* fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema

* clippy

* fix csv and json tests

* add testing for parquet

* cleanup

* fix parquet tests

* document describe_partition, add back repartition options to one of the csv empty files tests
  • Loading branch information
Blizzara authored Dec 18, 2024
1 parent 82a40f3 commit 1fc7769
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 37 deletions.
38 changes: 10 additions & 28 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,73 +1259,57 @@ mod tests {
Ok(())
}

/// Read a single empty csv file in parallel
/// Read a single empty csv file
///
/// empty_0_byte.csv:
/// (file is empty)
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::new_with_config(config);
async fn test_csv_empty_file() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty_0_byte.csv",
CsvReadOptions::new().has_header(false),
)
.await?;

// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty

Ok(())
}

/// Read a single empty csv file with header in parallel
/// Read a single empty csv file with header
///
/// empty.csv:
/// c1,c2,c3
#[rstest(n_partitions, case(1), case(2), case(3))]
#[tokio::test]
async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::new_with_config(config);
async fn test_csv_empty_with_header() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty.csv",
CsvReadOptions::new().has_header(true),
)
.await?;

// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(n_partitions, actual_partitions);

Ok(())
}

/// Read multiple empty csv files in parallel
/// Read multiple empty csv files
///
/// all_empty
/// ├── empty0.csv
Expand All @@ -1334,13 +1318,13 @@ mod tests {
///
/// empty0.csv/empty1.csv/empty2.csv:
/// (file is empty)
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> {
async fn test_csv_multiple_empty_files() -> Result<()> {
// Testing that partitioning doesn't break with empty files
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
.with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
let file_format = Arc::new(CsvFormat::default().with_has_header(false));
let listing_options = ListingOptions::new(file_format.clone())
Expand All @@ -1358,13 +1342,11 @@ mod tests {
// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty

Ok(())
}
Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,11 @@ mod tests {
Ok(())
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
async fn it_can_read_empty_ndjson() -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
.with_repartition_file_min_size(0);

let ctx = SessionContext::new_with_config(config);

Expand All @@ -638,7 +636,6 @@ mod tests {
let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";

let result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_num_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = [
Expand All @@ -647,7 +644,6 @@ mod tests {
];

assert_batches_eq!(expected, &result);
assert_eq!(1, actual_partitions);

Ok(())
}
Expand Down
57 changes: 55 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ mod tests {

use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_array::types::Int32Type;
use arrow_array::{DictionaryArray, Int32Array, Int64Array};
Expand All @@ -1323,8 +1323,8 @@ mod tests {
as_float64_array, as_int32_array, as_timestamp_nanosecond_array,
};
use datafusion_common::config::ParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::Utf8;
use datafusion_common::{assert_batches_eq, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -2251,6 +2251,59 @@ mod tests {
scan_format(state, &*format, &testdata, file_name, projection, limit).await
}

/// Test that 0-byte files don't break while reading
#[tokio::test]
async fn test_read_empty_parquet() -> Result<()> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
File::create(&path).await?;

let ctx = SessionContext::new();

let df = ctx
.read_parquet(&path, ParquetReadOptions::default())
.await
.expect("read_parquet should succeed");

let result = df.collect().await?;
#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &result);

Ok(())
}

/// Test that 0-byte files don't break while reading
#[tokio::test]
async fn test_read_partitioned_empty_parquet() -> Result<()> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let partition_dir = tmp_dir.path().join("col1=a");
std::fs::create_dir(&partition_dir).unwrap();
File::create(partition_dir.join("empty.parquet"))
.await
.unwrap();

let ctx = SessionContext::new();

let df = ctx
.read_parquet(
tmp_dir.path().to_str().unwrap(),
ParquetReadOptions::new()
.table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]),
)
.await
.expect("read_parquet should succeed");

let result = df.collect().await?;
#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &result);

Ok(())
}

fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let local = Arc::new(
Expand Down
112 changes: 111 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,13 @@ impl Partition {
trace!("Listing partition {}", self.path);
let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
let result = store.list_with_delimiter(prefix).await?;
self.files = Some(result.objects);
self.files = Some(
result
.objects
.into_iter()
.filter(|object_meta| object_meta.size > 0)
.collect(),
);
Ok((self, result.common_prefixes))
}
}
Expand Down Expand Up @@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>(
table_path
.list_all_files(ctx, store, file_extension)
.await?
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
.map_ok(|object_meta| object_meta.into()),
));
}
Expand Down Expand Up @@ -566,6 +573,7 @@ mod tests {
async fn test_pruned_partition_list_empty() {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
Expand All @@ -590,6 +598,7 @@ mod tests {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
Expand Down Expand Up @@ -671,6 +680,107 @@ mod tests {
);
}

/// Describe a partition as a (path, depth, files) tuple for easier assertions
fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
(
partition.path.as_ref(),
partition.depth,
partition
.files
.as_ref()
.map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
.unwrap_or_default(),
)
}

#[tokio::test]
async fn test_list_partition() {
let (store, _) = make_test_store_and_state(&[
("tablepath/part1=p1v1/file.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
]);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
0,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec![]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
]
);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
1,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
]
);

let partitions = list_partitions(
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
2,
None,
)
.await
.expect("listing partitions failed");

assert_eq!(
&partitions
.iter()
.map(describe_partition)
.collect::<Vec<_>>(),
&vec![
("tablepath", 0, vec![]),
("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
("tablepath/part1=p1v2", 1, vec![]),
("tablepath/part1=p1v3", 1, vec![]),
(
"tablepath/part1=p1v2/part2=p2v1",
2,
vec!["file1.parquet", "file2.parquet"]
),
("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
]
);
}

#[test]
fn test_parse_partitions_for_path() {
assert_eq!(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ impl ListingOptions {
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
// Empty files cannot affect schema but may throw when trying to read for it
.try_filter(|object_meta| future::ready(object_meta.size > 0))
.try_collect()
.await?;

Expand Down

0 comments on commit 1fc7769

Please sign in to comment.