Skip to content

Commit

Permalink
Don't add filters to used columns
Browse files Browse the repository at this point in the history
Add test

WIP fix

Fix filter after scan

Totally reemove filter to column extraction

Fix test

Update tests 1

Update tests 2

Update tests 3
  • Loading branch information
Dandandan committed Sep 28, 2023
1 parent 32dfbb0 commit 27069f0
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 140 deletions.
9 changes: 5 additions & 4 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,15 +1287,16 @@ impl TableProvider for DataFrameTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut expr = LogicalPlanBuilder::from(self.plan.clone());
if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// Add filter when given
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(filter) = filter {
expr = expr.filter(filter)?
}

if let Some(p) = projection {
expr = expr.select(p.iter().copied())?
}

// add a limit if given
if let Some(l) = limit {
expr = expr.limit(0, Some(l))?
Expand Down
27 changes: 14 additions & 13 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,20 @@ impl TableProvider for ViewTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = if let Some(projection) = projection {
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
let plan = self.logical_plan().clone();
let mut plan = LogicalPlanBuilder::from(plan);

if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

let mut plan = if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection =
(0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
(0..plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == &current_projection {
self.logical_plan().clone()
plan
} else {
let fields: Vec<Expr> = projection
.iter()
Expand All @@ -123,19 +131,11 @@ impl TableProvider for ViewTable {
)
})
.collect();
LogicalPlanBuilder::from(self.logical_plan.clone())
.project(fields)?
.build()?
plan.project(fields)?
}
} else {
self.logical_plan().clone()
plan
};
let mut plan = LogicalPlanBuilder::from(plan);
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));

if let Some(filter) = filter {
plan = plan.filter(filter)?;
}

if let Some(limit) = limit {
plan = plan.limit(0, Some(limit))?;
Expand Down Expand Up @@ -439,6 +439,7 @@ mod tests {
.select_columns(&["bool_col", "int_col"])?;

let plan = df.explain(false, false)?.collect().await?;

// Filters all the way to Parquet
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
Expand Down
11 changes: 10 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use crate::expr_rewriter::{
};
use crate::type_coercion::binary::comparison_coercion;
use crate::utils::{columnize_expr, compare_sort_expr};
use crate::{and, binary_expr, DmlStatement, Operator, WriteOp};
use crate::{
and, binary_expr, DmlStatement, Operator, TableProviderFilterPushDown, WriteOp,
};
use crate::{
logical_plan::{
Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join,
Expand Down Expand Up @@ -1402,6 +1404,13 @@ impl TableSource for LogicalTableSource {
fn schema(&self) -> SchemaRef {
self.table_schema.clone()
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<crate::TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
}

/// Create a [`LogicalPlan::Unnest`] plan
Expand Down
Loading

0 comments on commit 27069f0

Please sign in to comment.