diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs
index c83da387c25d..a07f2e0cb847 100644
--- a/datafusion/common/src/file_options/file_type.rs
+++ b/datafusion/common/src/file_options/file_type.rs
@@ -48,6 +48,7 @@ pub enum FileType {
     /// Apache Avro file
     AVRO,
     /// Apache Parquet file
+    #[cfg(feature = "parquet")]
     PARQUET,
     /// CSV file
     CSV,
@@ -60,6 +61,7 @@ impl GetExt for FileType {
         match self {
             FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
             FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
+            #[cfg(feature = "parquet")]
             FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
             FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
             FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
@@ -72,6 +74,7 @@ impl Display for FileType {
         let out = match self {
             FileType::CSV => "csv",
             FileType::JSON => "json",
+            #[cfg(feature = "parquet")]
             FileType::PARQUET => "parquet",
             FileType::AVRO => "avro",
             FileType::ARROW => "arrow",
@@ -88,6 +91,7 @@ impl FromStr for FileType {
         match s.as_str() {
             "ARROW" => Ok(FileType::ARROW),
             "AVRO" => Ok(FileType::AVRO),
+            #[cfg(feature = "parquet")]
             "PARQUET" => Ok(FileType::PARQUET),
             "CSV" => Ok(FileType::CSV),
             "JSON" | "NDJSON" => Ok(FileType::JSON),
diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 61c5affd12a2..45b105dfadae 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -22,6 +22,7 @@ pub mod avro_writer;
 pub mod csv_writer;
 pub mod file_type;
 pub mod json_writer;
+#[cfg(feature = "parquet")]
 pub mod parquet_writer;
 pub(crate) mod parse_utils;
 
@@ -37,10 +38,12 @@ use crate::{
     DataFusionError, FileType, Result,
 };
 
+#[cfg(feature = "parquet")]
+use self::parquet_writer::ParquetWriterOptions;
+
 use self::{
     arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions,
     csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions,
-    parquet_writer::ParquetWriterOptions,
 };
 
 /// Represents a single arbitrary setting in a
@@ -145,6 +148,7 @@ impl StatementOptions {
 /// plus any DataFusion specific writing options (e.g. CSV compression)
 #[derive(Clone, Debug)]
 pub enum FileTypeWriterOptions {
+    #[cfg(feature = "parquet")]
     Parquet(ParquetWriterOptions),
     CSV(CsvWriterOptions),
     JSON(JsonWriterOptions),
@@ -164,6 +168,7 @@ impl FileTypeWriterOptions {
         let options = (config_defaults, statement_options);
 
         let file_type_write_options = match file_type {
+            #[cfg(feature = "parquet")]
             FileType::PARQUET => {
                 FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?)
             }
@@ -193,6 +198,7 @@ impl FileTypeWriterOptions {
         let options = (config_defaults, &empty_statement);
 
         let file_type_write_options = match file_type {
+            #[cfg(feature = "parquet")]
             FileType::PARQUET => {
                 FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?)
             }
@@ -215,6 +221,7 @@ impl FileTypeWriterOptions {
 
     /// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum.
     /// Returns an error if a different type from parquet is set.
+    #[cfg(feature = "parquet")]
    pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> {
         match self {
             FileTypeWriterOptions::Parquet(opt) => Ok(opt),
@@ -281,6 +288,7 @@ impl Display for FileTypeWriterOptions {
             FileTypeWriterOptions::Avro(_) => "AvroWriterOptions",
             FileTypeWriterOptions::CSV(_) => "CsvWriterOptions",
             FileTypeWriterOptions::JSON(_) => "JsonWriterOptions",
+            #[cfg(feature = "parquet")]
             FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions",
         };
         write!(f, "{}", name)
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 5cbbdf365e39..93611114cf8e 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -19,16 +19,15 @@
 
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 
-use crate::{
-    config::ConfigOptions,
-    file_options::parse_utils::{
-        parse_compression_string, parse_encoding_string, parse_statistics_string,
-        parse_version_string,
-    },
-    DataFusionError, Result,
-};
+use crate::{config::ConfigOptions, DataFusionError, Result};
+
+use super::StatementOptions;
 
-use super::{parse_utils::split_option_and_column_path, StatementOptions};
+use parquet::{
+    basic::{BrotliLevel, GzipLevel, ZstdLevel},
+    file::properties::{EnabledStatistics, WriterVersion},
+    schema::types::ColumnPath,
+};
 
 /// Options for writing parquet files
 #[derive(Clone, Debug)]
@@ -214,3 +213,161 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
         })
     }
 }
+
+/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
+pub(crate) fn parse_encoding_string(
+    str_setting: &str,
+) -> Result<parquet::basic::Encoding> {
+    let str_setting_lower: &str = &str_setting.to_lowercase();
+    match str_setting_lower {
+        "plain" => Ok(parquet::basic::Encoding::PLAIN),
+        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
+        "rle" => Ok(parquet::basic::Encoding::RLE),
+        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
+        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
+        "delta_length_byte_array" => {
+            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
+        }
+        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
+        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
+        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
+        _ => Err(DataFusionError::Configuration(format!(
+            "Unknown or unsupported parquet encoding: \
+            {str_setting}. Valid values are: plain, plain_dictionary, rle, \
+            bit_packed, delta_binary_packed, delta_length_byte_array, \
+            delta_byte_array, rle_dictionary, and byte_stream_split."
+        ))),
+    }
+}
+
+/// Splits compression string into compression codec and optional compression_level
+/// I.e. gzip(2) -> gzip, 2
+fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
+    // ignore string literal chars passed from sqlparser i.e. remove single quotes
+    let str_setting = str_setting.replace('\'', "");
+    let split_setting = str_setting.split_once('(');
+
+    match split_setting {
+        Some((codec, rh)) => {
+            let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
+                DataFusionError::Configuration(format!(
+                    "Could not parse compression string. \
+                    Got codec: {} and unknown level from {}",
+                    codec, str_setting
+                ))
+            })?;
+            Ok((codec.to_owned(), Some(*level)))
+        }
+        None => Ok((str_setting.to_owned(), None)),
+    }
+}
+
+/// Helper to ensure compression codecs which don't support levels
+/// don't have one set. E.g. snappy(2) is invalid.
+fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
+    if level.is_some() {
+        return Err(DataFusionError::Configuration(format!(
+            "Compression {codec} does not support specifying a level"
+        )));
+    }
+    Ok(())
+}
+
+/// Helper to ensure compression codecs which require a level
+/// do have one set. E.g. zstd is invalid, zstd(3) is valid
+fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
+    level.ok_or(DataFusionError::Configuration(format!(
+        "{codec} compression requires specifying a level such as {codec}(4)"
+    )))
+}
+
+/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
+pub(crate) fn parse_compression_string(
+    str_setting: &str,
+) -> Result<parquet::basic::Compression> {
+    let str_setting_lower: &str = &str_setting.to_lowercase();
+    let (codec, level) = split_compression_string(str_setting_lower)?;
+    let codec = codec.as_str();
+    match codec {
+        "uncompressed" => {
+            check_level_is_none(codec, &level)?;
+            Ok(parquet::basic::Compression::UNCOMPRESSED)
+        }
+        "snappy" => {
+            check_level_is_none(codec, &level)?;
+            Ok(parquet::basic::Compression::SNAPPY)
+        }
+        "gzip" => {
+            let level = require_level(codec, level)?;
+            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
+                level,
+            )?))
+        }
+        "lzo" => {
+            check_level_is_none(codec, &level)?;
+            Ok(parquet::basic::Compression::LZO)
+        }
+        "brotli" => {
+            let level = require_level(codec, level)?;
+            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
+                level,
+            )?))
+        }
+        "lz4" => {
+            check_level_is_none(codec, &level)?;
+            Ok(parquet::basic::Compression::LZ4)
+        }
+        "zstd" => {
+            let level = require_level(codec, level)?;
+            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
+                level as i32,
+            )?))
+        }
+        "lz4_raw" => {
+            check_level_is_none(codec, &level)?;
+            Ok(parquet::basic::Compression::LZ4_RAW)
+        }
+        _ => Err(DataFusionError::Configuration(format!(
+            "Unknown or unsupported parquet compression: \
+            {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
+            lzo, brotli(level), lz4, zstd(level), and lz4_raw."
+        ))),
+    }
+}
+
+pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
+    let str_setting_lower: &str = &str_setting.to_lowercase();
+    match str_setting_lower {
+        "1.0" => Ok(WriterVersion::PARQUET_1_0),
+        "2.0" => Ok(WriterVersion::PARQUET_2_0),
+        _ => Err(DataFusionError::Configuration(format!(
+            "Unknown or unsupported parquet writer version {str_setting} \
+            valid options are '1.0' and '2.0'"
+        ))),
+    }
+}
+
+pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
+    let str_setting_lower: &str = &str_setting.to_lowercase();
+    match str_setting_lower {
+        "none" => Ok(EnabledStatistics::None),
+        "chunk" => Ok(EnabledStatistics::Chunk),
+        "page" => Ok(EnabledStatistics::Page),
+        _ => Err(DataFusionError::Configuration(format!(
+            "Unknown or unsupported parquet statistics setting {str_setting} \
+            valid options are 'none', 'page', and 'chunk'"
+        ))),
+    }
+}
+
+pub(crate) fn split_option_and_column_path(
+    str_setting: &str,
+) -> (String, Option<ColumnPath>) {
+    match str_setting.replace('\'', "").split_once("::") {
+        Some((s1, s2)) => {
+            let col_path = ColumnPath::new(s2.split('.').map(|s| s.to_owned()).collect());
+            (s1.to_owned(), Some(col_path))
+        }
+        None => (str_setting.to_owned(), None),
+    }
+}
diff --git a/datafusion/common/src/file_options/parse_utils.rs b/datafusion/common/src/file_options/parse_utils.rs
index a58cc8e13cf4..38cf5eb489f7 100644
--- a/datafusion/common/src/file_options/parse_utils.rs
+++ b/datafusion/common/src/file_options/parse_utils.rs
@@ -16,13 +16,6 @@
 // under the License.
 
 //! Functions for parsing arbitrary passed strings to valid file_option settings
-
-use parquet::{
-    basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{EnabledStatistics, WriterVersion},
-    schema::types::ColumnPath,
-};
-
 use crate::{DataFusionError, Result};
 
 /// Converts a String option to a bool, or returns an error if not a valid bool string.
@@ -36,161 +29,3 @@ pub(crate) fn parse_boolean_string(option: &str, value: String) -> Result<bool> {
         ))),
     }
 }
-
-/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
-pub(crate) fn parse_encoding_string(
-    str_setting: &str,
-) -> Result<parquet::basic::Encoding> {
-    let str_setting_lower: &str = &str_setting.to_lowercase();
-    match str_setting_lower {
-        "plain" => Ok(parquet::basic::Encoding::PLAIN),
-        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
-        "rle" => Ok(parquet::basic::Encoding::RLE),
-        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
-        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
-        "delta_length_byte_array" => {
-            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
-        }
-        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
-        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
-        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
-        _ => Err(DataFusionError::Configuration(format!(
-            "Unknown or unsupported parquet encoding: \
-            {str_setting}. Valid values are: plain, plain_dictionary, rle, \
-            bit_packed, delta_binary_packed, delta_length_byte_array, \
-            delta_byte_array, rle_dictionary, and byte_stream_split."
-        ))),
-    }
-}
-
-/// Splits compression string into compression codec and optional compression_level
-/// I.e. gzip(2) -> gzip, 2
-fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
-    // ignore string literal chars passed from sqlparser i.e. remove single quotes
-    let str_setting = str_setting.replace('\'', "");
-    let split_setting = str_setting.split_once('(');
-
-    match split_setting {
-        Some((codec, rh)) => {
-            let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
-                DataFusionError::Configuration(format!(
-                    "Could not parse compression string. \
-                    Got codec: {} and unknown level from {}",
-                    codec, str_setting
-                ))
-            })?;
-            Ok((codec.to_owned(), Some(*level)))
-        }
-        None => Ok((str_setting.to_owned(), None)),
-    }
-}
-
-/// Helper to ensure compression codecs which don't support levels
-/// don't have one set. E.g. snappy(2) is invalid.
-fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
-    if level.is_some() {
-        return Err(DataFusionError::Configuration(format!(
-            "Compression {codec} does not support specifying a level"
-        )));
-    }
-    Ok(())
-}
-
-/// Helper to ensure compression codecs which require a level
-/// do have one set. E.g. zstd is invalid, zstd(3) is valid
-fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
-    level.ok_or(DataFusionError::Configuration(format!(
-        "{codec} compression requires specifying a level such as {codec}(4)"
-    )))
-}
-
-/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression
-pub(crate) fn parse_compression_string(
-    str_setting: &str,
-) -> Result<parquet::basic::Compression> {
-    let str_setting_lower: &str = &str_setting.to_lowercase();
-    let (codec, level) = split_compression_string(str_setting_lower)?;
-    let codec = codec.as_str();
-    match codec {
-        "uncompressed" => {
-            check_level_is_none(codec, &level)?;
-            Ok(parquet::basic::Compression::UNCOMPRESSED)
-        }
-        "snappy" => {
-            check_level_is_none(codec, &level)?;
-            Ok(parquet::basic::Compression::SNAPPY)
-        }
-        "gzip" => {
-            let level = require_level(codec, level)?;
-            Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new(
-                level,
-            )?))
-        }
-        "lzo" => {
-            check_level_is_none(codec, &level)?;
-            Ok(parquet::basic::Compression::LZO)
-        }
-        "brotli" => {
-            let level = require_level(codec, level)?;
-            Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
-                level,
-            )?))
-        }
-        "lz4" => {
-            check_level_is_none(codec, &level)?;
-            Ok(parquet::basic::Compression::LZ4)
-        }
-        "zstd" => {
-            let level = require_level(codec, level)?;
-            Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(
-                level as i32,
-            )?))
-        }
-        "lz4_raw" => {
-            check_level_is_none(codec, &level)?;
-            Ok(parquet::basic::Compression::LZ4_RAW)
-        }
-        _ => Err(DataFusionError::Configuration(format!(
-            "Unknown or unsupported parquet compression: \
-            {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
-            lzo, brotli(level), lz4, zstd(level), and lz4_raw."
-        ))),
-    }
-}
-
-pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
-    let str_setting_lower: &str = &str_setting.to_lowercase();
-    match str_setting_lower {
-        "1.0" => Ok(WriterVersion::PARQUET_1_0),
-        "2.0" => Ok(WriterVersion::PARQUET_2_0),
-        _ => Err(DataFusionError::Configuration(format!(
-            "Unknown or unsupported parquet writer version {str_setting} \
-            valid options are '1.0' and '2.0'"
-        ))),
-    }
-}
-
-pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
-    let str_setting_lower: &str = &str_setting.to_lowercase();
-    match str_setting_lower {
-        "none" => Ok(EnabledStatistics::None),
-        "chunk" => Ok(EnabledStatistics::Chunk),
-        "page" => Ok(EnabledStatistics::Page),
-        _ => Err(DataFusionError::Configuration(format!(
-            "Unknown or unsupported parquet statistics setting {str_setting} \
-            valid options are 'none', 'page', and 'chunk'"
-        ))),
-    }
-}
-
-pub(crate) fn split_option_and_column_path(
-    str_setting: &str,
-) -> (String, Option<ColumnPath>) {
-    match str_setting.replace('\'', "").split_once("::") {
-        Some((s1, s2)) => {
-            let col_path = ColumnPath::new(s2.split('.').map(|s| s.to_owned()).collect());
-            (s1.to_owned(), Some(col_path))
-        }
-        None => (str_setting.to_owned(), None),
-    }
-}
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index edff3f05a153..f6d8e15fdd9d 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -37,7 +37,7 @@ path = "src/lib.rs"
 [dependencies]
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
 arrow = { workspace = true }
-datafusion-common = { path = "../common", version = "31.0.0" }
+datafusion-common = { path = "../common", version = "31.0.0", default-features = false }
 sqlparser = { workspace = true }
 strum = { version = "0.25.0", features = ["derive"] }
 strum_macros = "0.25.0"
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index f42a67c0d782..2fc3b500f105 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -43,7 +43,7 @@ unicode_expressions = ["datafusion-physical-expr/unicode_expressions"]
 arrow = { workspace = true }
 async-trait = "0.1.41"
 chrono = { workspace = true }
-datafusion-common = { path = "../common", version = "31.0.0" }
+datafusion-common = { path = "../common", version = "31.0.0", default-features = false }
 datafusion-expr = { path = "../expr", version = "31.0.0" }
 datafusion-physical-expr = { path = "../physical-expr", version = "31.0.0", default-features = false }
 hashbrown = { version = "0.14", features = ["raw"] }
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index 777918bfac6a..cfb8b580ee4e 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -52,7 +52,7 @@ base64 = { version = "0.21", optional = true }
 blake2 = { version = "^0.10.2", optional = true }
 blake3 = { version = "1.0", optional = true }
 chrono = { workspace = true }
-datafusion-common = { path = "../common", version = "31.0.0" }
+datafusion-common = { path = "../common", version = "31.0.0", default-features = false }
 datafusion-expr = { path = "../expr", version = "31.0.0" }
 half = { version = "2.1", default-features = false }
 hashbrown = { version = "0.14", features = ["raw"] }
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 7226691e2c1c..ace6f5d95483 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -40,7 +40,7 @@ arrow-buffer = { workspace = true }
 arrow-schema = { workspace = true }
 async-trait = "0.1.41"
 chrono = { version = "0.4.23", default-features = false }
-datafusion-common = { path = "../common", version = "31.0.0" }
+datafusion-common = { path = "../common", version = "31.0.0", default-features = false }
 datafusion-execution = { path = "../execution", version = "31.0.0" }
 datafusion-expr = { path = "../expr", version = "31.0.0" }
 datafusion-physical-expr = { path = "../physical-expr", version = "31.0.0" }
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 9ba9d5484dec..02b609313a38 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -43,7 +43,7 @@ json = ["pbjson", "serde", "serde_json"]
 arrow = { workspace = true }
 chrono = { workspace = true }
 datafusion = { path = "../core", version = "31.0.0" }
-datafusion-common = { path = "../common", version = "31.0.0" }
+datafusion-common = { path = "../common", version = "31.0.0", default-features = false }
 datafusion-expr = { path = "../expr", version = "31.0.0" }
 object_store = { version = "0.7.0" }
 pbjson = { version = "0.5", optional = true }
diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml
index d1ce681f0494..c2cdc4c52dbd 100644
--- a/datafusion/sql/Cargo.toml
+++ b/datafusion/sql/Cargo.toml
@@ -39,7 +39,7 @@ unicode_expressions = []
 [dependencies]
 arrow = { workspace = true }
 arrow-schema = { workspace = true }
-datafusion-common = { path = "../common", version = "31.0.0" }
+datafusion-common = { path = "../common", version = "31.0.0", default-features = false }
 datafusion-expr = { path = "../expr", version = "31.0.0" }
 log = "^0.4"
 sqlparser = { workspace = true }
diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml
index 1887fb35f59b..2cd16927be2c 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -35,7 +35,7 @@ arrow = {workspace = true}
 async-trait = "0.1.41"
 bigdecimal = "0.4.1"
 datafusion = {path = "../core", version = "31.0.0"}
-datafusion-common = {path = "../common", version = "31.0.0"}
+datafusion-common = {path = "../common", version = "31.0.0", default-features = false}
 half = "2.2.1"
 itertools = "0.11"
 object_store = "0.7.0"
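The mechanism this patch applies throughout is conditional compilation: `#[cfg(feature = "parquet")]` removes the `PARQUET` variant and every match arm that names it when the feature is disabled, so each `match` stays exhaustive in both configurations. A minimal standalone sketch of the same pattern (the `demo` feature and all item names here are illustrative, not taken from the patch):

```rust
// Sketch of cfg-gating an enum variant and its match arms.
// `demo` is a hypothetical Cargo feature (declared as `demo = []`
// under [features]); it is not a feature name from this patch.

#[derive(Debug)]
pub enum FileKind {
    Csv,
    #[cfg(feature = "demo")]
    Parquet,
}

impl FileKind {
    pub fn extension(&self) -> &'static str {
        match self {
            FileKind::Csv => "csv",
            // This arm is only compiled when the feature is on, so the
            // match remains exhaustive whether or not `demo` is enabled.
            #[cfg(feature = "demo")]
            FileKind::Parquet => "parquet",
        }
    }
}

fn main() {
    println!("{}", FileKind::Csv.extension());
}
```

Building with and without `--features demo` type-checks both configurations, which is why the patch must gate the variant, every arm that mentions it, and the `ParquetWriterOptions` import together rather than any one of them alone.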