Skip to content

Commit

Permalink
Add configurable normalization for configuration options and preserve…
Browse files Browse the repository at this point in the history
… case for S3 paths (#13576)

* Do not normalize values

* Fix tests & update docs

* Prettier

* Lowercase config params

* Unify transform and parse

* Fix tests

* Rename `default_transform` and relax boundaries

* Make `compression` case-insensitive

* Comment to new line

* Deprecate and ignore `enable_options_value_normalization`

* Update datafusion/common/src/config.rs

* fix typo

---------

Co-authored-by: Oleks V <[email protected]>
  • Loading branch information
blaginin and comphead authored Dec 20, 2024
1 parent d7aeb1a commit 31acf45
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 141 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,13 @@ mod tests {

#[tokio::test]
async fn s3_object_store_builder() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
// "fake" is uppercase to ensure the values are not lowercased when parsed
let access_key_id = "FAKE_access_key_id";
let secret_access_key = "FAKE_secret_access_key";
let region = "fake_us-east-2";
let endpoint = "endpoint33";
let session_token = "fake_session_token";
let location = "s3://bucket/path/file.parquet";
let session_token = "FAKE_session_token";
let location = "s3://bucket/path/FAKE/file.parquet";

let table_url = ListingTableUrl::parse(location)?;
let scheme = table_url.scheme();
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
libc = "0.2.140"
log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
Expand Down
80 changes: 56 additions & 24 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

Expand All @@ -29,7 +30,9 @@ use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
Expand All @@ -38,7 +41,7 @@ use crate::{DataFusionError, Result};
/// /// Amazing config
/// pub struct MyConfig {
/// /// Field 1 doc
/// field1: String, default = "".to_string()
/// field1: String, transform = str::to_lowercase, default = "".to_string()
///
/// /// Field 2 doc
/// field2: usize, default = 232
Expand Down Expand Up @@ -67,9 +70,12 @@ use crate::{DataFusionError, Result};
/// fn set(&mut self, key: &str, value: &str) -> Result<()> {
/// let (key, rem) = key.split_once('.').unwrap_or((key, ""));
/// match key {
/// "field1" => self.field1.set(rem, value),
/// "field2" => self.field2.set(rem, value),
/// "field3" => self.field3.set(rem, value),
/// "field1" => {
/// let value = str::to_lowercase(value);
/// self.field1.set(rem, value.as_ref())
/// },
/// "field2" => self.field2.set(rem, value.as_ref()),
/// "field3" => self.field3.set(rem, value.as_ref()),
/// _ => _internal_err!(
/// "Config value \"{}\" not found on MyConfig",
/// key
Expand Down Expand Up @@ -102,15 +108,14 @@ use crate::{DataFusionError, Result};
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
///
#[macro_export]
macro_rules! config_namespace {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
$field_vis:vis $field_name:ident : $field_type:ty, $(warn = $warn: expr,)? $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
Expand All @@ -127,9 +132,14 @@ macro_rules! config_namespace {
impl ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));

match key {
$(
stringify!($field_name) => self.$field_name.set(rem, value),
stringify!($field_name) => {
$(let value = $transform(value);)?
$(log::warn!($warn);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => return _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
Expand Down Expand Up @@ -211,12 +221,15 @@ config_namespace! {
/// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
pub enable_ident_normalization: bool, default = true

/// When set to true, SQL parser will normalize options value (convert value to lowercase)
pub enable_options_value_normalization: bool, default = true
/// When set to true, SQL parser will normalize options value (convert value to lowercase).
/// Note that this option is ignored and will be removed in the future. All case-insensitive values
/// are normalized automatically.
pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

/// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
/// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.
pub dialect: String, default = "generic".to_string()
// no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive

/// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
/// ignore the length. If false, error if a `VARCHAR` with a length is
Expand Down Expand Up @@ -431,7 +444,7 @@ config_namespace! {
///
/// Note that this default setting is not the same as
/// the default parquet writer setting.
pub compression: Option<String>, default = Some("zstd(3)".into())
pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

/// (writing) Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
Expand All @@ -444,7 +457,7 @@ config_namespace! {
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub statistics_enabled: Option<String>, default = Some("page".into())
pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

/// (writing) Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
Expand All @@ -470,7 +483,7 @@ config_namespace! {
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub encoding: Option<String>, default = None
pub encoding: Option<String>, transform = str::to_lowercase, default = None

/// (writing) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true
Expand Down Expand Up @@ -971,29 +984,45 @@ impl<F: ConfigField + Default> ConfigField for Option<F> {
}
}

fn default_transform<T>(input: &str) -> Result<T>
where
T: FromStr,
<T as FromStr>::Err: Sync + Send + Error + 'static,
{
input.parse().map_err(|e| {
DataFusionError::Context(
format!(
"Error parsing '{}' as {}",
input,
std::any::type_name::<T>()
),
Box::new(DataFusionError::External(Box::new(e))),
)
})
}

#[macro_export]
macro_rules! config_field {
($t:ty) => {
config_field!($t, value => default_transform(value)?);
};

($t:ty, $arg:ident => $transform:expr) => {
impl ConfigField for $t {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}

fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = value.parse().map_err(|e| {
DataFusionError::Context(
format!(concat!("Error parsing {} as ", stringify!($t),), value),
Box::new(DataFusionError::External(Box::new(e))),
)
})?;
fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
*self = $transform;
Ok(())
}
}
};
}

config_field!(String);
config_field!(bool);
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
Expand Down Expand Up @@ -1508,7 +1537,7 @@ macro_rules! config_namespace_with_hashmap {
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
$field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
Expand All @@ -1527,7 +1556,10 @@ macro_rules! config_namespace_with_hashmap {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => self.$field_name.set(rem, value),
stringify!($field_name) => {
$(let value = $transform(value);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
Expand Down Expand Up @@ -1606,7 +1638,7 @@ config_namespace_with_hashmap! {
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case-sensitive. If NULL, uses
/// default parquet options
pub compression: Option<String>, default = None
pub compression: Option<String>, transform = str::to_lowercase, default = None

/// Sets if statistics are enabled for the column
/// Valid values are: "none", "chunk", and "page"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl TableProviderFactory for StreamTableFactory {
let header = if let Ok(opt) = cmd
.options
.get("format.has_header")
.map(|has_header| bool::from_str(has_header))
.map(|has_header| bool::from_str(has_header.to_lowercase().as_str()))
.transpose()
{
opt.unwrap_or(false)
Expand Down
17 changes: 13 additions & 4 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ use std::env;
fn from_env() {
// Note: these must be a single test to avoid interference from concurrent execution
let env_key = "DATAFUSION_OPTIMIZER_FILTER_NULL_JOIN_KEYS";
env::set_var(env_key, "true");
let config = ConfigOptions::from_env().unwrap();
// valid testing in different cases
for bool_option in ["true", "TRUE", "True", "tRUe"] {
env::set_var(env_key, bool_option);
let config = ConfigOptions::from_env().unwrap();
env::remove_var(env_key);
assert!(config.optimizer.filter_null_join_keys);
}

// invalid testing
env::set_var(env_key, "ttruee");
let err = ConfigOptions::from_env().unwrap_err().strip_backtrace();
assert_eq!(err, "Error parsing 'ttruee' as bool\ncaused by\nExternal error: provided string was not `true` or `false`");
env::remove_var(env_key);
assert!(config.optimizer.filter_null_join_keys);

let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";

Expand All @@ -37,7 +46,7 @@ fn from_env() {
// for invalid testing
env::set_var(env_key, "abc");
let err = ConfigOptions::from_env().unwrap_err().strip_backtrace();
assert_eq!(err, "Error parsing abc as usize\ncaused by\nExternal error: invalid digit found in string");
assert_eq!(err, "Error parsing 'abc' as usize\ncaused by\nExternal error: invalid digit found in string");

env::remove_var(env_key);
let config = ConfigOptions::from_env().unwrap();
Expand Down
35 changes: 3 additions & 32 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use arrow_schema::*;
use datafusion_common::{
field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError,
};
use sqlparser::ast::TimezoneInfo;
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
use sqlparser::ast::{TimezoneInfo, Value};

use datafusion_common::TableReference;
use datafusion_common::{
Expand All @@ -38,7 +38,7 @@ use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{col, Expr};

use crate::utils::{make_decimal_type, value_to_string};
use crate::utils::make_decimal_type;
pub use datafusion_expr::planner::ContextProvider;

/// SQL parser options
Expand All @@ -56,7 +56,7 @@ impl Default for ParserOptions {
parse_float_as_decimal: false,
enable_ident_normalization: true,
support_varchar_with_length: true,
enable_options_value_normalization: true,
enable_options_value_normalization: false,
}
}
}
Expand Down Expand Up @@ -87,32 +87,6 @@ impl IdentNormalizer {
}
}

/// Value Normalizer
#[derive(Debug)]
pub struct ValueNormalizer {
normalize: bool,
}

impl Default for ValueNormalizer {
fn default() -> Self {
Self { normalize: true }
}
}

impl ValueNormalizer {
pub fn new(normalize: bool) -> Self {
Self { normalize }
}

pub fn normalize(&self, value: Value) -> Option<String> {
match (value_to_string(&value), self.normalize) {
(Some(s), true) => Some(s.to_ascii_lowercase()),
(Some(s), false) => Some(s),
(None, _) => None,
}
}
}

/// Struct to store the states used by the Planner. The Planner will leverage the states to resolve
/// CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
Expand Down Expand Up @@ -254,7 +228,6 @@ pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) ident_normalizer: IdentNormalizer,
pub(crate) value_normalizer: ValueNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand All @@ -266,13 +239,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Create a new query planner
pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
let ident_normalize = options.enable_ident_normalization;
let options_value_normalize = options.enable_options_value_normalization;

SqlToRel {
context_provider,
options,
ident_normalizer: IdentNormalizer::new(ident_normalize),
value_normalizer: ValueNormalizer::new(options_value_normalize),
}
}

Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1386,8 +1386,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
return plan_err!("Option {key} is specified multiple times");
}

let Some(value_string) = self.value_normalizer.normalize(value.clone())
else {
let Some(value_string) = crate::utils::value_to_string(&value) else {
return plan_err!("Unsupported Value {}", value);
};

Expand Down
Loading

0 comments on commit 31acf45

Please sign in to comment.