diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 009164a29e34..a14cbdecf601 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -24,10 +24,11 @@ use crate::{ use parquet::{ basic::{BrotliLevel, GzipLevel, ZstdLevel}, - file::{ - metadata::KeyValue, - properties::{EnabledStatistics, WriterProperties, WriterVersion}, + file::properties::{ + EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, + DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, }, + format::KeyValue, schema::types::ColumnPath, }; @@ -52,92 +53,43 @@ impl ParquetWriterOptions { impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { type Error = DataFusionError; - fn try_from(parquet_options: &TableParquetOptions) -> Result { - let ParquetOptions { - data_pagesize_limit, - write_batch_size, - writer_version, - dictionary_page_size_limit, - max_row_group_size, - created_by, - column_index_truncate_length, - data_page_row_count_limit, - bloom_filter_on_write, - encoding, - dictionary_enabled, - compression, - statistics_enabled, - max_statistics_size, - bloom_filter_fpp, - bloom_filter_ndv, - // below is not part of ParquetWriterOptions - enable_page_index: _, - pruning: _, - skip_metadata: _, - metadata_size_hint: _, - pushdown_filters: _, - reorder_filters: _, - allow_single_file_parallelism: _, - maximum_parallel_row_group_writers: _, - maximum_buffered_record_batches_per_stream: _, - bloom_filter_on_read: _, - } = &parquet_options.global; - - let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() { - Some( - parquet_options - .key_value_metadata - .clone() - .drain() - .map(|(key, value)| KeyValue { key, value }) - .collect::>(), - ) - } else { - None - }; - - let mut builder = WriterProperties::builder() - .set_data_page_size_limit(*data_pagesize_limit) - .set_write_batch_size(*write_batch_size) - .set_writer_version(parse_version_string(writer_version.as_str())?) - .set_dictionary_page_size_limit(*dictionary_page_size_limit) - .set_max_row_group_size(*max_row_group_size) - .set_created_by(created_by.clone()) - .set_column_index_truncate_length(*column_index_truncate_length) - .set_data_page_row_count_limit(*data_page_row_count_limit) - .set_bloom_filter_enabled(*bloom_filter_on_write) - .set_key_value_metadata(key_value_metadata); - - if let Some(encoding) = &encoding { - builder = builder.set_encoding(parse_encoding_string(encoding)?); - } - - if let Some(enabled) = dictionary_enabled { - builder = builder.set_dictionary_enabled(*enabled); - } - - if let Some(compression) = &compression { - builder = builder.set_compression(parse_compression_string(compression)?); - } - - if let Some(statistics) = &statistics_enabled { - builder = - builder.set_statistics_enabled(parse_statistics_string(statistics)?); - } - - if let Some(size) = max_statistics_size { - builder = builder.set_max_statistics_size(*size); - } + fn try_from(parquet_table_options: &TableParquetOptions) -> Result { + // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns) + Ok(ParquetWriterOptions { + writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)? + .build(), + }) + } +} - if let Some(fpp) = bloom_filter_fpp { - builder = builder.set_bloom_filter_fpp(*fpp); - } +impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { + type Error = DataFusionError; - if let Some(ndv) = bloom_filter_ndv { - builder = builder.set_bloom_filter_ndv(*ndv); + /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + fn try_from(table_parquet_options: &TableParquetOptions) -> Result { + // Table options include kv_metadata and col-specific options + let TableParquetOptions { + global, + column_specific_options, + key_value_metadata, + } = table_parquet_options; + + let mut builder = global.into_writer_properties_builder()?; + + if !key_value_metadata.is_empty() { + builder = builder.set_key_value_metadata(Some( + key_value_metadata + .to_owned() + .drain() + .map(|(key, value)| KeyValue { key, value }) + .collect(), + )); } - for (column, options) in &parquet_options.column_specific_options { + // Apply column-specific options: + for (column, options) in column_specific_options { let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect()); if let Some(bloom_filter_enabled) = options.bloom_filter_enabled { @@ -183,10 +135,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { } } - // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns) - Ok(ParquetWriterOptions { - writer_options: builder.build(), - }) + Ok(builder) + } +} + +impl ParquetOptions { + /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options + /// applied per column; a customization which is not applicable for [`ParquetOptions`]. + pub fn into_writer_properties_builder(&self) -> Result { + let ParquetOptions { + data_pagesize_limit, + write_batch_size, + writer_version, + compression, + dictionary_enabled, + dictionary_page_size_limit, + statistics_enabled, + max_statistics_size, + max_row_group_size, + created_by, + column_index_truncate_length, + data_page_row_count_limit, + encoding, + bloom_filter_on_write, + bloom_filter_fpp, + bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: _, + pruning: _, + skip_metadata: _, + metadata_size_hint: _, + pushdown_filters: _, + reorder_filters: _, + allow_single_file_parallelism: _, + maximum_parallel_row_group_writers: _, + maximum_buffered_record_batches_per_stream: _, + bloom_filter_on_read: _, // reads not used for writer props + } = self; + + let mut builder = WriterProperties::builder() + .set_data_page_size_limit(*data_pagesize_limit) + .set_write_batch_size(*write_batch_size) + .set_writer_version(parse_version_string(writer_version.as_str())?) + .set_dictionary_page_size_limit(*dictionary_page_size_limit) + .set_statistics_enabled( + statistics_enabled + .as_ref() + .and_then(|s| parse_statistics_string(s).ok()) + .unwrap_or(DEFAULT_STATISTICS_ENABLED), + ) + .set_max_statistics_size( + max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE), + ) + .set_max_row_group_size(*max_row_group_size) + .set_created_by(created_by.clone()) + .set_column_index_truncate_length(*column_index_truncate_length) + .set_data_page_row_count_limit(*data_page_row_count_limit) + .set_bloom_filter_enabled(*bloom_filter_on_write); + + if let Some(bloom_filter_fpp) = bloom_filter_fpp { + builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp); + }; + if let Some(bloom_filter_ndv) = bloom_filter_ndv { + builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv); + }; + if let Some(dictionary_enabled) = dictionary_enabled { + builder = builder.set_dictionary_enabled(*dictionary_enabled); + }; + + // We do not have access to default ColumnProperties set in Arrow. + // Therefore, only overwrite if these settings exist. + if let Some(compression) = compression { + builder = builder.set_compression(parse_compression_string(compression)?); + } + if let Some(encoding) = encoding { + builder = builder.set_encoding(parse_encoding_string(encoding)?); + } + + Ok(builder) } } @@ -336,3 +365,205 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result ColumnOptions { + ColumnOptions { + compression: Some("zstd(22)".into()), + dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v), + statistics_enabled: Some("page".into()), + max_statistics_size: Some(72), + encoding: Some("RLE".into()), + bloom_filter_enabled: Some(true), + bloom_filter_fpp: Some(0.72), + bloom_filter_ndv: Some(72), + } + } + + fn parquet_options_with_non_defaults() -> ParquetOptions { + let defaults = ParquetOptions::default(); + let writer_version = if defaults.writer_version.eq("1.0") { + "2.0" + } else { + "1.0" + }; + + ParquetOptions { + data_pagesize_limit: 42, + write_batch_size: 42, + writer_version: writer_version.into(), + compression: Some("zstd(22)".into()), + dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)), + dictionary_page_size_limit: 42, + statistics_enabled: Some("chunk".into()), + max_statistics_size: Some(42), + max_row_group_size: 42, + created_by: "wordy".into(), + column_index_truncate_length: Some(42), + data_page_row_count_limit: 42, + encoding: Some("BYTE_STREAM_SPLIT".into()), + bloom_filter_on_write: !defaults.bloom_filter_on_write, + bloom_filter_fpp: Some(0.42), + bloom_filter_ndv: Some(42), + + // not in WriterProperties, but itemizing here to not skip newly added props + enable_page_index: defaults.enable_page_index, + pruning: defaults.pruning, + skip_metadata: defaults.skip_metadata, + metadata_size_hint: defaults.metadata_size_hint, + pushdown_filters: defaults.pushdown_filters, + reorder_filters: defaults.reorder_filters, + allow_single_file_parallelism: defaults.allow_single_file_parallelism, + maximum_parallel_row_group_writers: defaults + .maximum_parallel_row_group_writers, + maximum_buffered_record_batches_per_stream: defaults + .maximum_buffered_record_batches_per_stream, + bloom_filter_on_read: defaults.bloom_filter_on_read, + } + } + + fn extract_column_options( + props: &WriterProperties, + col: ColumnPath, + ) -> ColumnOptions { + let bloom_filter_default_props = props.bloom_filter_properties(&col); + + ColumnOptions { + bloom_filter_enabled: Some(bloom_filter_default_props.is_some()), + encoding: props.encoding(&col).map(|s| s.to_string()), + dictionary_enabled: Some(props.dictionary_enabled(&col)), + compression: match props.compression(&col) { + Compression::ZSTD(lvl) => { + Some(format!("zstd({})", lvl.compression_level())) + } + _ => None, + }, + statistics_enabled: Some( + match props.statistics_enabled(&col) { + EnabledStatistics::None => "none", + EnabledStatistics::Chunk => "chunk", + EnabledStatistics::Page => "page", + } + .into(), + ), + bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp), + bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv), + max_statistics_size: Some(props.max_statistics_size(&col)), + } + } + + /// For testing only, take a single write's props and convert back into the session config. + /// (use identity to confirm correct.) + fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions { + let default_col = ColumnPath::from("col doesn't have specific config"); + let default_col_props = extract_column_options(props, default_col); + + let configured_col = ColumnPath::from(COL_NAME); + let configured_col_props = extract_column_options(props, configured_col); + + let key_value_metadata = props + .key_value_metadata() + .map(|pairs| { + HashMap::from_iter( + pairs + .iter() + .cloned() + .map(|KeyValue { key, value }| (key, value)), + ) + }) + .unwrap_or_default(); + + let global_options_defaults = ParquetOptions::default(); + + let column_specific_options = if configured_col_props.eq(&default_col_props) { + HashMap::default() + } else { + HashMap::from([(COL_NAME.into(), configured_col_props)]) + }; + + TableParquetOptions { + global: ParquetOptions { + // global options + data_pagesize_limit: props.dictionary_page_size_limit(), + write_batch_size: props.write_batch_size(), + writer_version: format!("{}.0", props.writer_version().as_num()), + dictionary_page_size_limit: props.dictionary_page_size_limit(), + max_row_group_size: props.max_row_group_size(), + created_by: props.created_by().to_string(), + column_index_truncate_length: props.column_index_truncate_length(), + data_page_row_count_limit: props.data_page_row_count_limit(), + + // global options which set the default column props + encoding: default_col_props.encoding, + compression: default_col_props.compression, + dictionary_enabled: default_col_props.dictionary_enabled, + statistics_enabled: default_col_props.statistics_enabled, + max_statistics_size: default_col_props.max_statistics_size, + bloom_filter_on_write: default_col_props + .bloom_filter_enabled + .unwrap_or_default(), + bloom_filter_fpp: default_col_props.bloom_filter_fpp, + bloom_filter_ndv: default_col_props.bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: global_options_defaults.enable_page_index, + pruning: global_options_defaults.pruning, + skip_metadata: global_options_defaults.skip_metadata, + metadata_size_hint: global_options_defaults.metadata_size_hint, + pushdown_filters: global_options_defaults.pushdown_filters, + reorder_filters: global_options_defaults.reorder_filters, + allow_single_file_parallelism: global_options_defaults + .allow_single_file_parallelism, + maximum_parallel_row_group_writers: global_options_defaults + .maximum_parallel_row_group_writers, + maximum_buffered_record_batches_per_stream: global_options_defaults + .maximum_buffered_record_batches_per_stream, + bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, + }, + column_specific_options, + key_value_metadata, + } + } + + #[test] + fn table_parquet_opts_to_writer_props() { + // ParquetOptions, all props set to non-default + let parquet_options = parquet_options_with_non_defaults(); + + // TableParquetOptions, using ParquetOptions for global settings + let key = "foo".to_string(); + let value = Some("bar".into()); + let table_parquet_opts = TableParquetOptions { + global: parquet_options.clone(), + column_specific_options: [( + COL_NAME.into(), + column_options_with_non_defaults(&parquet_options), + )] + .into(), + key_value_metadata: [(key.clone(), value.clone())].into(), + }; + + let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) + .unwrap() + .build(); + assert_eq!( + table_parquet_opts, + session_config_from_writer_props(&writer_props), + "the writer_props should have the same configuration as the session's TableParquetOptions", + ); + } +}