Skip to content

Commit

Permalink
Implement user defined planner for position
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlifoobar committed Jul 3, 2024
1 parent 3421b52 commit bac49cf
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 23 deletions.
11 changes: 8 additions & 3 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,14 +967,19 @@ impl SessionState {
let field_access_planner =
Arc::new(functions_array::planner::FieldAccessPlanner) as _;

query
query = query
.with_user_defined_planner(array_planner)
.with_user_defined_planner(field_access_planner)
}
#[cfg(not(feature = "array_expressions"))]
#[cfg(feature = "unicode_expressions")]
{
query
let position_planner =
Arc::new(functions::unicode::planner::PositionPlanner::default()) as _;

query = query.with_user_defined_planner(position_planner);
}

query
}
}

Expand Down
9 changes: 9 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(exprs))
}

/// Plans a SQL `POSITION` expression, e.g. `POSITION(<expr> IN <expr>)`.
///
/// `args` are the already-planned argument expressions of the `POSITION`
/// call.
///
/// Returns [`PlannerResult::Original`] carrying the untouched `args` when
/// this planner cannot handle the expression. That is what this default
/// implementation always does, so implementors only override the method
/// when they actually support `POSITION`.
fn plan_position(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
    Ok(PlannerResult::Original(args))
}
}

/// An operator with two arguments to plan
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions/src/unicode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod strpos;
pub mod substr;
pub mod substrindex;
pub mod translate;
pub mod planner;

// create UDFs
make_udf_function!(
Expand Down
36 changes: 36 additions & 0 deletions datafusion/functions/src/unicode/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! SQL planning extensions like [`PositionPlanner`]
use datafusion_common::Result;
use datafusion_expr::{
expr::ScalarFunction,
planner::{PlannerResult, UserDefinedSQLPlanner},
Expr,
};

/// SQL planner extension for the `POSITION(<expr> IN <expr>)` expression.
///
/// The planner carries no state; its [`UserDefinedSQLPlanner`] implementation
/// turns a `POSITION` expression into a call to the `strpos` function of this
/// crate's `unicode` module.
#[derive(Debug, Default)]
pub struct PositionPlanner {}

impl UserDefinedSQLPlanner for PositionPlanner {
    /// Plans `POSITION` as a call to the `strpos` UDF.
    ///
    /// `args` are forwarded to `strpos` unchanged and in the same order, and
    /// the result is always reported as [`PlannerResult::Planned`].
    fn plan_position(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
        let strpos_call = ScalarFunction::new_udf(crate::unicode::strpos(), args);
        let planned = Expr::ScalarFunction(strpos_call);
        Ok(PlannerResult::Planned(planned))
    }
}
35 changes: 15 additions & 20 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.parse_struct(values, fields, schema, planner_context)
}
SQLExpr::Position { expr, r#in } => {
self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
let substr =
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
let fullstr = self.sql_expr_to_logical_expr(*r#in, schema, planner_context)?;
let mut extract_args = vec![fullstr, substr];

for planner in self.planners.iter() {
match planner.plan_position(extract_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
extract_args = args;
}
}
}

not_impl_err!("Position not supported by UserDefinedExtensionPlanners: {extract_args:?}")
}
SQLExpr::AtTimeZone {
timestamp,
Expand Down Expand Up @@ -889,25 +903,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
}
/// Converts SQL `POSITION(<substr_expr> IN <str_expr>)` into a call to the
/// `strpos` function registered with the context provider.
///
/// The function lookup happens first, so a missing `strpos` is reported
/// before either operand is planned. The operands are then planned in
/// source order (substring, then string) but handed to `strpos` as
/// `[string, substring]`.
///
/// # Errors
///
/// Returns an internal error when the context provider has no `strpos`
/// function, or propagates any error from planning either operand.
fn sql_position_to_expr(
    &self,
    substr_expr: SQLExpr,
    str_expr: SQLExpr,
    schema: &DFSchema,
    planner_context: &mut PlannerContext,
) -> Result<Expr> {
    let strpos = match self.context_provider.get_function_meta("strpos") {
        Some(udf) => udf,
        None => {
            return Err(internal_datafusion_err!(
                "Unable to find expected 'strpos' function"
            ))
        }
    };

    let needle = self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
    let haystack = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;

    let call = ScalarFunction::new_udf(strpos, vec![haystack, needle]);
    Ok(Expr::ScalarFunction(call))
}
}

#[cfg(test)]
Expand Down

0 comments on commit bac49cf

Please sign in to comment.