Skip to content

Commit

Permalink
chore: remove dist_key_indices in state table and storage table (risi…
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Mar 16, 2023
1 parent c683098 commit a4bd877
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 34 deletions.
12 changes: 2 additions & 10 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
// FIXME: revisit constructions and usages.
pk_indices: Vec<usize>,

/// Indices of distribution key for computing vnode.
/// Note that the index is based on the all columns of the table, instead of the output ones.
// FIXME: revisit constructions and usages.
dist_key_indices: Vec<usize>,

/// Indices of distribution key for computing vnode.
/// Note that the index is based on the primary key columns by `pk_indices`.
dist_key_in_pk_indices: Vec<usize>,
Expand Down Expand Up @@ -266,7 +261,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
mapping: Arc::new(mapping),
row_serde: Arc::new(row_serde),
pk_indices,
dist_key_indices,
dist_key_in_pk_indices,
vnodes,
table_option,
Expand Down Expand Up @@ -592,23 +586,21 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
} else {
trace!(
"iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}",
"iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
self.table_id,
pk_prefix,
self.dist_key_indices,
pk_prefix_indices
);
None
};

trace!(
"iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}" ,
"iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}" ,
self.table_id,
prefix_hint,
start_key,
end_key,
pk_prefix,
self.dist_key_indices,
pk_prefix_indices
);

Expand Down
23 changes: 20 additions & 3 deletions src/storage/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod batch_table;

use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -53,6 +54,17 @@ impl Distribution {
}
}

pub fn fallback_vnodes() -> Arc<Bitmap> {
/// A bitmap that only the default vnode is set.
static FALLBACK_VNODES: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT);
vnodes.set(DEFAULT_VNODE.to_index(), true);
vnodes.finish().into()
});

FALLBACK_VNODES.clone()
}

/// Distribution that accesses all vnodes, mainly used for tests.
pub fn all_vnodes(dist_key_indices: Vec<usize>) -> Self {
/// A bitmap that all vnodes are set.
Expand Down Expand Up @@ -124,14 +136,19 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu
/// Get vnode values with `indices` on the given `chunk`.
pub fn compute_chunk_vnode(
chunk: &DataChunk,
indices: &[usize],
dist_key_in_pk_indices: &[usize],
pk_indices: &[usize],
vnodes: &Bitmap,
) -> Vec<VirtualNode> {
if indices.is_empty() {
if dist_key_in_pk_indices.is_empty() {
vec![DEFAULT_VNODE; chunk.capacity()]
} else {
let dist_key_indices = dist_key_in_pk_indices
.iter()
.map(|idx| pk_indices[*idx])
.collect_vec();
chunk
.get_hash_values(indices, Crc32FastBuilder)
.get_hash_values(&dist_key_indices, Crc32FastBuilder)
.into_iter()
.zip_eq_fast(chunk.vis().iter())
.map(|(h, vis)| {
Expand Down
43 changes: 23 additions & 20 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub struct StateTableInner<
/// Indices of distribution key for computing vnode.
/// Note that the index is based on the all columns of the table, instead of the output ones.
// FIXME: revisit constructions and usages.
dist_key_indices: Vec<usize>,
// dist_key_indices: Vec<usize>,

/// Indices of distribution key for computing vnode.
/// Note that the index is based on the primary key columns by `pk_indices`.
Expand Down Expand Up @@ -198,15 +198,10 @@ where
.collect();
let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

let Distribution {
dist_key_indices,
vnodes,
} = match vnodes {
Some(vnodes) => Distribution {
dist_key_indices,
vnodes,
},
None => Distribution::fallback(),
let vnodes = match vnodes {
Some(vnodes) => vnodes,

None => Distribution::fallback_vnodes(),
};
let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
let vnode_col_idx = *idx as usize;
Expand Down Expand Up @@ -251,7 +246,6 @@ where
pk_serde,
row_serde,
pk_indices: pk_indices.to_vec(),
dist_key_indices,
dist_key_in_pk_indices,
prefix_hint_len,
vnodes,
Expand Down Expand Up @@ -452,7 +446,6 @@ where
pk_serde,
row_serde: SD::new(&column_ids, Arc::from(data_types.into_boxed_slice())),
pk_indices,
dist_key_indices,
dist_key_in_pk_indices,
prefix_hint_len,
vnodes,
Expand All @@ -475,7 +468,7 @@ where
if self.vnode_col_idx_in_pk.is_some() {
false
} else {
self.dist_key_indices.is_empty()
self.dist_key_in_pk_indices.is_empty()
}
}

Expand Down Expand Up @@ -503,8 +496,13 @@ where
}

/// Get the vnode value of the given row
pub fn compute_vnode(&self, row: impl Row) -> VirtualNode {
compute_vnode(row, &self.dist_key_indices, &self.vnodes)
// pub fn compute_vnode(&self, row: impl Row) -> VirtualNode {
// compute_vnode(row, &self.dist_key_indices, &self.vnodes)
// }

/// Get the vnode value of the given row
pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes)
}

// TODO: remove, should not be exposed to user
Expand All @@ -516,9 +514,9 @@ where
&self.pk_serde
}

pub fn dist_key_indices(&self) -> &[usize] {
&self.dist_key_indices
}
// pub fn dist_key_indices(&self) -> &[usize] {
// &self.dist_key_indices
// }

pub fn vnodes(&self) -> &Arc<Bitmap> {
&self.vnodes
Expand Down Expand Up @@ -724,7 +722,12 @@ where
pub fn write_chunk(&mut self, chunk: StreamChunk) {
let (chunk, op) = chunk.into_parts();

let vnodes = compute_chunk_vnode(&chunk, &self.dist_key_indices, &self.vnodes);
let vnodes = compute_chunk_vnode(
&chunk,
&self.dist_key_in_pk_indices,
&self.pk_indices,
&self.vnodes,
);

let value_chunk = if let Some(ref value_indices) = self.value_indices {
chunk.clone().reorder_columns(value_indices)
Expand Down Expand Up @@ -984,7 +987,7 @@ where
trace!(
table_id = %self.table_id(),
?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix,
dist_key_indices = ?self.dist_key_indices, ?pk_prefix_indices,
?pk_prefix_indices,
"storage_iter_with_prefix"
);

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl<S: StateStore> SortExecutor<S> {
let no_longer_owned_vnodes =
Bitmap::bit_saturate_subtract(prev_vnode_bitmap, curr_vnode_bitmap);
self.buffer.retain(|(_, pk), _| {
let vnode = self.state_table.compute_vnode(pk);
let vnode = self.state_table.compute_vnode_by_pk(pk);
!no_longer_owned_vnodes.is_set(vnode.to_index())
});
}
Expand Down

0 comments on commit a4bd877

Please sign in to comment.