Skip to content

Commit

Permalink
add column ids
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Mar 14, 2023
1 parent 8183b41 commit 54a0e4f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 0 deletions.
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ message ChainNode {
repeated plan_common.Field upstream_fields = 2;
// Which columns from upstream are used in this Chain node.
repeated uint32 upstream_column_indices = 3;
// The ids of the columns to read from the upstream table. These are the output
// columns of the scan and, for the `Backfill` chain type, additionally the
// upstream primary key columns.
repeated int32 upstream_column_ids = 8;
// Generally, the barrier needs to be rearranged during the MV creation process, so that data can
// be flushed to shared buffer periodically, instead of making the first epoch from batch query extra
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ impl LogicalScan {
.collect()
}

/// Get the ids of the output columns and primary key columns.
pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
let mut ids = self.output_column_ids();
for column_order in self.primary_key() {
let id = self.table_desc().columns[column_order.column_index].column_id;
if !ids.contains(&id) {
ids.push(id);
}
}
ids
}

pub fn output_column_indices(&self) -> &[usize] {
&self.core.output_col_idx
}
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ impl StreamNode for StreamIndexScan {
}

impl StreamIndexScan {
// TODO: this method is almost the same as `StreamTableScan::adhoc_to_stream_prost`, we should
// avoid duplication.
pub fn adhoc_to_stream_prost(&self) -> ProstStreamPlan {
use risingwave_pb::plan_common::*;
use risingwave_pb::stream_plan::*;
Expand All @@ -141,6 +143,14 @@ impl StreamIndexScan {

let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec();

let upstream_column_ids = match self.chain_type {
ChainType::Backfill => self.logical.output_and_pk_column_ids(),
ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => {
self.logical.output_column_ids()
}
ChainType::ChainUnspecified => unreachable!(),
};

ProstStreamPlan {
fields: self.schema().to_prost(),
input: vec![
Expand Down Expand Up @@ -195,6 +205,7 @@ impl StreamIndexScan {
.iter()
.map(|&i| i as _)
.collect(),
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
is_singleton: false,
table_desc: Some(self.logical.table_desc().to_protobuf()),
})),
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ impl StreamTableScan {

let stream_key = self.logical_pk().iter().map(|x| *x as u32).collect_vec();

// The required columns from the table (both scan and upstream).
let upstream_column_ids = match self.chain_type {
ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => {
self.logical.output_column_ids()
}
// For backfill, we additionally need the primary key columns.
ChainType::Backfill => self.logical.output_and_pk_column_ids(),
ChainType::ChainUnspecified => unreachable!(),
};

ProstStreamPlan {
fields: self.schema().to_prost(),
input: vec![
Expand Down Expand Up @@ -225,6 +235,7 @@ impl StreamTableScan {
.iter()
.map(|&i| i as _)
.collect(),
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
is_singleton: *self.distribution() == Distribution::Single,
// The table desc used by backfill executor
table_desc: Some(self.logical.table_desc().to_protobuf()),
Expand Down

0 comments on commit 54a0e4f

Please sign in to comment.