Skip to content

Commit

Permalink
support unparsing the implicit lateral unnest plan
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Dec 18, 2024
1 parent bd2c975 commit 7c5e3e3
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 14 deletions.
66 changes: 54 additions & 12 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use super::{
Unparser,
};
use crate::unparser::ast::UnnestRelationBuilder;
use crate::unparser::utils::unproject_agg_exprs;
use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
use crate::utils::UNNEST_PLACEHOLDER;
use datafusion_common::{
internal_err, not_impl_err,
Expand Down Expand Up @@ -235,9 +235,10 @@ impl Unparser<'_> {
plan: &LogicalPlan,
relation: &mut RelationBuilder,
alias: Option<ast::TableAlias>,
lateral: bool,
) -> Result<()> {
let mut derived_builder = DerivedRelationBuilder::default();
derived_builder.lateral(false).alias(alias).subquery({
derived_builder.lateral(lateral).alias(alias).subquery({
let inner_statement = self.plan_to_sql(plan)?;
if let ast::Statement::Query(inner_query) = inner_statement {
inner_query
Expand All @@ -257,15 +258,17 @@ impl Unparser<'_> {
alias: &str,
plan: &LogicalPlan,
relation: &mut RelationBuilder,
lateral: bool,
) -> Result<()> {
if self.dialect.requires_derived_table_alias() {
self.derive(
plan,
relation,
Some(self.new_table_alias(alias.to_string(), vec![])),
lateral,
)
} else {
self.derive(plan, relation, None)
self.derive(plan, relation, None, lateral)
}
}

Expand Down Expand Up @@ -317,10 +320,12 @@ impl Unparser<'_> {
// Projection can be top-level plan for unnest relation
// The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
// only one expression, which is the placeholder column generated by the rewriter.
if self.dialect.unnest_as_table_factor()
&& p.expr.len() == 1
&& Self::is_unnest_placeholder(&p.expr[0])
{
let (is_unnest, is_lateral) = if p.expr.len() == 1 {
Self::is_unnest_placeholder_with_outer_ref(&p.expr[0])
} else {
(false, false)
};
if self.dialect.unnest_as_table_factor() && is_unnest {
if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
return self
.unnest_to_table_factor_sql(unnest, query, select, relation);
Expand All @@ -333,6 +338,7 @@ impl Unparser<'_> {
"derived_projection",
plan,
relation,
is_lateral,
);
}
self.reconstruct_select_statement(plan, p, select)?;
Expand Down Expand Up @@ -365,6 +371,7 @@ impl Unparser<'_> {
"derived_limit",
plan,
relation,
false,
);
}
if let Some(fetch) = &limit.fetch {
Expand Down Expand Up @@ -402,6 +409,7 @@ impl Unparser<'_> {
"derived_sort",
plan,
relation,
false,
);
}
let Some(query_ref) = query else {
Expand Down Expand Up @@ -472,6 +480,7 @@ impl Unparser<'_> {
"derived_distinct",
plan,
relation,
false,
);
}
let (select_distinct, input) = match distinct {
Expand Down Expand Up @@ -658,6 +667,7 @@ impl Unparser<'_> {
"derived_union",
plan,
relation,
false,
);
}

Expand Down Expand Up @@ -723,19 +733,51 @@ impl Unparser<'_> {
internal_err!("Unnest input is not a Projection: {unnest:?}")
}
}
_ => not_impl_err!("Unsupported operator: {plan:?}"),
LogicalPlan::Subquery(subquery)
if find_unnest_node_until_relation(subquery.subquery.as_ref())
.is_some() =>
{
if self.dialect.unnest_as_table_factor() {
self.select_to_sql_recursively(
subquery.subquery.as_ref(),
query,
select,
relation,
)
} else {
self.derive_with_dialect_alias(
"derived_unnest",
subquery.subquery.as_ref(),
relation,
true,
)
}
}
_ => {
not_impl_err!("Unsupported operator: {plan:?}")
}
}
}

/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`
/// Only match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
fn is_unnest_placeholder(expr: &Expr) -> bool {
/// The first return value is a boolean indicating if the column is a placeholder column:
/// Try to match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
/// The second return value is a boolean indicating if the column uses an outer reference:
/// Try to match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`
///
/// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
fn is_unnest_placeholder_with_outer_ref(expr: &Expr) -> (bool, bool) {
if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Column(Column { name, .. }) = expr.as_ref() {
return name.starts_with(UNNEST_PLACEHOLDER);
if name.starts_with(UNNEST_PLACEHOLDER) {
return (
true,
name[UNNEST_PLACEHOLDER.len()..].starts_with("(outer_ref("),
);
}
}
}
false
(false, false)
}

fn unnest_to_table_factor_sql(
Expand Down
53 changes: 51 additions & 2 deletions datafusion/sql/src/unparser/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,11 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
}
}

/// This logic is to work out the columns and inner query for SubqueryAlias plan for both types of
/// subquery
/// This logic is to work out the columns and inner query for SubqueryAlias plan for some types of
/// subquery or unnest
/// - `(SELECT column_a as a from table) AS A`
/// - `(SELECT column_a from table) AS A (a)`
/// - `SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)` (see [find_unnest_column_alias])
///
/// A roundtrip example for table alias with columns
///
Expand Down Expand Up @@ -222,6 +223,15 @@ pub(super) fn subquery_alias_inner_query_and_columns(
) -> (&LogicalPlan, Vec<Ident>) {
let plan: &LogicalPlan = subquery_alias.input.as_ref();

if let LogicalPlan::Subquery(subquery) = plan {
let (inner_projection, Some(column)) =
find_unnest_column_alias(subquery.subquery.as_ref())
else {
return (plan, vec![]);
};
return (inner_projection, vec![Ident::new(column)]);
}

let LogicalPlan::Projection(outer_projections) = plan else {
return (plan, vec![]);
};
Expand Down Expand Up @@ -257,6 +267,45 @@ pub(super) fn subquery_alias_inner_query_and_columns(
(outer_projections.input.as_ref(), columns)
}

/// Try to find the column alias for UNNEST in the inner projection.
/// For example:
/// ```sql
/// SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)
/// ```
/// The above query will be parsed into the following plan:
/// ```text
/// Projection: *
/// Cross Join:
/// SubqueryAlias: t1
/// TableScan: t
/// SubqueryAlias: u
/// Subquery:
/// Projection: UNNEST(outer_ref(t1.c1)) AS c1
/// Projection: __unnest_placeholder(outer_ref(t1.c1),depth=1) AS UNNEST(outer_ref(t1.c1))
/// Unnest: lists[__unnest_placeholder(outer_ref(t1.c1))|depth=1] structs[]
/// Projection: outer_ref(t1.c1) AS __unnest_placeholder(outer_ref(t1.c1))
/// EmptyRelation
/// ```
/// The function will return the inner projection and the column alias `c1` if the column name
/// starts with `UNNEST(` (the `Display` result of [Expr::Unnest]) in the inner projection.
pub(super) fn find_unnest_column_alias(
plan: &LogicalPlan,
) -> (&LogicalPlan, Option<String>) {
if let LogicalPlan::Projection(projection) = plan {
if projection.expr.len() != 1 {
return (plan, None);
}
if let Some(expr) = projection.expr.get(0) {
if let Expr::Alias(alias) = expr {
if alias.expr.schema_name().to_string().starts_with("UNNEST(") {
return (projection.input.as_ref(), Some(alias.name.clone()));
}
}
}
}
(plan, None)
}

/// Injects column aliases into a subquery's logical plan. The function searches for a `Projection`
/// within the given plan, which may be wrapped by other operators (e.g., LIMIT, SORT).
/// If the top-level plan is a `Projection`, it directly injects the column aliases.
Expand Down
25 changes: 25 additions & 0 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,31 @@ pub(crate) fn find_unnest_node_within_select(plan: &LogicalPlan) -> Option<&Unne
}
}

/// Recursively searches children of [LogicalPlan] to find Unnest node if exist
/// until encountering a Relation node with single input
pub(crate) fn find_unnest_node_until_relation(plan: &LogicalPlan) -> Option<&Unnest> {
// Note that none of the nodes that have a corresponding node can have more
// than 1 input node. E.g. Projection / Filter always have 1 input node.
let input = plan.inputs();
let input = if input.len() > 1 {
return None;
} else {
input.first()?
};

if let LogicalPlan::Unnest(unnest) = input {
Some(unnest)
} else if let LogicalPlan::TableScan(_) = input {
None
} else if let LogicalPlan::Subquery(_) = input {
None
} else if let LogicalPlan::SubqueryAlias(_) = input {
None
} else {
find_unnest_node_within_select(input)
}
}

/// Recursively searches children of [LogicalPlan] to find Window nodes if exist
/// prior to encountering a Join, TableScan, or a nested subquery (derived table factor).
/// If Window node is not found prior to this or at all before reaching the end
Expand Down
24 changes: 24 additions & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,30 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN UNNEST(u.array_col) AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))")"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
TestStatementWithDialect {
sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1 (c1)",
expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL (SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))") AS t1 (c1)"#,
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
];

for query in tests {
Expand Down

0 comments on commit 7c5e3e3

Please sign in to comment.