Skip to content

Commit

Permalink
refactor: restore the ability to add kv metadata into the generated file sink
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Apr 24, 2024
1 parent 391e074 commit 60fbdac
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 4 deletions.
7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Display};
use std::str::FromStr;

#[cfg(feature = "parquet")]
use parquet::file::metadata::KeyValue;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::{DataFusionError, FileType, Result};
Expand Down Expand Up @@ -1368,6 +1371,10 @@ pub struct TableParquetOptions {
pub global: ParquetOptions,
/// Column specific options. Default usage is parquet.XX::column.
pub column_specific_options: HashMap<String, ColumnOptions>,
/// Optional, additional metadata to be inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
#[cfg(feature = "parquet")]
pub key_value_metadata: Option<Vec<KeyValue>>,
}

impl ConfigField for TableParquetOptions {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
.set_created_by(created_by.clone())
.set_column_index_truncate_length(*column_index_truncate_length)
.set_data_page_row_count_limit(*data_page_row_count_limit)
.set_bloom_filter_enabled(*bloom_filter_enabled);
.set_bloom_filter_enabled(*bloom_filter_enabled)
.set_key_value_metadata(parquet_options.key_value_metadata.clone());

if let Some(encoding) = &encoding {
builder = builder.set_encoding(parse_encoding_string(encoding)?);
Expand Down
22 changes: 19 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ mod tests {
};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -1865,7 +1865,13 @@ mod tests {
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
TableParquetOptions::default(),
TableParquetOptions {
key_value_metadata: Some(vec![KeyValue {
key: "my-data".into(),
value: Some("stuff".to_string()),
}]),
..Default::default()
},
));

// create data
Expand Down Expand Up @@ -1899,7 +1905,10 @@ mod tests {
let (
path,
FileMetaData {
num_rows, schema, ..
num_rows,
schema,
key_value_metadata,
..
},
) = written.take(1).next().unwrap();
let path_parts = path.parts().collect::<Vec<_>>();
Expand All @@ -1915,6 +1924,13 @@ mod tests {
"output file metadata should contain col b"
);

let key_value_metadata = key_value_metadata.unwrap();
let my_metadata = key_value_metadata
.iter()
.filter(|kv| kv.key == "my-data")
.collect::<Vec<_>>();
assert_eq!(my_metadata.len(), 1);

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
.unwrap()
.unwrap(),
column_specific_options,
key_value_metadata: None,
})
}
}
Expand Down

0 comments on commit 60fbdac

Please sign in to comment.