diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index ca2ac991e67d..c04c536e7ca6 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -46,7 +46,9 @@ use crate::physical_plan::{
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::stats::Precision;
-use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
+use datafusion_common::{
+    exec_err, internal_datafusion_err, not_impl_err, DataFusionError, FileType,
+};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -66,6 +68,7 @@ use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::file::writer::SerializedFileWriter;
+use parquet::format::FileMetaData;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
@@ -537,6 +540,9 @@ pub struct ParquetSink {
     config: FileSinkConfig,
     ///
     parquet_options: TableParquetOptions,
+    /// File metadata from successfully produced parquet files. The Mutex is only used
+    /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
+    written: Arc<Mutex<HashMap<Path, FileMetaData>>>,
 }
 
 impl Debug for ParquetSink {
@@ -563,6 +569,7 @@ impl ParquetSink {
         Self {
             config,
             parquet_options,
+            written: Default::default(),
         }
     }
 
@@ -570,6 +577,13 @@ impl ParquetSink {
     pub fn config(&self) -> &FileSinkConfig {
         &self.config
     }
+
+    /// Retrieve the file metadata for the written files, keyed to the path
+    /// which may be partitioned (in the case of hive style partitioning).
+    pub fn written(&self) -> HashMap<Path, FileMetaData> {
+        self.written.lock().clone()
+    }
+
     /// Converts table schema to writer schema, which may differ in the case
     /// of hive style partitioning where some columns are removed from the
     /// underlying files.
@@ -673,8 +687,10 @@ impl DataSink for ParquetSink {
             "parquet".into(),
         );
 
-        let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
-            JoinSet::new();
+        let mut file_write_tasks: JoinSet<
+            std::result::Result<(Path, FileMetaData), DataFusionError>,
+        > = JoinSet::new();
+
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
             if !allow_single_file_parallelism {
                 let mut writer = self
@@ -685,13 +701,14 @@ impl DataSink for ParquetSink {
                     )
                     .await?;
                 file_write_tasks.spawn(async move {
-                    let mut row_count = 0;
                     while let Some(batch) = rx.recv().await {
-                        row_count += batch.num_rows();
                         writer.write(&batch).await?;
                     }
-                    writer.close().await?;
-                    Ok(row_count)
+                    let file_metadata = writer
+                        .close()
+                        .await
+                        .map_err(DataFusionError::ParquetError)?;
+                    Ok((path, file_metadata))
                 });
             } else {
                 let writer = create_writer(
@@ -706,14 +723,15 @@ impl DataSink for ParquetSink {
                 let props = parquet_props.clone();
                 let parallel_options_clone = parallel_options.clone();
                 file_write_tasks.spawn(async move {
-                    output_single_parquet_file_parallelized(
+                    let file_metadata = output_single_parquet_file_parallelized(
                         writer,
                         rx,
                         schema,
                         props.writer_options(),
                         parallel_options_clone,
                     )
-                    .await
+                    .await?;
+                    Ok((path, file_metadata))
                 });
             }
         }
@@ -722,7 +740,13 @@ impl DataSink for ParquetSink {
         while let Some(result) = file_write_tasks.join_next().await {
             match result {
                 Ok(r) => {
-                    row_count += r?;
+                    let (path, file_metadata) = r?;
+                    row_count += file_metadata.num_rows;
+                    let mut written_files = self.written.lock();
+                    written_files
+                        .try_insert(path.clone(), file_metadata)
+                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
+                    drop(written_files);
                 }
                 Err(e) => {
                     if e.is_panic() {
@@ -924,7 +948,7 @@ async fn concatenate_parallel_row_groups(
     schema: Arc<Schema>,
     writer_props: Arc<WriterProperties>,
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-) -> Result<usize> {
+) -> Result<FileMetaData> {
     let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
     let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
@@ -934,13 +958,10 @@ async fn concatenate_parallel_row_groups(
         writer_props,
     )?;
 
-    let mut row_count = 0;
-
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
-        let (serialized_columns, cnt) = result?;
-        row_count += cnt;
+        let (serialized_columns, _cnt) = result?;
         for chunk in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
@@ -954,13 +975,13 @@ async fn concatenate_parallel_row_groups(
         rg_out.close()?;
     }
 
-    let inner_writer = parquet_writer.into_inner()?;
-    let final_buff = inner_writer.buffer.try_lock().unwrap();
+    let file_metadata = parquet_writer.close()?;
+    let final_buff = merged_buff.buffer.try_lock().unwrap();
 
     object_store_writer.write_all(final_buff.as_slice()).await?;
     object_store_writer.shutdown().await?;
 
-    Ok(row_count)
+    Ok(file_metadata)
 }
 
 /// Parallelizes the serialization of a single parquet file, by first serializing N
@@ -973,7 +994,7 @@ async fn output_single_parquet_file_parallelized(
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
     parallel_options: ParallelParquetWriterOptions,
-) -> Result<usize> {
+) -> Result<FileMetaData> {
     let max_rowgroups = parallel_options.max_parallel_row_groups;
     // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
     let (serialize_tx, serialize_rx) =
@@ -987,7 +1008,7 @@ async fn output_single_parquet_file_parallelized(
         arc_props.clone(),
         parallel_options,
     );
-    let row_count = concatenate_parallel_row_groups(
+    let file_metadata = concatenate_parallel_row_groups(
         serialize_rx,
         output_schema.clone(),
         arc_props.clone(),
@@ -996,7 +1017,7 @@
     .await?;
 
     launch_serialization_task.join_unwind().await?;
-    Ok(row_count)
+    Ok(file_metadata)
 }
 
 #[cfg(test)]
@@ -1084,6 +1105,7 @@ pub(crate) mod test_util {
 #[cfg(test)]
 mod tests {
     use super::super::test_util::scan_format;
+    use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
     use crate::physical_plan::collect;
     use std::fmt::{Display, Formatter};
     use std::sync::atomic::{AtomicUsize, Ordering};
@@ -1095,13 +1117,19 @@ mod tests {
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
     use arrow::record_batch::RecordBatch;
+    use arrow_schema::Field;
     use async_trait::async_trait;
     use bytes::Bytes;
     use datafusion_common::cast::{
         as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
         as_int32_array, as_timestamp_nanosecond_array,
     };
+    use datafusion_common::config::ParquetOptions;
+    use datafusion_common::config::TableParquetOptions;
     use datafusion_common::ScalarValue;
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
     use futures::stream::BoxStream;
     use futures::StreamExt;
     use log::error;
@@ -1796,4 +1824,183 @@ mod tests {
         let format = ParquetFormat::default();
         scan_format(state, &format, &testdata, file_name, projection, limit).await
     }
+
+    fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let local = Arc::new(
+            LocalFileSystem::new_with_prefix(&tmp_dir)
+                .expect("should create object store"),
+        );
+
+        let mut session = SessionConfig::default();
+        let parquet_opts = ParquetOptions {
+            allow_single_file_parallelism: true,
+            ..Default::default()
+        };
+        session.options_mut().execution.parquet = parquet_opts;
+
+        let runtime = RuntimeEnv::default();
+        runtime
+            .object_store_registry
+            .register_store(store_url, local);
+
+        Arc::new(
+            TaskContext::default()
+                .with_session_config(session)
+                .with_runtime(Arc::new(runtime)),
+        )
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write() -> Result<()> {
+        let field_a = Field::new("a", DataType::Utf8, false);
+        let field_b = Field::new("b", DataType::Utf8, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+
+        let file_sink_config = FileSinkConfig {
+            object_store_url: object_store_url.clone(),
+            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            table_paths: vec![ListingTableUrl::parse("file:///")?],
+            output_schema: schema.clone(),
+            table_partition_cols: vec![],
+            overwrite: true,
+        };
+        let parquet_sink = Arc::new(ParquetSink::new(
+            file_sink_config,
+            TableParquetOptions::default(),
+        ));
+
+        // create data
+        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+        // write stream
+        parquet_sink
+            .write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::iter(vec![Ok(batch)]),
+                )),
+                &build_ctx(object_store_url.as_ref()),
+            )
+            .await
+            .unwrap();
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet file to be written, instead found {}",
+            written.len()
+        );
+
+        // check the file metadata
+        let (
+            path,
+            FileMetaData {
+                num_rows, schema, ..
+            },
+        ) = written.take(1).next().unwrap();
+        let path_parts = path.parts().collect::<Vec<_>>();
+        assert_eq!(path_parts.len(), 1, "should not have path prefix");
+
+        assert_eq!(num_rows, 2, "file metadata to have 2 rows");
+        assert!(
+            schema.iter().any(|col_schema| col_schema.name == "a"),
+            "output file metadata should contain col a"
+        );
+        assert!(
+            schema.iter().any(|col_schema| col_schema.name == "b"),
+            "output file metadata should contain col b"
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write_partitions() -> Result<()> {
+        let field_a = Field::new("a", DataType::Utf8, false);
+        let field_b = Field::new("b", DataType::Utf8, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+
+        // set file config to include partitioning on field_a
+        let file_sink_config = FileSinkConfig {
+            object_store_url: object_store_url.clone(),
+            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            table_paths: vec![ListingTableUrl::parse("file:///")?],
+            output_schema: schema.clone(),
+            table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
+            overwrite: true,
+        };
+        let parquet_sink = Arc::new(ParquetSink::new(
+            file_sink_config,
+            TableParquetOptions::default(),
+        ));
+
+        // create data with 2 partitions
+        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+        // write stream
+        parquet_sink
+            .write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::iter(vec![Ok(batch)]),
+                )),
+                &build_ctx(object_store_url.as_ref()),
+            )
+            .await
+            .unwrap();
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            2,
+            "expected two parquet files to be written, instead found {}",
+            written.len()
+        );
+
+        // check the file metadata includes partitions
+        let mut expected_partitions = std::collections::HashSet::from(["a=foo", "a=bar"]);
+        for (
+            path,
+            FileMetaData {
+                num_rows, schema, ..
+            },
+        ) in written.take(2)
+        {
+            let path_parts = path.parts().collect::<Vec<_>>();
+            assert_eq!(path_parts.len(), 2, "should have path prefix");
+
+            let prefix = path_parts[0].as_ref();
+            assert!(
+                expected_partitions.contains(prefix),
+                "expected path prefix to match partition, instead found {:?}",
+                prefix
+            );
+            expected_partitions.remove(prefix);
+
+            assert_eq!(num_rows, 1, "file metadata to have 1 row");
+            assert!(
+                !schema.iter().any(|col_schema| col_schema.name == "a"),
+                "output file metadata will not contain partitioned col a"
+            );
+            assert!(
+                schema.iter().any(|col_schema| col_schema.name == "b"),
+                "output file metadata should contain col b"
+            );
+        }
+
+        Ok(())
+    }
 }
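
A minimal usage sketch, not part of the patch, showing how a caller might consume the map returned by the new ParquetSink::written() accessor once write_all has completed; the report_written helper name is hypothetical.

    use object_store::path::Path;
    use parquet::format::FileMetaData;
    use std::collections::HashMap;

    /// Print one line per output file, using the metadata collected by ParquetSink.
    fn report_written(written: HashMap<Path, FileMetaData>) {
        for (path, metadata) in written {
            // With hive-style partitioning the path carries a partition prefix such as "a=foo".
            println!("wrote {} rows to {}", metadata.num_rows, path);
        }
    }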