Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Dec 1, 2023
1 parent 32f29d2 commit 137fc68
Showing 1 changed file with 27 additions and 2 deletions.
29 changes: 27 additions & 2 deletions datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
//! - Adds projection to decrease table column size before operators that benefits from less memory at its input.
//! - Removes unnecessary [LogicalPlan::Projection] from the [LogicalPlan].
use crate::optimizer::ApplyOrder;
use datafusion_common::{Column, DFSchema, DFSchemaRef, JoinType, Result};
use datafusion_common::{Column, DFSchema, DFSchemaRef, JoinType, Result, DFField};
use datafusion_expr::Filter;
use datafusion_expr::expr::{Alias, ScalarFunction};
use datafusion_expr::{
logical_plan::LogicalPlan, projection_schema, Aggregate, BinaryExpr, Cast, Distinct,
Expand Down Expand Up @@ -347,7 +348,31 @@ fn optimize_projections(
.map(|(new_input, old_child)| new_input.unwrap_or_else(|| old_child.clone()))
.collect::<Vec<_>>();
let res = plan.with_new_inputs(&new_inputs)?;
Ok(Some(res))
match &res {
LogicalPlan::Filter(f) => {
let schema = res.schema();
let fields: Vec<DFField> = indices
.iter()
.map(|c| {
plan.schema().field(*c).clone()
})
.collect();
let new_schema = DFSchema::new_with_metadata(fields, schema.metadata().clone())?;
let new_schema = if !schema.logically_equivalent_names_and_types(&new_schema) {
Some(Arc::new(new_schema))
} else {
None
};

Ok(Some(Filter::try_new(
f.predicate.clone(),
f.input.clone(),
new_schema,
).map(LogicalPlan::Filter)?))

},
_ => Ok(Some(res)),
}
}
}

Expand Down

0 comments on commit 137fc68

Please sign in to comment.