Skip to content

Commit

Permalink
feat(storage): do not compress table_id (risingwavelabs#8512)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Mar 16, 2023
1 parent 7641b15 commit 64a5f88
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 27 deletions.
24 changes: 24 additions & 0 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ impl<T: AsRef<[u8]>> UserKey<T> {
buf.put_slice(self.table_key.as_ref());
}

/// Appends only the raw table-key bytes of this user key to `buf`.
///
/// Neither the table id nor any length prefix is written by this method.
pub fn encode_table_key_into(&self, buf: &mut impl BufMut) {
    let table_key_bytes: &[u8] = self.table_key.as_ref();
    buf.put_slice(table_key_bytes);
}

/// Encode into a buffer.
pub fn encode_length_prefixed(&self, buf: &mut impl BufMut) {
buf.put_u32(self.table_id.table_id());
Expand Down Expand Up @@ -583,6 +587,12 @@ impl<T: AsRef<[u8]>> FullKey<T> {
buf
}

/// Encodes this key into `buf` without the `table_id`.
///
/// The output is the table-key bytes followed by the epoch written with
/// `put_u64`.
pub fn encode_into_without_table_id(&self, buf: &mut impl BufMut) {
    let epoch = self.epoch;
    // Table-key bytes come first; the epoch is the trailing suffix.
    self.user_key.encode_table_key_into(buf);
    buf.put_u64(epoch);
}

pub fn encode_reverse_epoch(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(
TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
Expand Down Expand Up @@ -614,6 +624,20 @@ impl<'a> FullKey<&'a [u8]> {
}
}

/// Construct a [`FullKey`] from a byte slice without `table_id` encoded.
pub fn from_slice_without_table_id(
table_id: TableId,
slice_without_table_id: &'a [u8],
) -> Self {
let epoch_pos = slice_without_table_id.len() - EPOCH_LEN;
let epoch = (&slice_without_table_id[epoch_pos..]).get_u64();

Self {
user_key: UserKey::new(table_id, TableKey(&slice_without_table_id[..epoch_pos])),
epoch,
}
}

/// Construct a [`FullKey`] from a byte slice.
pub fn decode_reverse_epoch(slice: &'a [u8]) -> Self {
let epoch_pos = slice.len() - EPOCH_LEN;
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl SstableStreamIterator {

if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
block_iter.seek(seek_key);

if !block_iter.is_valid() {
// `seek_key` is larger than everything in the first block.
self.next_block().await?;
Expand Down
41 changes: 34 additions & 7 deletions src/storage/src/hummock/sstable/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::mem::size_of;
use std::ops::Range;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::KeyComparator;
use {lz4, zstd};
Expand Down Expand Up @@ -142,13 +143,18 @@ pub struct Block {
pub data: Bytes,
/// Uncompressed entries data length.
data_len: usize,

/// Table id of this block.
table_id: TableId,

/// Restart points.
restart_points: Vec<RestartPoint>,
}

impl Block {
pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
// Verify checksum.

let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;

Expand Down Expand Up @@ -184,11 +190,12 @@ impl Block {
}

pub fn decode_from_raw(buf: Bytes) -> Self {
let table_id = (&buf[buf.len() - 4..]).get_u32_le();
// decode restart_points_type_index
let n_index = ((&buf[buf.len() - 4..]).get_u32_le()) as usize;
let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize;
let index_data_len = size_of::<u32>() + n_index * RestartPoint::size_of();
let data_len = buf.len() - index_data_len;
let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 4];
let data_len = buf.len() - 4 - index_data_len;
let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8];

let mut index_key_vec = Vec::with_capacity(n_index);
for _ in 0..n_index {
Expand All @@ -213,6 +220,7 @@ impl Block {
let mut restart_points_buf = &buf[data_len..restarts_end];

let mut type_index: usize = 0;

for _ in 0..n_restarts {
let offset = restart_points_buf.get_u32_le();
if type_index < index_key_vec.len() - 1
Expand All @@ -232,6 +240,7 @@ impl Block {
data: buf,
data_len,
restart_points,
table_id: TableId::new(table_id),
}
}

Expand All @@ -243,7 +252,13 @@ impl Block {
}

pub fn capacity(&self) -> usize {
self.data.len() + self.restart_points.capacity() * std::mem::size_of::<u32>()
self.data.len()
+ self.restart_points.capacity() * std::mem::size_of::<u32>()
+ std::mem::size_of::<u32>()
}

/// Returns the table id stored in this block's `table_id` field.
pub fn table_id(&self) -> TableId {
self.table_id
}

/// Gets restart point by index.
Expand Down Expand Up @@ -385,6 +400,7 @@ pub struct BlockBuilder {
/// Compression algorithm.
compression_algorithm: CompressionAlgorithm,

table_id: Option<u32>,
// restart_points_type_index stores only the restart_point corresponding to each type change,
// as an index, in order to reduce space usage
restart_points_type_index: Vec<RestartPoint>,
Expand All @@ -402,6 +418,7 @@ impl BlockBuilder {
last_key: vec![],
entry_count: 0,
compression_algorithm: options.compression_algorithm,
table_id: None,
restart_points_type_index: Vec::default(),
}
}
Expand All @@ -420,15 +437,20 @@ impl BlockBuilder {
///
/// Panic if key is not added in ASCEND order.
pub fn add(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
let input_table_id = full_key.user_key.table_id.table_id();
match self.table_id {
Some(current_table_id) => debug_assert_eq!(current_table_id, input_table_id),
None => self.table_id = Some(input_table_id),
}
#[cfg(debug_assertions)]
self.debug_valid();

let mut key: BytesMut = Default::default();
full_key.encode_into(&mut key);
full_key.encode_into_without_table_id(&mut key);
if self.entry_count > 0 {
debug_assert!(!key.is_empty());
debug_assert_eq!(
KeyComparator::compare_encoded_full_key(&self.last_key[..], &key),
KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]),
Ordering::Less
);
}
Expand Down Expand Up @@ -462,7 +484,7 @@ impl BlockBuilder {

key.as_ref()
} else {
bytes_diff_below_max_key_length(&self.last_key, &key)
bytes_diff_below_max_key_length(&self.last_key, &key[..])
};

let prefix = KeyPrefix::new_without_len(
Expand Down Expand Up @@ -492,6 +514,7 @@ impl BlockBuilder {
pub fn clear(&mut self) {
self.buf.clear();
self.restart_points.clear();
self.table_id = None;
self.restart_points_type_index.clear();
self.last_key.clear();
self.entry_count = 0;
Expand All @@ -504,6 +527,7 @@ impl BlockBuilder {
+ (RestartPoint::size_of()) // (offset + len_type(u8)) * len
* self.restart_points_type_index.len()
+ std::mem::size_of::<u32>() // restart_points_type_index len
+ std::mem::size_of::<u32>() // table_id len
}

/// Finishes building block.
Expand Down Expand Up @@ -545,6 +569,7 @@ impl BlockBuilder {
self.buf
.put_u32_le(self.restart_points_type_index.len() as u32);

self.buf.put_u32_le(self.table_id.unwrap());
match self.compression_algorithm {
CompressionAlgorithm::None => (),
CompressionAlgorithm::Lz4 => {
Expand Down Expand Up @@ -581,6 +606,7 @@ impl BlockBuilder {
self.compression_algorithm.encode(&mut self.buf);
let checksum = xxhash64_checksum(&self.buf);
self.buf.put_u64_le(checksum);

self.buf.as_ref()
}

Expand All @@ -595,6 +621,7 @@ impl BlockBuilder {
+ std::mem::size_of::<u32>() // restart_points_type_indics.len
+ std::mem::size_of::<CompressionAlgorithm>() // compression_algorithm
+ std::mem::size_of::<u64>() // checksum
+ std::mem::size_of::<u32>() // table_id
}

pub fn debug_valid(&self) {
Expand Down
38 changes: 18 additions & 20 deletions src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::ops::Range;

use bytes::BytesMut;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::KeyComparator;

use super::{KeyPrefix, LenType, RestartPoint};
use crate::hummock::BlockHolder;
Expand Down Expand Up @@ -77,7 +76,8 @@ impl BlockIterator {

pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());
FullKey::decode(&self.key)

FullKey::from_slice_without_table_id(self.block.table_id(), &self.key[..])
}

pub fn value(&self) -> &[u8] {
Expand All @@ -99,19 +99,19 @@ impl BlockIterator {
}

pub fn seek(&mut self, key: FullKey<&[u8]>) {
let full_key_encoded = key.encode();
self.seek_restart_point_by_key(&full_key_encoded);
self.next_until_key(&full_key_encoded);
self.seek_restart_point_by_key(key);

self.next_until_key(key);
}

pub fn seek_le(&mut self, key: FullKey<&[u8]>) {
let full_key_encoded = key.encode();
self.seek_restart_point_by_key(&full_key_encoded);
self.next_until_key(&full_key_encoded);
self.seek_restart_point_by_key(key);

self.next_until_key(key);
if !self.is_valid() {
self.seek_to_last();
}
self.prev_until_key(&full_key_encoded);
self.prev_until_key(key);
}
}

Expand Down Expand Up @@ -171,19 +171,15 @@ impl BlockIterator {
}

/// Moves forward until reaching the first key that is equal to or larger than the given `key`.
fn next_until_key(&mut self, key: &[u8]) {
while self.is_valid()
&& KeyComparator::compare_encoded_full_key(&self.key[..], key) == Ordering::Less
{
fn next_until_key(&mut self, key: FullKey<&[u8]>) {
while self.is_valid() && self.key().cmp(&key) == Ordering::Less {
self.next_inner();
}
}

/// Moves backward until reaching the first key that is equal to or smaller than the given `key`.
fn prev_until_key(&mut self, key: &[u8]) {
while self.is_valid()
&& KeyComparator::compare_encoded_full_key(&self.key[..], key) == Ordering::Greater
{
fn prev_until_key(&mut self, key: FullKey<&[u8]>) {
while self.is_valid() && self.key().cmp(&key) == Ordering::Greater {
self.prev_inner();
}
}
Expand Down Expand Up @@ -240,7 +236,7 @@ impl BlockIterator {
}

/// Searches the restart point index that the given `key` belongs to.
fn search_restart_point_index_by_key(&self, key: &[u8]) -> usize {
fn search_restart_point_index_by_key(&self, key: FullKey<&[u8]>) -> usize {
// Find the largest restart point whose restart key is equal to or less than the given key.
self.block
.search_restart_partition_point(
Expand All @@ -252,7 +248,9 @@ impl BlockIterator {
let prefix =
self.decode_prefix_at(probe as usize, key_len_type, value_len_type);
let probe_key = &self.block.data()[prefix.diff_key_range()];
match KeyComparator::compare_encoded_full_key(probe_key, key) {
let full_probe_key =
FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
match full_probe_key.cmp(&key) {
Ordering::Less | Ordering::Equal => true,
Ordering::Greater => false,
}
Expand All @@ -262,7 +260,7 @@ impl BlockIterator {
}

/// Seeks to the restart point that the given `key` belongs to.
fn seek_restart_point_by_key(&mut self, key: &[u8]) {
fn seek_restart_point_by_key(&mut self, key: FullKey<&[u8]>) {
let index = self.search_restart_point_index_by_key(key);
self.seek_restart_point_by_index(index)
}
Expand Down

0 comments on commit 64a5f88

Please sign in to comment.