Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): Support index selection for sort aggregation with a descending ordering #8515

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/frontend/planner_test/tests/testdata/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1230,3 +1230,23 @@
└─StreamHashAgg { group_key: [idx.col1, $expr1], aggs: [count] }
└─StreamProject { exprs: [idx.col1, idx.id, Vnode(idx.id) as $expr1] }
└─StreamTableScan { table: idx, columns: [idx.col1, idx.id], pk: [idx.id], dist: UpstreamHashShard(idx.id) }
- name: sort agg on an ascending index
sql: |
create table t (a int, b int);
create index idx_asc on t(a asc);
create index idx_desc on t(a desc);
select a, count(*) cnt from t group by a order by a asc;
batch_plan: |
BatchExchange { order: [idx_asc.a ASC], dist: Single }
└─BatchSortAgg { group_key: [idx_asc.a], aggs: [count] }
└─BatchScan { table: idx_asc, columns: [idx_asc.a], distribution: UpstreamHashShard(idx_asc.a) }
- name: sort agg on a descending index
sql: |
create table t (a int, b int);
create index idx_asc on t(a asc);
create index idx_desc on t(a desc);
select a, count(*) cnt from t group by a order by a desc;
batch_plan: |
BatchExchange { order: [idx_desc.a DESC], dist: Single }
└─BatchSortAgg { group_key: [idx_desc.a], aggs: [count] }
└─BatchScan { table: idx_desc, columns: [idx_desc.a], distribution: UpstreamHashShard(idx_desc.a) }
6 changes: 4 additions & 2 deletions src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,14 @@ impl IndexCatalog {
self.index_table.columns.len() == self.primary_table.columns.len()
}

/// a mapping maps column index of secondary index to column index of primary table
/// Returns the mapping from a column index of the secondary index to the
/// corresponding column index of the primary table.
pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
    &self.secondary_to_primary_mapping
}

/// a mapping maps column index of primary table to column index of secondary index
/// Returns the mapping from a column index of the primary table to the
/// corresponding column index of the secondary index.
pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
    &self.primary_to_secondary_mapping
}
Expand Down
91 changes: 66 additions & 25 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::sort_util::ColumnOrder;

use super::generic::{GenericPlanNode, GenericPlanRef};
use super::{
Expand Down Expand Up @@ -267,6 +267,65 @@ impl LogicalScan {
self.i2o_col_mapping().rewrite_bitset(watermark_columns)
}

/// Returns the indexes whose ordering can satisfy `required_order`.
///
/// An index qualifies when the order induced by its table's primary key,
/// translated into the scan's output column positions, satisfies the
/// required order.
pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Rc<IndexCatalog>> {
    // Map each primary-table column that this scan outputs to its position
    // in the scan's output row.
    let output_col_map: BTreeMap<usize, usize> = self
        .output_col_idx()
        .iter()
        .copied()
        .enumerate()
        .map(|(pos, col)| (col, pos))
        .collect();
    // Sentinel position for index pk columns that are not part of the scan
    // output; it can never match a column of `required_order`.
    let unmatched_idx = output_col_map.len();

    self.indexes()
        .iter()
        .filter(|index| {
            let s2p_mapping = index.secondary_to_primary_mapping();
            // Translate the index table's pk ordering into an order over the
            // scan's output columns, then test it against the requirement.
            let index_order = Order {
                column_orders: index
                    .index_table
                    .pk()
                    .iter()
                    .map(|pk_col| {
                        let primary_col = s2p_mapping
                            .get(&pk_col.column_index)
                            .expect("should be in s2p mapping");
                        let output_pos = output_col_map
                            .get(primary_col)
                            .copied()
                            .unwrap_or(unmatched_idx);
                        ColumnOrder::new(output_pos, pk_col.order_type)
                    })
                    .collect(),
            };
            index_order.satisfies(required_order)
        })
        .collect()
}

/// If `index` covers every column required by this scan, transforms the scan
/// into the corresponding index scan; otherwise returns `None`.
pub fn to_index_scan_if_index_covered(&self, index: &Rc<IndexCatalog>) -> Option<LogicalScan> {
    let p2s_mapping = index.primary_to_secondary_mapping();
    // The index is covering only if every required primary-table column has
    // a counterpart in the index.
    let covered = self
        .required_col_idx()
        .iter()
        .all(|col| p2s_mapping.contains_key(col));
    if !covered {
        return None;
    }
    Some(self.to_index_scan(
        &index.name,
        index.index_table.table_desc().into(),
        p2s_mapping,
    ))
}

/// Prerequisite: the caller should guarantee that `primary_to_secondary_mapping` must cover the
/// scan.
pub fn to_index_scan(
&self,
index_name: &str,
Expand Down Expand Up @@ -581,32 +640,14 @@ impl LogicalScan {
return None;
}

let index = self.indexes().iter().find(|idx| {
Order {
column_orders: idx
.index_item
.iter()
.map(|idx_item| ColumnOrder::new(idx_item.index, OrderType::ascending()))
.collect(),
let order_satisfied_index = self.indexes_satisfy_order(required_order);
for index in order_satisfied_index {
if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
return Some(index_scan.to_batch());
}
.satisfies(required_order)
})?;

let p2s_mapping = index.primary_to_secondary_mapping();
if self
.required_col_idx()
.iter()
.all(|x| p2s_mapping.contains_key(x))
{
let index_scan = self.to_index_scan(
&index.name,
index.index_table.table_desc().into(),
p2s_mapping,
);
Some(index_scan.to_batch())
} else {
None
}

None
}
}

Expand Down
136 changes: 49 additions & 87 deletions src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ impl Rule for MinMaxOnIndexRule {
if !logical_scan.predicate().always_true() {
return None;
}
let output_col_map = logical_scan
.output_col_idx()
.iter()
.cloned()
.enumerate()
.map(|(id, col)| (col, id))
.collect::<BTreeMap<_, _>>();
let order = Order {
column_orders: vec![ColumnOrder::new(
calls.first()?.inputs.first()?.index(),
Expand All @@ -74,12 +67,10 @@ impl Rule for MinMaxOnIndexRule {
},
)],
};
if let Some(p) =
self.try_on_index(logical_agg, logical_scan.clone(), &order, &output_col_map)
{
if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
Some(p)
} else {
self.try_on_pk(logical_agg, logical_scan, &order, &output_col_map)
self.try_on_pk(logical_agg, logical_scan, &order)
}
} else {
None
Expand All @@ -96,93 +87,64 @@ impl MinMaxOnIndexRule {
&self,
logical_agg: &LogicalAgg,
logical_scan: LogicalScan,
order: &Order,
output_col_map: &BTreeMap<usize, usize>,
required_order: &Order,
) -> Option<PlanRef> {
let unmatched_idx = output_col_map.len();
let index = logical_scan.indexes().iter().find(|idx| {
let s2p_mapping = idx.secondary_to_primary_mapping();
Order {
column_orders: idx
.index_table
.pk()
.iter()
.map(|idx_item| {
ColumnOrder::new(
*output_col_map
.get(
s2p_mapping
.get(&idx_item.column_index)
.expect("should be in s2p mapping"),
)
.unwrap_or(&unmatched_idx),
idx_item.order_type,
)
})
.collect(),
let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
for index in order_satisfied_index {
if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
let non_null_filter = LogicalFilter::create_with_expr(
index_scan.into(),
FunctionCall::new_unchecked(
ExprType::IsNotNull,
vec![ExprImpl::InputRef(Box::new(InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)))],
DataType::Boolean,
)
.into(),
);

let limit = LogicalLimit::create(non_null_filter, 1, 0);

let formatting_agg = LogicalAgg::new(
vec![PlanAggCall {
agg_kind: logical_agg.agg_calls().first()?.agg_kind,
return_type: logical_agg.schema().fields[0].data_type.clone(),
inputs: vec![InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)],
order_by: vec![],
distinct: false,
filter: Condition {
conjunctions: vec![],
},
}],
vec![],
limit,
);

return Some(formatting_agg.into());
}
.satisfies(order)
})?;

let p2s_mapping = index.primary_to_secondary_mapping();
}

let index_scan = if logical_scan
.required_col_idx()
.iter()
.all(|x| p2s_mapping.contains_key(x))
{
Some(logical_scan.to_index_scan(
&index.name,
index.index_table.table_desc().into(),
p2s_mapping,
))
} else {
None
}?;

let non_null_filter = LogicalFilter::create_with_expr(
index_scan.into(),
FunctionCall::new_unchecked(
ExprType::IsNotNull,
vec![ExprImpl::InputRef(Box::new(InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)))],
DataType::Boolean,
)
.into(),
);

let limit = LogicalLimit::create(non_null_filter, 1, 0);

let formatting_agg = LogicalAgg::new(
vec![PlanAggCall {
agg_kind: logical_agg.agg_calls().first()?.agg_kind,
return_type: logical_agg.schema().fields[0].data_type.clone(),
inputs: vec![InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)],
order_by: vec![],
distinct: false,
filter: Condition {
conjunctions: vec![],
},
}],
vec![],
limit,
);

Some(formatting_agg.into())
None
}

fn try_on_pk(
&self,
logical_agg: &LogicalAgg,
logical_scan: LogicalScan,
order: &Order,
output_col_map: &BTreeMap<usize, usize>,
) -> Option<PlanRef> {
let output_col_map = logical_scan
.output_col_idx()
.iter()
.cloned()
.enumerate()
.map(|(id, col)| (col, id))
.collect::<BTreeMap<_, _>>();
let unmatched_idx = output_col_map.len();
let primary_key = logical_scan.primary_key();
let primary_key_order = Order {
Expand Down
Loading