From 5c86db0b2c5a459d54994b7c6ce6b461cc35d767 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 25 Apr 2024 15:24:09 -0400 Subject: [PATCH] Avoid some copies, encapsulate the handling of child indicies in `OptimizeProjection` (#10216) --- datafusion/common/src/dfschema.rs | 17 +- .../mod.rs} | 304 ++++-------------- .../optimize_projections/required_indices.rs | 227 +++++++++++++ 3 files changed, 311 insertions(+), 237 deletions(-) rename datafusion/optimizer/src/{optimize_projections.rs => optimize_projections/mod.rs} (87%) create mode 100644 datafusion/optimizer/src/optimize_projections/required_indices.rs diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index f1909f0dc8e1..64e40ea99e67 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -347,9 +347,22 @@ impl DFSchema { matches.next() } - /// Find the index of the column with the given qualifier and name - pub fn index_of_column(&self, col: &Column) -> Result { + /// Find the index of the column with the given qualifier and name, + /// returning `None` if not found + /// + /// See [Self::index_of_column] for a version that returns an error if the + /// column is not found + pub fn maybe_index_of_column(&self, col: &Column) -> Option { self.index_of_column_by_name(col.relation.as_ref(), &col.name) + } + + /// Find the index of the column with the given qualifier and name, + /// returning `Err` if not found + /// + /// See [Self::maybe_index_of_column] for a version that returns `None` if + /// the column is not found + pub fn index_of_column(&self, col: &Column) -> Result { + self.maybe_index_of_column(col) .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self)) } diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections/mod.rs similarity index 87% rename from datafusion/optimizer/src/optimize_projections.rs rename to datafusion/optimizer/src/optimize_projections/mod.rs index 70ffd8f24498..0f2aaa6cbcb3 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -17,16 +17,16 @@ //! [`OptimizeProjections`] identifies and eliminates unused columns +mod required_indices; + use std::collections::HashSet; use std::sync::Arc; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use arrow::datatypes::SchemaRef; use datafusion_common::{ - get_required_group_by_exprs_indices, internal_err, Column, DFSchema, DFSchemaRef, - JoinType, Result, + get_required_group_by_exprs_indices, internal_err, Column, JoinType, Result, }; use datafusion_expr::expr::{Alias, ScalarFunction}; use datafusion_expr::{ @@ -34,9 +34,10 @@ use datafusion_expr::{ Expr, Projection, TableScan, Window, }; +use crate::optimize_projections::required_indices::RequiredIndicies; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use hashbrown::HashMap; -use itertools::{izip, Itertools}; +use itertools::izip; /// Optimizer rule to prune unnecessary columns from intermediate schemas /// inside the [`LogicalPlan`]. This rule: @@ -70,8 +71,8 @@ impl OptimizerRule for OptimizeProjections { config: &dyn OptimizerConfig, ) -> Result> { // All output fields are necessary: - let indices = (0..plan.schema().fields().len()).collect::>(); - optimize_projections(plan, config, &indices) + let indices = RequiredIndicies::new_for_all_exprs(plan); + optimize_projections(plan, config, indices) } fn name(&self) -> &str { @@ -105,13 +106,9 @@ impl OptimizerRule for OptimizeProjections { fn optimize_projections( plan: &LogicalPlan, config: &dyn OptimizerConfig, - indices: &[usize], + indices: RequiredIndicies, ) -> Result> { - // `child_required_indices` stores - // - indices of the columns required for each child - // - a flag indicating whether putting a projection above children is beneficial for the parent. - // As an example LogicalPlan::Filter benefits from small tables. Hence for filter child this flag would be `true`. - let child_required_indices: Vec<(Vec, bool)> = match plan { + let child_required_indices: Vec = match plan { LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Repartition(_) @@ -123,12 +120,13 @@ fn optimize_projections( // that appear in this plan's expressions to its child. All these // operators benefit from "small" inputs, so the projection_beneficial // flag is `true`. - let exprs = plan.expressions(); plan.inputs() .into_iter() .map(|input| { - get_all_required_indices(indices, input, exprs.iter()) - .map(|idxs| (idxs, true)) + indices + .clone() + .with_projection_beneficial() + .with_plan_exprs(plan, input.schema()) }) .collect::>()? } @@ -137,13 +135,9 @@ fn optimize_projections( // that appear in this plan's expressions to its child. These operators // do not benefit from "small" inputs, so the projection_beneficial // flag is `false`. - let exprs = plan.expressions(); plan.inputs() .into_iter() - .map(|input| { - get_all_required_indices(indices, input, exprs.iter()) - .map(|idxs| (idxs, false)) - }) + .map(|input| indices.clone().with_plan_exprs(plan, input.schema())) .collect::>()? } LogicalPlan::Copy(_) @@ -159,16 +153,14 @@ fn optimize_projections( // TODO: For some subquery variants (e.g. a subquery arising from an // EXISTS expression), we may not need to require all indices. plan.inputs() - .iter() - .map(|input| ((0..input.schema().fields().len()).collect_vec(), false)) - .collect::>() + .into_iter() + .map(RequiredIndicies::new_for_all_exprs) + .collect() } LogicalPlan::Extension(extension) => { - let necessary_children_indices = if let Some(necessary_children_indices) = - extension.node.necessary_children_exprs(indices) - { - necessary_children_indices - } else { + let Some(necessary_children_indices) = + extension.node.necessary_children_exprs(indices.indices()) + else { // Requirements from parent cannot be routed down to user defined logical plan safely return Ok(None); }; @@ -178,16 +170,12 @@ fn optimize_projections( Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \ consistent with actual children length for the node."); } - // Expressions used by node. - let exprs = plan.expressions(); children .into_iter() .zip(necessary_children_indices) .map(|(child, necessary_indices)| { - let child_schema = child.schema(); - let child_req_indices = - indices_referred_by_exprs(child_schema, exprs.iter())?; - Ok((merge_slices(&necessary_indices, &child_req_indices), false)) + RequiredIndicies::new_from_indices(necessary_indices) + .with_plan_exprs(plan, child.schema()) }) .collect::>>()? } @@ -213,13 +201,9 @@ fn optimize_projections( LogicalPlan::Aggregate(aggregate) => { // Split parent requirements to GROUP BY and aggregate sections: let n_group_exprs = aggregate.group_expr_len()?; - let (group_by_reqs, mut aggregate_reqs): (Vec, Vec) = - indices.iter().partition(|&&idx| idx < n_group_exprs); // Offset aggregate indices so that they point to valid indices at // `aggregate.aggr_expr`: - for idx in aggregate_reqs.iter_mut() { - *idx -= n_group_exprs; - } + let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs); // Get absolutely necessary GROUP BY fields: let group_by_expr_existing = aggregate @@ -235,16 +219,16 @@ fn optimize_projections( // Some of the fields in the GROUP BY may be required by the // parent even if these fields are unnecessary in terms of // functional dependency. - let required_indices = - merge_slices(&simplest_groupby_indices, &group_by_reqs); - get_at_indices(&aggregate.group_expr, &required_indices) + group_by_reqs + .append(&simplest_groupby_indices) + .get_at_indices(&aggregate.group_expr) } else { aggregate.group_expr.clone() }; // Only use the absolutely necessary aggregate expressions required // by the parent: - let mut new_aggr_expr = get_at_indices(&aggregate.aggr_expr, &aggregate_reqs); + let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr); // Aggregations always need at least one aggregate expression. // With a nested count, we don't require any column as input, but @@ -263,10 +247,12 @@ fn optimize_projections( let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); let schema = aggregate.input.schema(); - let necessary_indices = indices_referred_by_exprs(schema, all_exprs_iter)?; + let necessary_indices = + RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?; + let necessary_exprs = necessary_indices.get_required_exprs(schema); let aggregate_input = if let Some(input) = - optimize_projections(&aggregate.input, config, &necessary_indices)? + optimize_projections(&aggregate.input, config, necessary_indices)? { input } else { @@ -277,7 +263,6 @@ fn optimize_projections( // that its input only contains absolutely necessary columns for // the aggregate expressions. Note that necessary_indices refer to // fields in `aggregate.input.schema()`. - let necessary_exprs = get_required_exprs(schema, &necessary_indices); let (aggregate_input, _) = add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)?; @@ -291,29 +276,24 @@ fn optimize_projections( .map(|aggregate| Some(LogicalPlan::Aggregate(aggregate))); } LogicalPlan::Window(window) => { + let input_schema = window.input.schema(); // Split parent requirements to child and window expression sections: - let n_input_fields = window.input.schema().fields().len(); - let (child_reqs, mut window_reqs): (Vec, Vec) = - indices.iter().partition(|&&idx| idx < n_input_fields); + let n_input_fields = input_schema.fields().len(); // Offset window expression indices so that they point to valid // indices at `window.window_expr`: - for idx in window_reqs.iter_mut() { - *idx -= n_input_fields; - } + let (child_reqs, window_reqs) = indices.split_off(n_input_fields); // Only use window expressions that are absolutely necessary according // to parent requirements: - let new_window_expr = get_at_indices(&window.window_expr, &window_reqs); + let new_window_expr = window_reqs.get_at_indices(&window.window_expr); // Get all the required column indices at the input, either by the // parent or window expression requirements. - let required_indices = get_all_required_indices( - &child_reqs, - &window.input, - new_window_expr.iter(), - )?; + let required_indices = + child_reqs.with_exprs(input_schema, &new_window_expr)?; + let window_child = if let Some(new_window_child) = - optimize_projections(&window.input, config, &required_indices)? + optimize_projections(&window.input, config, required_indices.clone())? { new_window_child } else { @@ -327,8 +307,7 @@ fn optimize_projections( // Calculate required expressions at the input of the window. // Please note that we use `old_child`, because `required_indices` // refers to `old_child`. - let required_exprs = - get_required_exprs(window.input.schema(), &required_indices); + let required_exprs = required_indices.get_required_exprs(input_schema); let (window_child, _) = add_projection_on_top_if_helpful(window_child, required_exprs)?; Window::try_new(new_window_expr, Arc::new(window_child)) @@ -339,31 +318,35 @@ fn optimize_projections( let left_len = join.left.schema().fields().len(); let (left_req_indices, right_req_indices) = split_join_requirements(left_len, indices, &join.join_type); - let exprs = plan.expressions(); let left_indices = - get_all_required_indices(&left_req_indices, &join.left, exprs.iter())?; + left_req_indices.with_plan_exprs(plan, join.left.schema())?; let right_indices = - get_all_required_indices(&right_req_indices, &join.right, exprs.iter())?; + right_req_indices.with_plan_exprs(plan, join.right.schema())?; // Joins benefit from "small" input tables (lower memory usage). // Therefore, each child benefits from projection: - vec![(left_indices, true), (right_indices, true)] + vec![ + left_indices.with_projection_beneficial(), + right_indices.with_projection_beneficial(), + ] } LogicalPlan::CrossJoin(cross_join) => { let left_len = cross_join.left.schema().fields().len(); - let (left_child_indices, right_child_indices) = + let (left_indices, right_indices) = split_join_requirements(left_len, indices, &JoinType::Inner); // Joins benefit from "small" input tables (lower memory usage). // Therefore, each child benefits from projection: - vec![(left_child_indices, true), (right_child_indices, true)] + vec![ + left_indices.with_projection_beneficial(), + right_indices.with_projection_beneficial(), + ] } LogicalPlan::TableScan(table_scan) => { - let schema = table_scan.source.schema(); // Get indices referred to in the original (schema with all fields) // given projected indices. - let projection = with_indices(&table_scan.projection, schema, |map| { - indices.iter().map(|&idx| map[idx]).collect() - }); - + let projection = match &table_scan.projection { + Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), + None => indices.into_inner(), + }; return TableScan::try_new( table_scan.table_name.clone(), table_scan.source.clone(), @@ -376,15 +359,16 @@ fn optimize_projections( }; let new_inputs = izip!(child_required_indices, plan.inputs().into_iter()) - .map(|((required_indices, projection_beneficial), child)| { + .map(|(required_indices, child)| { + let projection_beneficial = required_indices.projection_beneficial(); + let project_exprs = required_indices.get_required_exprs(child.schema()); let (input, is_changed) = if let Some(new_input) = - optimize_projections(child, config, &required_indices)? + optimize_projections(child, config, required_indices)? { (new_input, true) } else { (child.clone(), false) }; - let project_exprs = get_required_exprs(child.schema(), &required_indices); let (input, proj_added) = if projection_beneficial { add_projection_on_top_if_helpful(input, project_exprs)? } else { @@ -408,26 +392,6 @@ fn optimize_projections( } } -/// This function applies the given function `f` to the projection indices -/// `proj_indices` if they exist. Otherwise, applies `f` to a default set -/// of indices according to `schema`. -fn with_indices( - proj_indices: &Option>, - schema: SchemaRef, - mut f: F, -) -> Vec -where - F: FnMut(&[usize]) -> Vec, -{ - match proj_indices { - Some(indices) => f(indices.as_slice()), - None => { - let range: Vec = (0..schema.fields.len()).collect(); - f(range.as_slice()) - } - } -} - /// Merges consecutive projections. /// /// Given a projection `proj`, this function attempts to merge it with a previous @@ -653,132 +617,6 @@ fn outer_columns_helper_multi<'a>( exprs.into_iter().for_each(|e| outer_columns(e, columns)); } -/// Generates the required expressions (columns) that reside at `indices` of -/// the given `input_schema`. -/// -/// # Arguments -/// -/// * `input_schema` - A reference to the input schema. -/// * `indices` - A slice of `usize` indices specifying required columns. -/// -/// # Returns -/// -/// A vector of `Expr::Column` expressions residing at `indices` of the `input_schema`. -fn get_required_exprs(input_schema: &Arc, indices: &[usize]) -> Vec { - indices - .iter() - .map(|&idx| Expr::Column(Column::from(input_schema.qualified_field(idx)))) - .collect() -} - -/// Get indices of the fields referred to by any expression in `exprs` within -/// the given schema (`input_schema`). -/// -/// # Arguments -/// -/// * `input_schema`: The input schema to analyze for index requirements. -/// * `exprs`: An iterator of expressions for which we want to find necessary -/// field indices. -/// -/// # Returns -/// -/// A [`Result`] object containing the indices of all required fields in -/// `input_schema` to calculate all `exprs` successfully. -fn indices_referred_by_exprs<'a>( - input_schema: &DFSchemaRef, - exprs: impl Iterator, -) -> Result> { - let indices = exprs - .map(|expr| indices_referred_by_expr(input_schema, expr)) - .collect::>>()?; - Ok(indices - .into_iter() - .flatten() - // Make sure no duplicate entries exist and indices are ordered: - .sorted() - .dedup() - .collect()) -} - -/// Get indices of the fields referred to by the given expression `expr` within -/// the given schema (`input_schema`). -/// -/// # Parameters -/// -/// * `input_schema`: The input schema to analyze for index requirements. -/// * `expr`: An expression for which we want to find necessary field indices. -/// -/// # Returns -/// -/// A [`Result`] object containing the indices of all required fields in -/// `input_schema` to calculate `expr` successfully. -fn indices_referred_by_expr( - input_schema: &DFSchemaRef, - expr: &Expr, -) -> Result> { - let mut cols = expr.to_columns()?; - // Get outer-referenced (subquery) columns: - outer_columns(expr, &mut cols); - Ok(cols - .iter() - .flat_map(|col| input_schema.index_of_column(col)) - .collect()) -} - -/// Gets all required indices for the input; i.e. those required by the parent -/// and those referred to by `exprs`. -/// -/// # Parameters -/// -/// * `parent_required_indices` - A slice of indices required by the parent plan. -/// * `input` - The input logical plan to analyze for index requirements. -/// * `exprs` - An iterator of expressions used to determine required indices. -/// -/// # Returns -/// -/// A `Result` containing a vector of `usize` indices containing all the required -/// indices. -fn get_all_required_indices<'a>( - parent_required_indices: &[usize], - input: &LogicalPlan, - exprs: impl Iterator, -) -> Result> { - indices_referred_by_exprs(input.schema(), exprs) - .map(|indices| merge_slices(parent_required_indices, &indices)) -} - -/// Retrieves the expressions at specified indices within the given slice. Ignores -/// any invalid indices. -/// -/// # Parameters -/// -/// * `exprs` - A slice of expressions to index into. -/// * `indices` - A slice of indices specifying the positions of expressions sought. -/// -/// # Returns -/// -/// A vector of expressions corresponding to specified indices. -fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> Vec { - indices - .iter() - // Indices may point to further places than `exprs` len. - .filter_map(|&idx| exprs.get(idx).cloned()) - .collect() -} - -/// Merges two slices into a single vector with sorted (ascending) and -/// deduplicated elements. For example, merging `[3, 2, 4]` and `[3, 6, 1]` -/// will produce `[1, 2, 3, 6]`. -fn merge_slices(left: &[T], right: &[T]) -> Vec { - // Make sure to sort before deduping, which removes the duplicates: - left.iter() - .cloned() - .chain(right.iter().cloned()) - .sorted() - .dedup() - .collect() -} - /// Splits requirement indices for a join into left and right children based on /// the join type. /// @@ -810,26 +648,21 @@ fn merge_slices(left: &[T], right: &[T]) -> Vec { /// adjusted based on the join type. fn split_join_requirements( left_len: usize, - indices: &[usize], + indices: RequiredIndicies, join_type: &JoinType, -) -> (Vec, Vec) { +) -> (RequiredIndicies, RequiredIndicies) { match join_type { // In these cases requirements are split between left/right children: JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - let (left_reqs, mut right_reqs): (Vec, Vec) = - indices.iter().partition(|&&idx| idx < left_len); // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: - for idx in right_reqs.iter_mut() { - *idx -= left_len; - } - (left_reqs, right_reqs) + indices.split_off(left_len) } // All requirements can be re-routed to left child directly. - JoinType::LeftAnti | JoinType::LeftSemi => (indices.to_vec(), vec![]), + JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndicies::new()), // All requirements can be re-routed to right side directly. // No need to change index, join schema is right child schema. - JoinType::RightSemi | JoinType::RightAnti => (vec![], indices.to_vec()), + JoinType::RightSemi | JoinType::RightAnti => (RequiredIndicies::new(), indices), } } @@ -885,13 +718,14 @@ fn add_projection_on_top_if_helpful( fn rewrite_projection_given_requirements( proj: &Projection, config: &dyn OptimizerConfig, - indices: &[usize], + indices: RequiredIndicies, ) -> Result> { - let exprs_used = get_at_indices(&proj.expr, indices); + let exprs_used = indices.get_at_indices(&proj.expr); + let required_indices = - indices_referred_by_exprs(proj.input.schema(), exprs_used.iter())?; + RequiredIndicies::new().with_exprs(proj.input.schema(), exprs_used.iter())?; return if let Some(input) = - optimize_projections(&proj.input, config, &required_indices)? + optimize_projections(&proj.input, config, required_indices)? { if is_projection_unnecessary(&input, &exprs_used)? { Ok(Some(input)) diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs new file mode 100644 index 000000000000..113c100bbd9b --- /dev/null +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`RequiredIndicies`] helper for OptimizeProjection + +use crate::optimize_projections::outer_columns; +use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::{Column, DFSchemaRef, Result}; +use datafusion_expr::{Expr, LogicalPlan}; + +/// Represents columns in a schema which are required (used) by a plan node +/// +/// Also carries a flag indicating if putting a projection above children is +/// beneficial for the parent. For example `LogicalPlan::Filter` benefits from +/// small tables. Hence for filter child this flag would be `true`. Defaults to +/// `false` +/// +/// # Invariant +/// +/// Indices are always in order and without duplicates. For example, if these +/// indices were added `[3, 2, 4, 3, 6, 1]`, the instance would be represented +/// by `[1, 2, 3, 6]`. +#[derive(Debug, Clone, Default)] +pub(super) struct RequiredIndicies { + /// The indices of the required columns in the + indices: Vec, + /// If putting a projection above children is beneficial for the parent. + /// Defaults to false. + projection_beneficial: bool, +} + +impl RequiredIndicies { + /// Create a new, empty instance + pub fn new() -> Self { + Self::default() + } + + /// Create a new instance that requires all columns from the specified plan + pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self { + Self { + indices: (0..plan.schema().fields().len()).collect(), + projection_beneficial: false, + } + } + + /// Create a new instance with the specified indices as required + pub fn new_from_indices(indices: Vec) -> Self { + Self { + indices, + projection_beneficial: false, + } + .compact() + } + + /// Convert the instance to its inner indices + pub fn into_inner(self) -> Vec { + self.indices + } + + /// Set the projection beneficial flag + pub fn with_projection_beneficial(mut self) -> Self { + self.projection_beneficial = true; + self + } + + /// Return the value of projection beneficial flag + pub fn projection_beneficial(&self) -> bool { + self.projection_beneficial + } + + /// Return a reference to the underlying indices + pub fn indices(&self) -> &[usize] { + &self.indices + } + + /// Add required indices for all `exprs` used in plan + pub fn with_plan_exprs( + mut self, + plan: &LogicalPlan, + schema: &DFSchemaRef, + ) -> Result { + // Add indices of the child fields referred to by the expressions in the + // parent + plan.apply_expressions(|e| { + self.add_expr(schema, e)?; + Ok(TreeNodeRecursion::Continue) + })?; + Ok(self.compact()) + } + + /// Adds the indices of the fields referred to by the given expression + /// `expr` within the given schema (`input_schema`). + /// + /// Self is NOT compacted (and thus this method is not pub) + /// + /// # Parameters + /// + /// * `input_schema`: The input schema to analyze for index requirements. + /// * `expr`: An expression for which we want to find necessary field indices. + fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> { + // TODO could remove these clones (and visit the expression directly) + let mut cols = expr.to_columns()?; + // Get outer-referenced (subquery) columns: + outer_columns(expr, &mut cols); + self.indices.reserve(cols.len()); + for col in cols { + if let Some(idx) = input_schema.maybe_index_of_column(&col) { + self.indices.push(idx); + } + } + Ok(()) + } + + /// Adds the indices of the fields referred to by the given expressions + /// `within the given schema. + /// + /// # Parameters + /// + /// * `input_schema`: The input schema to analyze for index requirements. + /// * `exprs`: the expressions for which we want to find field indices. + pub fn with_exprs<'a>( + self, + schema: &DFSchemaRef, + exprs: impl IntoIterator, + ) -> Result { + exprs + .into_iter() + .try_fold(self, |mut acc, expr| { + acc.add_expr(schema, expr)?; + Ok(acc) + }) + .map(|acc| acc.compact()) + } + + /// Adds all `indices` into this instance. + pub fn append(mut self, indices: &[usize]) -> Self { + self.indices.extend_from_slice(indices); + self.compact() + } + + /// Splits this instance into a tuple with two instances: + /// * The first `n` indices + /// * The remaining indices, adjusted down by n + pub fn split_off(self, n: usize) -> (Self, Self) { + let (l, r) = self.partition(|idx| idx < n); + (l, r.map_indices(|idx| idx - n)) + } + + /// Partitions the indicies in this instance into two groups based on the + /// given predicate function `f`. + fn partition(&self, f: F) -> (Self, Self) + where + F: Fn(usize) -> bool, + { + let (l, r): (Vec, Vec) = + self.indices.iter().partition(|&&idx| f(idx)); + let projection_beneficial = self.projection_beneficial; + + ( + Self { + indices: l, + projection_beneficial, + }, + Self { + indices: r, + projection_beneficial, + }, + ) + } + + /// Map the indices in this instance to a new set of indices based on the + /// given function `f`, returning the mapped indices + /// + /// Not `pub` as it might not preserve the invariant of compacted indices + fn map_indices(mut self, f: F) -> Self + where + F: Fn(usize) -> usize, + { + self.indices.iter_mut().for_each(|idx| *idx = f(*idx)); + self + } + + /// Apply the given function `f` to each index in this instance, returning + /// the mapped indices + pub fn into_mapped_indices(self, f: F) -> Vec + where + F: Fn(usize) -> usize, + { + self.map_indices(f).into_inner() + } + + /// Returns the `Expr`s from `exprs` that are at the indices in this instance + pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec { + self.indices.iter().map(|&idx| exprs[idx].clone()).collect() + } + + /// Generates the required expressions (columns) that reside at `indices` of + /// the given `input_schema`. + pub fn get_required_exprs(&self, input_schema: &DFSchemaRef) -> Vec { + self.indices + .iter() + .map(|&idx| Expr::from(Column::from(input_schema.qualified_field(idx)))) + .collect() + } + + /// Compacts the indices of this instance so they are sorted + /// (ascending) and deduplicated. + fn compact(mut self) -> Self { + self.indices.sort_unstable(); + self.indices.dedup(); + self + } +}