WIP: changes to upstream DF to enable parallelized writes with ParquetSink #11

Closed · wants to merge 4 commits
Changes from all commits
17 changes: 17 additions & 0 deletions datafusion/common/src/config.rs
@@ -1368,6 +1368,9 @@ pub struct TableParquetOptions {
     pub global: ParquetOptions,
     /// Column specific options. Default usage is parquet.XX::column.
     pub column_specific_options: HashMap<String, ColumnOptions>,
+    /// Additional metadata to be inserted into the key_value_metadata
+    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
+    pub key_value_metadata: HashMap<String, Option<String>>,
 }

@@ -1381,6 +1384,20 @@ impl ConfigField for TableParquetOptions {
         // Determine the key if it's a global or column-specific setting
         if key.contains("::") {
             self.column_specific_options.set(key, value)
+        } else if key.eq("metadata") {
+            for maybe_pair in value.split('_') {
+                let (k, v) = match maybe_pair.split(':').collect::<Vec<_>>()[..] {
+                    [k, v] => (k.into(), Some(v.into())),
+                    [k] => (k.into(), None),
+                    _ => {
+                        return Err(DataFusionError::Configuration(format!(
+                            "Invalid metadata provided \"{maybe_pair}\""
+                        )))
+                    }
+                };
+                self.key_value_metadata.insert(k, v);
+            }
+            Ok(())
         } else {
             self.global.set(key, value)
         }
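
To make the option parsing above concrete: pairs in the metadata string are separated by '_', and each pair is either key:value or a bare key that maps to None. A minimal standalone sketch (the parse_metadata helper is hypothetical and not part of this PR; the real logic lives inside ConfigField::set):

use std::collections::HashMap;

/// Hypothetical standalone version of the parsing above.
fn parse_metadata(value: &str) -> Result<HashMap<String, Option<String>>, String> {
    let mut out = HashMap::new();
    for maybe_pair in value.split('_') {
        let (k, v) = match maybe_pair.split(':').collect::<Vec<_>>()[..] {
            [k, v] => (k.to_string(), Some(v.to_string())),
            [k] => (k.to_string(), None),
            _ => return Err(format!("Invalid metadata provided \"{maybe_pair}\"")),
        };
        out.insert(k, v);
    }
    Ok(out)
}

fn main() {
    // "foo:bar" and the bare key "baz" become two entries.
    let parsed = parse_metadata("foo:bar_baz").unwrap();
    assert_eq!(parsed.get("foo"), Some(&Some("bar".to_string())));
    assert_eq!(parsed.get("baz"), Some(&None));
    // More than one ':' within a single pair is rejected.
    assert!(parse_metadata("foo:bar:extra").is_err());
}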
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -124,6 +124,10 @@ mod tests {
             123
         );
 
+        // properties which remain as default on WriterProperties
+        assert_eq!(properties.key_value_metadata(), None);
+        assert_eq!(properties.sorting_columns(), None);
+
         Ok(())
     }

104 changes: 73 additions & 31 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -17,11 +17,17 @@

 //! Options related to how parquet files should be written
 
-use crate::{config::TableParquetOptions, DataFusionError, Result};
+use crate::{
+    config::{ParquetOptions, TableParquetOptions},
+    DataFusionError, Result,
+};
 
 use parquet::{
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    file::{
+        metadata::KeyValue,
+        properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    },
     schema::types::ColumnPath,
 };

@@ -47,53 +53,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
     type Error = DataFusionError;
 
     fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
-        let parquet_session_options = &parquet_options.global;
-        let mut builder = WriterProperties::builder()
-            .set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
-            .set_write_batch_size(parquet_session_options.write_batch_size)
-            .set_writer_version(parse_version_string(
-                &parquet_session_options.writer_version,
-            )?)
-            .set_dictionary_page_size_limit(
-                parquet_session_options.dictionary_page_size_limit,
-            )
-            .set_max_row_group_size(parquet_session_options.max_row_group_size)
-            .set_created_by(parquet_session_options.created_by.clone())
-            .set_column_index_truncate_length(
-                parquet_session_options.column_index_truncate_length,
+        let ParquetOptions {

[review thread attached to the line above]

wiedld (Collaborator, Author) commented on Apr 24, 2024:

> The ParquetOptions are the configuration which can be provided within a SQL query, and are therefore intended for an easily parsable format (refer to the ConfigField trait and associated macros in the linked file).
>
> The sorting_columns may lend itself to this use case of being provided within a SQL query and being easy to parse. However, the same is not true for the user-provided kv_metadata.

wiedld (Collaborator, Author) commented on Apr 24, 2024:

> I have a question regarding the use case for the WriterProperties sorting_columns. It's listed in the parquet interface; is this referring to a per-row-group sorting that is applied only on write? Is there a use case for DataFusion, given that we already sort earlier in the batch stream?

A collaborator replied:

> In theory it is supposed to be used to let readers infer information from the file. I don't know how widely it is written or used by other parquet readers/writers.
>
> IOx stores its sort information in its own metadata, so I think setting the fields in the parquet metadata could be a separate project.

(A short sorting_columns sketch follows this destructuring block, see below.)

[end review thread]

+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            dictionary_page_size_limit,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            data_page_row_count_limit,
+            bloom_filter_enabled,
+            encoding,
+            dictionary_enabled,
+            compression,
+            statistics_enabled,
+            max_statistics_size,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+            // below is not part of ParquetWriterOptions
+            enable_page_index: _,
+            pruning: _,
+            skip_metadata: _,
+            metadata_size_hint: _,
+            pushdown_filters: _,
+            reorder_filters: _,
+            allow_single_file_parallelism: _,
+            maximum_parallel_row_group_writers: _,
+            maximum_buffered_record_batches_per_stream: _,

[review comment attached to the line above]

A collaborator commented:

> This is a nice change

[end review comment]

+        } = &parquet_options.global;
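
An aside on the sorting_columns question in the review thread above: a minimal sketch of how a writer can declare sort order through the parquet crate (assuming a parquet version that exposes set_sorting_columns; this is not part of this PR):

use parquet::file::properties::WriterProperties;
use parquet::format::SortingColumn;

fn main() {
    // Declare that row groups are sorted by column 0 ascending, nulls first.
    // This only records a hint for readers; the writer does not re-sort data.
    let props = WriterProperties::builder()
        .set_sorting_columns(Some(vec![SortingColumn {
            column_idx: 0,
            descending: false,
            nulls_first: true,
        }]))
        .build();

    assert!(props.sorting_columns().is_some());
}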

+        let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
+            Some(
+                parquet_options
+                    .key_value_metadata
+                    .clone()
+                    .drain()
+                    .map(|(key, value)| KeyValue { key, value })
+                    .collect::<Vec<_>>(),
+            )
-            .set_data_page_row_count_limit(
-                parquet_session_options.data_page_row_count_limit,
-            )
-            .set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled);
+        } else {
+            None
+        };

-        if let Some(encoding) = &parquet_session_options.encoding {
+        let mut builder = WriterProperties::builder()
+            .set_data_page_size_limit(*data_pagesize_limit)
+            .set_write_batch_size(*write_batch_size)
+            .set_writer_version(parse_version_string(writer_version.as_str())?)
+            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
+            .set_max_row_group_size(*max_row_group_size)
+            .set_created_by(created_by.clone())
+            .set_column_index_truncate_length(*column_index_truncate_length)
+            .set_data_page_row_count_limit(*data_page_row_count_limit)
+            .set_bloom_filter_enabled(*bloom_filter_enabled)
+            .set_key_value_metadata(key_value_metadata);
 
+        if let Some(encoding) = &encoding {
             builder = builder.set_encoding(parse_encoding_string(encoding)?);
         }
 
-        if let Some(enabled) = parquet_session_options.dictionary_enabled {
-            builder = builder.set_dictionary_enabled(enabled);
+        if let Some(enabled) = dictionary_enabled {
+            builder = builder.set_dictionary_enabled(*enabled);
         }
 
-        if let Some(compression) = &parquet_session_options.compression {
+        if let Some(compression) = &compression {
             builder = builder.set_compression(parse_compression_string(compression)?);
         }
 
-        if let Some(statistics) = &parquet_session_options.statistics_enabled {
+        if let Some(statistics) = &statistics_enabled {
             builder =
                 builder.set_statistics_enabled(parse_statistics_string(statistics)?);
         }
 
-        if let Some(size) = parquet_session_options.max_statistics_size {
-            builder = builder.set_max_statistics_size(size);
+        if let Some(size) = max_statistics_size {
+            builder = builder.set_max_statistics_size(*size);
         }
 
-        if let Some(fpp) = parquet_session_options.bloom_filter_fpp {
-            builder = builder.set_bloom_filter_fpp(fpp);
+        if let Some(fpp) = bloom_filter_fpp {
+            builder = builder.set_bloom_filter_fpp(*fpp);
         }
 
-        if let Some(ndv) = parquet_session_options.bloom_filter_ndv {
-            builder = builder.set_bloom_filter_ndv(ndv);
+        if let Some(ndv) = bloom_filter_ndv {
+            builder = builder.set_bloom_filter_ndv(*ndv);
         }

         for (column, options) in &parquet_options.column_specific_options {

@@ -141,6 +181,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
                 builder.set_column_max_statistics_size(path, max_statistics_size);
             }
         }
+
+        // ParquetWriterOptions will have defaults for the remaining fields (e.g. key_value_metadata & sorting_columns)
         Ok(ParquetWriterOptions {
             writer_options: builder.build(),
         })
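
As a sanity check on what the conversion above produces, a small sketch using only the parquet crate (the "my-data"/"stuff" entry is an arbitrary example):

use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;

fn main() {
    // The HashMap entries become KeyValue pairs on the WriterProperties,
    // which the writer later embeds in the footer's key_value_metadata.
    let props = WriterProperties::builder()
        .set_key_value_metadata(Some(vec![KeyValue {
            key: "my-data".to_string(),
            value: Some("stuff".to_string()),
        }]))
        .build();

    assert_eq!(props.key_value_metadata().unwrap()[0].key, "my-data");
}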
20 changes: 18 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1865,7 +1865,13 @@ mod tests {
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
-            TableParquetOptions::default(),
+            TableParquetOptions {
+                key_value_metadata: std::collections::HashMap::from([(
+                    "my-data".to_string(),
+                    Some("stuff".to_string()),
+                )]),
+                ..Default::default()
+            },
         ));
 
         // create data
@@ -1899,7 +1905,10 @@
         let (
             path,
             FileMetaData {
-                num_rows, schema, ..
+                num_rows,
+                schema,
+                key_value_metadata,
+                ..
             },
         ) = written.take(1).next().unwrap();
         let path_parts = path.parts().collect::<Vec<_>>();
@@ -1915,6 +1924,13 @@
"output file metadata should contain col b"
);

let key_value_metadata = key_value_metadata.unwrap();
let my_metadata = key_value_metadata
.iter()
.filter(|kv| kv.key == "my-data")
.collect::<Vec<_>>();
assert_eq!(my_metadata.len(), 1);

Ok(())
}
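
For completeness, a sketch of how the written metadata could be verified outside the test harness with the parquet crate's file reader (the path here is hypothetical):

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a file written by ParquetSink and print any custom key/value
    // metadata found in the footer.
    let file = File::open("output.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    let file_metadata = reader.metadata().file_metadata();
    if let Some(kvs) = file_metadata.key_value_metadata() {
        for kv in kvs {
            println!("{} = {:?}", kv.key, kv.value);
        }
    }
    Ok(())
}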

1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -974,6 +974,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
                 .unwrap()
                 .unwrap(),
             column_specific_options,
+            key_value_metadata: Default::default(),
         })
     }
 }
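
Note: key_value_metadata is filled with Default::default() here because the protobuf message does not appear to carry the new field yet, so a plan round-tripped through proto would drop any user-supplied metadata.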
29 changes: 28 additions & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -283,11 +283,38 @@ OPTIONS (
 'format.statistics_enabled::col2' none,
 'format.max_statistics_size' 123,
 'format.bloom_filter_fpp' 0.001,
-'format.bloom_filter_ndv' 100
+'format.bloom_filter_ndv' 100,
+'format.metadata' 'foo:bar baz'
 )
 ----
 2

+# valid vs invalid metadata
+
+statement ok
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+'format.metadata' ''
+)
+
+statement error
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+'format.metadata' 'foo:bar:extra'
+)
+
+statement error
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+'format.wrong-metadata-key' 'foo:bar baz'
+)
+
 # validate multiple parquet file output with all options set
 statement ok
 CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/';