Skip to content

Commit

Permalink
Implement user defined planner
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlifoobar committed Jul 5, 2024
1 parent e18f6f6 commit adaff12
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 55 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ impl SessionState {
Arc::new(functions_array::planner::ArrayFunctionPlanner),
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::FieldAccessPlanner),
#[cfg(feature = "datetime_expressions")]
Arc::new(functions::datetime::planner::ExtractPlanner),
#[cfg(any(feature = "datetime_expressions", feature ="unicode_expressions"))]
Arc::new(functions::planner::UserDefinedFunctionPlanner),
];

let mut new_self = SessionState {
Expand Down
7 changes: 0 additions & 7 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,6 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
) -> Result<PlannerResult<RawDictionaryExpr>> {
Ok(PlannerResult::Original(expr))
}

/// Plan an extract expression, e.g., `EXTRACT(month FROM foo)`
///
/// Returns origin expression arguments if not possible
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
}

/// An operator with two arguments to plan
Expand Down
1 change: 0 additions & 1 deletion datafusion/functions/src/datetime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub mod date_trunc;
pub mod from_unixtime;
pub mod make_date;
pub mod now;
pub mod planner;
pub mod to_char;
pub mod to_date;
pub mod to_timestamp;
Expand Down
3 changes: 3 additions & 0 deletions datafusion/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ make_stub_package!(crypto, "crypto_expressions");
pub mod unicode;
make_stub_package!(unicode, "unicode_expressions");

#[cfg(any(feature = "datetime_expressions", feature = "unicode_expressions"))]
pub mod planner;

mod utils;

/// Fluent-style API for creating `Expr`s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,36 @@
// specific language governing permissions and limitations
// under the License.

//! SQL planning extensions like [`ExtractPlanner`]
//! SQL planning extensions like [`UserDefinedFunctionPlanner`]
use datafusion_common::Result;
use datafusion_expr::{
expr::ScalarFunction,
planner::{PlannerResult, UserDefinedSQLPlanner},
Expr,
sqlparser, Expr,
};
use sqlparser::ast::Expr as SQLExpr;

#[derive(Default)]
pub struct ExtractPlanner;
pub struct UserDefinedFunctionPlanner;

impl UserDefinedSQLPlanner for ExtractPlanner {
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(crate::datetime::date_part(), args),
)))
impl UserDefinedSQLPlanner for UserDefinedFunctionPlanner {
// Plan the user defined function, returns origin expression arguments if not possible
fn plan_udf(
&self,
sql: &SQLExpr,
args: Vec<Expr>,
) -> Result<PlannerResult<Vec<Expr>>> {
match sql {
#[cfg(feature = "datetime_expressions")]
SQLExpr::Extract { .. } => Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(crate::datetime::date_part(), args),
))),
#[cfg(feature = "unicode_expressions")]
SQLExpr::Position { .. } => Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(crate::unicode::strpos(), args),
))),
_ => Ok(PlannerResult::Original(args)),
}
}
}
71 changes: 34 additions & 37 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,29 +173,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let sql_not_moved = sql.clone();
match sql {
SQLExpr::Value(value) => {
self.parse_value(value, planner_context.prepare_param_data_types())
}
SQLExpr::Extract { field, expr } => {
let mut extract_args = vec![
Expr::Literal(ScalarValue::from(format!("{field}"))),
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
];

for planner in self.planners.iter() {
match planner.plan_extract(extract_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
extract_args = args;
}
}
}

not_impl_err!("Extract not supported by UserDefinedExtensionPlanners: {extract_args:?}")
}

SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
SQLExpr::Interval(interval) => {
self.sql_interval_to_expr(false, interval, schema, planner_context)
Expand Down Expand Up @@ -600,24 +581,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Struct { values, fields } => {
self.parse_struct(values, fields, schema, planner_context)
}
SQLExpr::Position { expr, r#in } => {
let substr =
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
let fullstr =
self.sql_expr_to_logical_expr(*r#in, schema, planner_context)?;
let mut extracted_args = vec![fullstr, substr];

for planner in self.planners.iter() {
match planner.plan_udf(&sql_not_moved, extracted_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
extracted_args = args;
}
}
}

not_impl_err!("Position not supported by UserDefinedExtensionPlanners: {extracted_args:?}")
}
SQLExpr::AtTimeZone {
timestamp,
time_zone,
Expand All @@ -641,6 +604,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Dictionary(fields) => {
self.try_plan_dictionary_literal(fields, schema, planner_context)
}
SQLExpr::Extract { .. } | SQLExpr::Position { .. } => {
self.sql_udf_plan(sql, schema, planner_context)
}
_ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
}
}
Expand Down Expand Up @@ -829,6 +795,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)))
}

/// Plans a SQL node by delegating to the registered planners' `plan_udf`.
///
/// Only `SQLExpr::Position` and `SQLExpr::Extract` are accepted; any other
/// node yields a "not implemented" error. The node's operands are first
/// converted to logical expressions:
///
/// * `POSITION(expr IN in)` becomes `[in, expr]` (the `IN` operand first).
/// * `EXTRACT(field FROM expr)` becomes `[literal(field), expr]`.
///
/// Each planner in `self.planners` is then tried in order. The first one
/// returning `PlannerResult::Planned` wins; a planner returning
/// `PlannerResult::Original` hands the arguments back so they can be
/// offered to the next planner. If no planner handles the node, a
/// "not implemented" error is returned.
fn sql_udf_plan(
    &self,
    sql: SQLExpr,
    schema: &DFSchema,
    planner_context: &mut PlannerContext,
) -> Result<Expr> {
    // The match below consumes `sql`, but planners still need to inspect
    // the original node, so keep a copy around.
    let sql_not_moved = sql.clone();

    let mut pending_args = match sql {
        SQLExpr::Position { expr, r#in } => {
            let haystack =
                self.sql_expr_to_logical_expr(*r#in, schema, planner_context)?;
            let needle =
                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
            vec![haystack, needle]
        }
        SQLExpr::Extract { field, expr } => {
            let part = Expr::Literal(ScalarValue::from(format!("{field}")));
            let source =
                self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
            vec![part, source]
        }
        _ => {
            return not_impl_err!("sql_udf_plan not support sql expression: {sql:?}")
        }
    };

    for planner in self.planners.iter() {
        pending_args = match planner.plan_udf(&sql_not_moved, pending_args)? {
            PlannerResult::Planned(planned) => return Ok(planned),
            // Not handled: recover the arguments for the next planner.
            PlannerResult::Original(returned) => returned,
        };
    }

    not_impl_err!("sql_udf_plan not support sql expression: {sql_not_moved:?}")
}

fn sql_similarto_to_expr(
&self,
negated: bool,
Expand Down

0 comments on commit adaff12

Please sign in to comment.