Make it possible to compile datafusion-common without default features (#7625)

* Make the parquet dependency optional in datafusion-common

* disable default datafusion-common features for dependent crates that don't need parquet

* fmt

* run tomlformat

---------

Co-authored-by: Andrew Lamb <[email protected]>
jonmmease and alamb authored Sep 22, 2023
1 parent 1c847b4 commit dab68a3
Showing 11 changed files with 186 additions and 182 deletions.
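For context on what the split enables: a dependent crate can now declare datafusion-common with default-features = false in its Cargo.toml (re-adding features = ["parquet"] only where needed) and drop the parquet dependency tree entirely. A hypothetical behavior check in such a crate is sketched below; it assumes the FromStr fallback arm, which sits outside the visible hunks, rejects unknown file types.

use std::str::FromStr;

use datafusion_common::FileType;

// Hypothetical check in a crate that depends on datafusion-common with
// default-features = false, so the parquet feature is off: "parquet" is
// no longer a recognized file type, while the other formats still parse.
fn main() {
    assert!(FileType::from_str("csv").is_ok());
    assert!(FileType::from_str("parquet").is_err());
}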
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/file_type.rs
@@ -48,6 +48,7 @@ pub enum FileType {
/// Apache Avro file
AVRO,
/// Apache Parquet file
#[cfg(feature = "parquet")]
PARQUET,
/// CSV file
CSV,
@@ -60,6 +61,7 @@ impl GetExt for FileType {
match self {
FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
#[cfg(feature = "parquet")]
FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
@@ -72,6 +74,7 @@ impl Display for FileType {
let out = match self {
FileType::CSV => "csv",
FileType::JSON => "json",
#[cfg(feature = "parquet")]
FileType::PARQUET => "parquet",
FileType::AVRO => "avro",
FileType::ARROW => "arrow",
@@ -88,6 +91,7 @@ impl FromStr for FileType {
match s.as_str() {
"ARROW" => Ok(FileType::ARROW),
"AVRO" => Ok(FileType::AVRO),
#[cfg(feature = "parquet")]
"PARQUET" => Ok(FileType::PARQUET),
"CSV" => Ok(FileType::CSV),
"JSON" | "NDJSON" => Ok(FileType::JSON),
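Because the PARQUET variant itself is now behind the feature flag, downstream code that matches on FileType must gate its parquet arm the same way, or the match will not compile when the feature is disabled. A minimal sketch of that pattern follows; it assumes the caller forwards a parquet feature of its own to datafusion-common, and the helper name and error message are illustrative, not part of this commit.

use datafusion_common::{DataFusionError, FileType, Result};

// Hypothetical downstream helper: the PARQUET arm carries the same cfg
// attribute as the variant, so the match stays exhaustive whether or not
// the parquet feature is enabled.
fn writer_name(file_type: &FileType) -> Result<&'static str> {
    match file_type {
        FileType::CSV => Ok("csv writer"),
        FileType::JSON => Ok("json writer"),
        FileType::AVRO | FileType::ARROW => Err(DataFusionError::NotImplemented(
            "no writer in this example".to_string(),
        )),
        #[cfg(feature = "parquet")]
        FileType::PARQUET => Ok("parquet writer"),
    }
}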
10 changes: 9 additions & 1 deletion datafusion/common/src/file_options/mod.rs
@@ -22,6 +22,7 @@ pub mod avro_writer;
pub mod csv_writer;
pub mod file_type;
pub mod json_writer;
#[cfg(feature = "parquet")]
pub mod parquet_writer;
pub(crate) mod parse_utils;

@@ -37,10 +38,12 @@ use crate::{
DataFusionError, FileType, Result,
};

#[cfg(feature = "parquet")]
use self::parquet_writer::ParquetWriterOptions;

use self::{
arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions,
csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions,
parquet_writer::ParquetWriterOptions,
};

/// Represents a single arbitrary setting in a
@@ -145,6 +148,7 @@ impl StatementOptions {
/// plus any DataFusion specific writing options (e.g. CSV compression)
#[derive(Clone, Debug)]
pub enum FileTypeWriterOptions {
#[cfg(feature = "parquet")]
Parquet(ParquetWriterOptions),
CSV(CsvWriterOptions),
JSON(JsonWriterOptions),
@@ -164,6 +168,7 @@ impl FileTypeWriterOptions {
let options = (config_defaults, statement_options);

let file_type_write_options = match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => {
FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?)
}
@@ -193,6 +198,7 @@ impl FileTypeWriterOptions {
let options = (config_defaults, &empty_statement);

let file_type_write_options = match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => {
FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?)
}
@@ -215,6 +221,7 @@

/// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum.
/// Returns an error if a different type from parquet is set.
#[cfg(feature = "parquet")]
pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> {
match self {
FileTypeWriterOptions::Parquet(opt) => Ok(opt),
@@ -281,6 +288,7 @@ impl Display for FileTypeWriterOptions {
FileTypeWriterOptions::Avro(_) => "AvroWriterOptions",
FileTypeWriterOptions::CSV(_) => "CsvWriterOptions",
FileTypeWriterOptions::JSON(_) => "JsonWriterOptions",
#[cfg(feature = "parquet")]
FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions",
};
write!(f, "{}", name)
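The options enum follows the same pattern: try_into_parquet only exists when the feature is enabled, and it returns an error when the enum holds options for a different format. A hypothetical illustration, not part of this commit (the paths assume the file_options and parquet_writer modules remain public):

#[cfg(feature = "parquet")]
fn example_extract_parquet_options(
    opts: &datafusion_common::file_options::FileTypeWriterOptions,
) -> datafusion_common::Result<()> {
    // Succeeds only for the Parquet variant; any other variant returns an
    // error, as the doc comment on try_into_parquet describes.
    let _parquet_opts = opts.try_into_parquet()?;
    Ok(())
}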
175 changes: 166 additions & 9 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -19,16 +19,15 @@
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

use crate::{
config::ConfigOptions,
file_options::parse_utils::{
parse_compression_string, parse_encoding_string, parse_statistics_string,
parse_version_string,
},
DataFusionError, Result,
};
use crate::{config::ConfigOptions, DataFusionError, Result};

use super::StatementOptions;

use super::{parse_utils::split_option_and_column_path, StatementOptions};
use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
schema::types::ColumnPath,
};

/// Options for writing parquet files
#[derive(Clone, Debug)]
@@ -214,3 +213,161 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
})
}
}

/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
pub(crate) fn parse_encoding_string(
str_setting: &str,
) -> Result<parquet::basic::Encoding> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
"plain" => Ok(parquet::basic::Encoding::PLAIN),
"plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
"rle" => Ok(parquet::basic::Encoding::RLE),
"bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
"delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
"delta_length_byte_array" => {
Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
}
"delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
"rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
"byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet encoding: \
{str_setting}. Valid values are: plain, plain_dictionary, rle, \
bit_packed, delta_binary_packed, delta_length_byte_array, \
delta_byte_array, rle_dictionary, and byte_stream_split."
))),
}
}

/// Splits compression string into compression codec and optional compression_level
/// I.e. gzip(2) -> gzip, 2
fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
// ignore string literal chars passed from sqlparser i.e. remove single quotes
let str_setting = str_setting.replace('\'', "");
let split_setting = str_setting.split_once('(');

match split_setting {
Some((codec, rh)) => {
let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
DataFusionError::Configuration(format!(
"Could not parse compression string. \
Got codec: {} and unknown level from {}",
codec, str_setting
))
})?;
Ok((codec.to_owned(), Some(*level)))
}
None => Ok((str_setting.to_owned(), None)),
}
}

/// Helper to ensure compression codecs which don't support levels
/// don't have one set. E.g. snappy(2) is invalid.
fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
if level.is_some() {
return Err(DataFusionError::Configuration(format!(
"Compression {codec} does not support specifying a level"
)));
}
Ok(())
}

/// Helper to ensure compression codecs which require a level
/// do have one set. E.g. zstd is invalid, zstd(3) is valid
fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
level.ok_or(DataFusionError::Configuration(format!(
"{codec} compression requires specifying a level such as {codec}(4)"
)))
}

/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
pub(crate) fn parse_compression_string(
str_setting: &str,
) -> Result<parquet::basic::Compression> {
let str_setting_lower: &str = &str_setting.to_lowercase();
let (codec, level) = split_compression_string(str_setting_lower)?;
let codec = codec.as_str();
match codec {
"uncompressed" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::UNCOMPRESSED)
}
"snappy" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::SNAPPY)
}
"gzip" => {
let level = require_level(codec, level)?;
Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
level,
)?))
}
"lzo" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::LZO)
}
"brotli" => {
let level = require_level(codec, level)?;
Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
level,
)?))
}
"lz4" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::LZ4)
}
"zstd" => {
let level = require_level(codec, level)?;
Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
level as i32,
)?))
}
"lz4_raw" => {
check_level_is_none(codec, &level)?;
Ok(parquet::basic::Compression::LZ4_RAW)
}
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet compression: \
{str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
lzo, brotli(level), lz4, zstd(level), and lz4_raw."
))),
}
}

pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
"1.0" => Ok(WriterVersion::PARQUET_1_0),
"2.0" => Ok(WriterVersion::PARQUET_2_0),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet writer version {str_setting} \
valid options are '1.0' and '2.0'"
))),
}
}

pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
"none" => Ok(EnabledStatistics::None),
"chunk" => Ok(EnabledStatistics::Chunk),
"page" => Ok(EnabledStatistics::Page),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet statistics setting {str_setting} \
valid options are 'none', 'page', and 'chunk'"
))),
}
}

pub(crate) fn split_option_and_column_path(
str_setting: &str,
) -> (String, Option<ColumnPath>) {
match str_setting.replace('\'', "").split_once("::") {
Some((s1, s2)) => {
let col_path = ColumnPath::new(s2.split('.').map(|s| s.to_owned()).collect());
(s1.to_owned(), Some(col_path))
}
None => (str_setting.to_owned(), None),
}
}
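The parsing helpers now living in this file keep the grammar described in their doc comments; the compression-level handling in particular is easy to get wrong from the caller's side. A small hypothetical test sketch, not part of this commit, exercising the codec(level) form:

#[cfg(test)]
mod compression_parsing_sketch {
    use super::*;
    use parquet::basic::Compression;

    #[test]
    fn level_handling() {
        // zstd requires a level, supplied in parentheses ...
        assert!(matches!(
            parse_compression_string("zstd(3)"),
            Ok(Compression::ZSTD(_))
        ));
        // ... a bare zstd is rejected because the level is mandatory ...
        assert!(parse_compression_string("zstd").is_err());
        // ... and codecs without levels, such as snappy, reject one.
        assert!(parse_compression_string("snappy(2)").is_err());
    }
}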