Skip to content

Commit

Permalink
draft option enable_options_value_normalization
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlifoobar committed Jul 8, 2024
1 parent 1e39a85 commit c97323e
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 88 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ config_namespace! {
/// specified. The Arrow type system does not have a notion of maximum
/// string length and thus DataFusion can not enforce such limits.
pub support_varchar_with_length: bool, default = true

/// When set to true, the SQL parser will normalize option values (convert them to lowercase)
pub enable_options_value_normalization: bool, default = true
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ impl SessionState {
ParserOptions {
parse_float_as_decimal: sql_parser_options.parse_float_as_decimal,
enable_ident_normalization: sql_parser_options.enable_ident_normalization,
enable_options_value_normalization: sql_parser_options.enable_options_value_normalization,
support_varchar_with_length: sql_parser_options.support_varchar_with_length,
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Process CTEs from top to bottom
for cte in with.cte_tables {
// A `WITH` block can't use the same name more than once
let cte_name = self.normalizer.normalize(cte.alias.name.clone());
let cte_name = self.ident_normalizer.normalize(cte.alias.name.clone());
if planner_context.contains_cte(&cte_name) {
return plan_err!(
"WITH query name {cte_name:?} specified more than once"
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// interpret names with '.' as if they were
// compound identifiers, but this is not a compound
// identifier. (e.g. it is "foo.bar" not foo.bar)
let normalize_ident = self.normalizer.normalize(id);
let normalize_ident = self.ident_normalizer.normalize(id);
match schema.field_with_unqualified_name(normalize_ident.as_str()) {
Ok(_) => {
// found a match without a qualified name, this is a inner table column
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if ids[0].value.starts_with('@') {
let var_names: Vec<_> = ids
.into_iter()
.map(|id| self.normalizer.normalize(id))
.map(|id| self.ident_normalizer.normalize(id))
.collect();
let ty = self
.context_provider
Expand All @@ -111,7 +111,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} else {
let ids = ids
.into_iter()
.map(|id| self.normalizer.normalize(id))
.map(|id| self.ident_normalizer.normalize(id))
.collect::<Vec<_>>();

// Currently not supporting more than one nested level
Expand Down
49 changes: 40 additions & 9 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use datafusion_common::{
field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError,
};
use datafusion_expr::planner::UserDefinedSQLPlanner;
use sqlparser::ast::TimezoneInfo;
use sqlparser::ast::{TimezoneInfo, Value};
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
Expand All @@ -49,6 +49,7 @@ pub struct ParserOptions {
pub parse_float_as_decimal: bool,
pub enable_ident_normalization: bool,
pub support_varchar_with_length: bool,
pub enable_options_value_normalization: bool,
}

impl Default for ParserOptions {
Expand All @@ -57,6 +58,7 @@ impl Default for ParserOptions {
parse_float_as_decimal: false,
enable_ident_normalization: true,
support_varchar_with_length: true,
enable_options_value_normalization: true,
}
}
}
Expand Down Expand Up @@ -87,6 +89,32 @@ impl IdentNormalizer {
}
}

/// Converts SQL option values to strings, optionally normalizing them.
///
/// The `normalize` flag selects which helper in `crate::utils` performs the
/// conversion; see [`ValueNormalizer::normalize`].
#[derive(Debug)]
pub struct ValueNormalizer {
    normalize: bool,
}

impl Default for ValueNormalizer {
    /// Builds a normalizer with normalization switched on.
    fn default() -> Self {
        Self::new(true)
    }
}

impl ValueNormalizer {
    /// Creates a normalizer; `normalize` decides whether values are normalized
    /// or only converted to their string form.
    pub fn new(normalize: bool) -> Self {
        Self { normalize }
    }

    /// Converts `value` to a `String`.
    ///
    /// With normalization enabled this delegates to
    /// `crate::utils::normalize_value`, otherwise to
    /// `crate::utils::value_to_string`. Any error from the chosen helper is
    /// returned unchanged.
    pub fn normalize(&self, value: Value) -> Result<String> {
        if !self.normalize {
            return crate::utils::value_to_string(&value);
        }
        crate::utils::normalize_value(&value)
    }
}

/// Struct to store the states used by the Planner. The Planner will leverage the states to resolve
/// CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
Expand Down Expand Up @@ -185,7 +213,8 @@ impl PlannerContext {
pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) normalizer: IdentNormalizer,
pub(crate) ident_normalizer: IdentNormalizer,
pub(crate) value_normalizer: ValueNormalizer,
/// user defined planner extensions
pub(crate) planners: Vec<Arc<dyn UserDefinedSQLPlanner>>,
}
Expand All @@ -207,12 +236,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

/// Create a new query planner
pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
let normalize = options.enable_ident_normalization;
let ident_normalize = options.enable_ident_normalization;
let options_value_normalize = options.enable_options_value_normalization;

SqlToRel {
context_provider,
options,
normalizer: IdentNormalizer::new(normalize),
ident_normalizer: IdentNormalizer::new(ident_normalize),
value_normalizer: ValueNormalizer::new(options_value_normalize),
planners: vec![],
}
}
Expand All @@ -227,7 +258,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.any(|x| x.option == ColumnOption::NotNull);
fields.push(Field::new(
self.normalizer.normalize(column.name),
self.ident_normalizer.normalize(column.name),
data_type,
!not_nullable,
));
Expand Down Expand Up @@ -266,7 +297,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
.map_err(error_desc)?;
column_defaults
.push((self.normalizer.normalize(column.name.clone()), default_expr));
.push((self.ident_normalizer.normalize(column.name.clone()), default_expr));
}
}
Ok(column_defaults)
Expand All @@ -281,7 +312,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.apply_expr_alias(plan, alias.columns)?;

LogicalPlanBuilder::from(plan)
.alias(TableReference::bare(self.normalizer.normalize(alias.name)))?
.alias(TableReference::bare(self.ident_normalizer.normalize(alias.name)))?
.build()
}

Expand All @@ -302,7 +333,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let fields = plan.schema().fields().clone();
LogicalPlanBuilder::from(plan)
.project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
col(field.name()).alias(self.normalizer.normalize(ident))
col(field.name()).alias(self.ident_normalizer.normalize(ident))
}))?
.build()
}
Expand Down Expand Up @@ -428,7 +459,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
None => Ident::new(format!("c{idx}"))
};
Ok(Arc::new(Field::new(
self.normalizer.normalize(field_name),
self.ident_normalizer.normalize(field_name),
data_type,
true,
)))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/relation/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
JoinConstraint::Using(idents) => {
let keys: Vec<Column> = idents
.into_iter()
.map(|x| Column::from_name(self.normalizer.normalize(x)))
.map(|x| Column::from_name(self.ident_normalizer.normalize(x)))
.collect();
LogicalPlanBuilder::from(left)
.join_using(right, join_type, keys)?
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&[&[plan.schema()]],
&plan.using_columns()?,
)?;
let name = self.normalizer.normalize(alias);
let name = self.ident_normalizer.normalize(alias);
// avoiding adding an alias if the column name is the same.
let expr = match &col {
Expr::Column(column) if column.name.eq(&name) => col,
Expand Down
105 changes: 33 additions & 72 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,6 @@ fn ident_to_string(ident: &Ident) -> String {
normalize_ident(ident.to_owned())
}

/// Renders a SQL literal as a plain string, if its kind is supported.
///
/// Single-quoted and dollar-quoted strings use the `to_string()` of their
/// payload; numbers and booleans use the `to_string()` of the whole value.
/// Every other literal kind yields `None` so the caller can report it as
/// unsupported.
fn value_to_string(value: &Value) -> Option<String> {
    let rendered = match value {
        Value::SingleQuotedString(text) => text.to_string(),
        Value::DollarQuotedString(text) => text.to_string(),
        Value::Number(..) | Value::Boolean(_) => value.to_string(),
        // Listed explicitly (no wildcard) so that a new `Value` variant forces
        // a decision here at compile time.
        Value::DoubleQuotedString(_)
        | Value::EscapedStringLiteral(_)
        | Value::NationalStringLiteral(_)
        | Value::SingleQuotedByteStringLiteral(_)
        | Value::DoubleQuotedByteStringLiteral(_)
        | Value::TripleSingleQuotedString(_)
        | Value::TripleDoubleQuotedString(_)
        | Value::TripleSingleQuotedByteStringLiteral(_)
        | Value::TripleDoubleQuotedByteStringLiteral(_)
        | Value::SingleQuotedRawStringLiteral(_)
        | Value::DoubleQuotedRawStringLiteral(_)
        | Value::TripleSingleQuotedRawStringLiteral(_)
        | Value::TripleDoubleQuotedRawStringLiteral(_)
        | Value::HexStringLiteral(_)
        | Value::Null
        | Value::Placeholder(_) => return None,
    };
    Some(rendered)
}

fn object_name_to_string(object_name: &ObjectName) -> String {
object_name
.0
Expand Down Expand Up @@ -880,25 +856,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
};

let mut options = HashMap::new();
for (key, value) in statement.options {
let value_string = match value_to_string(&value) {
None => {
return plan_err!("Unsupported Value in COPY statement {}", value);
}
Some(v) => v,
};

if !(&key.contains('.')) {
// If config does not belong to any namespace, assume it is
// a format option and apply the format prefix for backwards
// compatibility.
let renamed_key = format!("format.{}", key);
options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
} else {
options.insert(key.to_lowercase(), value_string.to_lowercase());
}
}
let options_map: HashMap<String, String> = self.parse_options_map(statement.options)?;

let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
if let Ok(ext_file_type) = self.context_provider.get_file_type(stored_as) {
Expand Down Expand Up @@ -945,7 +903,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
output_url: statement.target,
file_type,
partition_by,
options,
options: options_map,
}))
}

Expand Down Expand Up @@ -1006,29 +964,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let inline_constraints = calc_inline_constraints_from_columns(&columns);
all_constraints.extend(inline_constraints);

let mut options_map = HashMap::<String, String>::new();
for (key, value) in options {
if options_map.contains_key(&key) {
return plan_err!("Option {key} is specified multiple times");
}

let Some(value_string) = value_to_string(&value) else {
return plan_err!(
"Unsupported Value in CREATE EXTERNAL TABLE statement {}",
value
);
};

if !(&key.contains('.')) {
// If a config does not belong to any namespace, we assume it is
// a format option and apply the format prefix for backwards
// compatibility.
let renamed_key = format!("format.{}", key.to_lowercase());
options_map.insert(renamed_key, value_string.to_lowercase());
} else {
options_map.insert(key.to_lowercase(), value_string.to_lowercase());
}
}
let options_map = self.parse_options_map(options)?;

let compression = options_map
.get("format.compression")
Expand Down Expand Up @@ -1080,6 +1016,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)))
}

/// Builds the option map shared by `COPY` and `CREATE EXTERNAL TABLE`.
///
/// Each key is lowercased; a key without a `.` namespace separator is treated
/// as a format option and gets the `format.` prefix for backwards
/// compatibility. Each value is converted to a string through
/// `self.value_normalizer`.
///
/// # Errors
///
/// Returns a plan error when two options resolve to the same normalized key
/// (for example `compression` given twice, or `FORMAT.X` and `format.x`), and
/// propagates any error from the value normalizer.
fn parse_options_map(
    &self,
    options: Vec<(String, Value)>,
) -> Result<HashMap<String, String>> {
    let mut options_map = HashMap::with_capacity(options.len());
    for (key, value) in options {
        // Normalize the key *before* the duplicate check: the map is keyed by
        // the normalized form, so checking the raw key would miss duplicates
        // that only collide after lowercasing / prefixing.
        let normalized_key = if key.contains('.') {
            key.to_lowercase()
        } else {
            format!("format.{}", key.to_lowercase())
        };

        if options_map.contains_key(&normalized_key) {
            return plan_err!("Option {key} is specified multiple times");
        }

        let value_string = self.value_normalizer.normalize(value)?;
        options_map.insert(normalized_key, value_string);
    }

    Ok(options_map)
}

/// Generate a plan for EXPLAIN ... that will print out a plan
///
/// Note this is the sqlparser explain statement, not the
Expand Down Expand Up @@ -1187,11 +1148,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// parse value string from Expr
let value_string = match &value[0] {
SQLExpr::Identifier(i) => ident_to_string(i),
SQLExpr::Value(v) => match value_to_string(v) {
None => {
SQLExpr::Value(v) => match crate::utils::value_to_string(v) {
Err(_) => {
return plan_err!("Unsupported Value {}", value[0]);
}
Some(v) => v,
Ok(v) => v,
},
// for capture signed number e.g. +8, -8
SQLExpr::UnaryOp { op, expr } => match op {
Expand Down Expand Up @@ -1346,7 +1307,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// If the target table has an alias, use it to qualify the column name
if let Some(alias) = &table_alias {
datafusion_expr::Expr::Column(Column::new(
Some(self.normalizer.normalize(alias.name.clone())),
Some(self.ident_normalizer.normalize(alias.name.clone())),
field.name(),
))
} else {
Expand Down Expand Up @@ -1403,7 +1364,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut value_indices = vec![None; table_schema.fields().len()];
let fields = columns
.into_iter()
.map(|c| self.normalizer.normalize(c))
.map(|c| self.ident_normalizer.normalize(c))
.enumerate()
.map(|(i, c)| {
let column_index = table_schema
Expand Down
Loading

0 comments on commit c97323e

Please sign in to comment.