Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Dec 4, 2023
1 parent d235b87 commit 0d24754
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ impl ExecutionPlan for FilterExec {
let num_rows = projected_stats.num_rows;
let total_byte_size = projected_stats.total_byte_size;
let input_analysis_ctx = AnalysisContext::try_from_statistics(
&self.input().schema(),
&self.input.schema(),
&projected_stats.column_statistics,
)?;

let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
let analysis_ctx = analyze(predicate, input_analysis_ctx, &self.schema())?;

// Estimate (inexact) selectivity of predicate
let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
Expand Down Expand Up @@ -331,13 +331,13 @@ pub(crate) fn batch_filter(
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
//load just the columns requested
let batch = match projection.as_ref() {
let projected_batch = match projection.as_ref() {
Some(columns) => batch.project(columns)?,
None => batch.clone(),
};
Ok(as_boolean_array(&array)?)
// apply filter array to record batch
.and_then(|filter_array| Ok(filter_record_batch(&batch, filter_array)?))
.and_then(|filter_array| Ok(filter_record_batch(&projected_batch, filter_array)?))
})
}

Expand All @@ -354,7 +354,6 @@ impl Stream for FilterExecStream {
Poll::Ready(value) => match value {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
// load just the columns requested
let filtered_batch = batch_filter(
&batch,
&self.predicate,
Expand Down

0 comments on commit 0d24754

Please sign in to comment.