Skip to content

Commit

Permalink
utilize column id
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Mar 14, 2023
1 parent 54a0e4f commit 3dcd6b4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 14 deletions.
5 changes: 5 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ impl From<i32> for ColumnId {
Self::new(column_id)
}
}
impl From<&i32> for ColumnId {
fn from(column_id: &i32) -> Self {
Self::new(*column_id)
}
}

impl From<ColumnId> for i32 {
fn from(id: ColumnId) -> i32 {
Expand Down
14 changes: 14 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,20 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
/// Get the indices of the primary key columns of this table.
///
/// NOTE(review): these are indices into the table's column list, not into the
/// output columns — compare with `pk_in_output_indices` below, which maps the
/// primary key into the output columns.
pub fn pk_indices(&self) -> &[usize] {
    &self.pk_indices
}

/// Get the indices of the columns that this table outputs.
pub fn output_indices(&self) -> &[usize] {
    &self.output_indices
}

/// Get the indices of the primary key columns in the output columns.
///
/// Returns `None` if any of the primary key columns is not in the output columns.
pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
    let mut positions = Vec::with_capacity(self.pk_indices.len());
    for &pk_idx in &self.pk_indices {
        // `position` yields `None` when the pk column is absent from the
        // output; propagate that to the caller via `?`.
        let pos = self.output_indices.iter().position(|&out| out == pk_idx)?;
        positions.push(pos);
    }
    Some(positions)
}
}

/// Point get
Expand Down
44 changes: 32 additions & 12 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use either::Either;
use futures::stream::select_with_strategy;
use futures::{pin_mut, stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -104,10 +105,24 @@ where

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
// Table storage primary key.
let table_pk_indices = self.table.pk_indices();
// The primary key columns, in the output columns of the table scan.
let pk_in_output_indices = self.table.pk_in_output_indices().unwrap();
let pk_order = self.table.pk_serializer().get_order_types();
let upstream_indices = self.upstream_indices;

// TODO: unify these two mappings if we make the upstream and table output the same.
// The columns to be forwarded to the downstream, in the upstream columns.
let downstream_in_upstream_indices = self.upstream_indices;
// The columns to be forwarded to the downstream, in the output columns of the table scan.
let downstream_in_output_indices = downstream_in_upstream_indices
.iter()
.map(|&i| {
self.table
.output_indices()
.iter()
.position(|&j| i == j)
.unwrap()
})
.collect_vec();

let mut upstream = self.upstream.execute();

Expand Down Expand Up @@ -139,7 +154,9 @@ where
// Forward messages directly to the downstream.
#[for_await]
for message in upstream {
if let Some(message) = Self::mapping_message(message?, &upstream_indices) {
if let Some(message) =
Self::mapping_message(message?, &downstream_in_upstream_indices)
{
yield message;
}
}
Expand Down Expand Up @@ -213,10 +230,10 @@ where
Self::mark_chunk(
chunk,
current_pos,
table_pk_indices,
&pk_in_output_indices,
pk_order,
),
&upstream_indices,
&downstream_in_upstream_indices,
));
}
}
Expand Down Expand Up @@ -255,7 +272,7 @@ where
processed_rows += chunk.cardinality() as u64;
yield Message::Chunk(Self::mapping_chunk(
chunk,
&upstream_indices,
&downstream_in_upstream_indices,
));
}

Expand All @@ -272,11 +289,14 @@ where
.last()
.unwrap()
.1
.project(table_pk_indices)
.project(&pk_in_output_indices)
.into_owned_row(),
);
processed_rows += chunk.cardinality() as u64;
yield Message::Chunk(Self::mapping_chunk(chunk, &upstream_indices));
yield Message::Chunk(Self::mapping_chunk(
chunk,
&downstream_in_output_indices,
));
}
}
}
Expand All @@ -293,7 +313,7 @@ where
// Forward messages directly to the downstream.
#[for_await]
for msg in upstream {
if let Some(msg) = Self::mapping_message(msg?, &upstream_indices) {
if let Some(msg) = Self::mapping_message(msg?, &downstream_in_upstream_indices) {
if let Some(barrier) = msg.as_barrier() {
self.progress.finish(barrier.epoch.curr);
}
Expand Down Expand Up @@ -360,7 +380,7 @@ where
fn mark_chunk(
chunk: StreamChunk,
current_pos: &OwnedRow,
table_pk_indices: PkIndicesRef<'_>,
pk_in_output_indices: PkIndicesRef<'_>,
pk_order: &[OrderType],
) -> StreamChunk {
let chunk = chunk.compact();
Expand All @@ -369,7 +389,7 @@ where
// Use project to avoid allocation.
for v in data.rows().map(|row| {
match row
.project(table_pk_indices)
.project(pk_in_output_indices)
.iter()
.zip_eq_fast(pk_order.iter())
.cmp_by(current_pos.iter(), |(x, order), y| {
Expand Down
8 changes: 6 additions & 2 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption};
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::{ChainNode, ChainType};
Expand Down Expand Up @@ -98,7 +98,11 @@ impl ExecutorBuilder for ChainExecutorBuilder {
.iter()
.map(ColumnDesc::from)
.collect_vec();
let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec();
let column_ids = node
.upstream_column_ids
.iter()
.map(ColumnId::from)
.collect_vec();

// Use indices based on full table instead of streaming executor output.
let pk_indices = table_desc
Expand Down

0 comments on commit 3dcd6b4

Please sign in to comment.