Skip to content

Commit

Permalink
perf(storage): Merge multiple imms in the staging version to a large imm
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Apr 10, 2023
1 parent fcc069a commit 490161d
Show file tree
Hide file tree
Showing 16 changed files with 1,482 additions and 175 deletions.
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ pub struct StorageConfig {
#[serde(default)]
pub shared_buffer_capacity_mb: Option<usize>,

/// The threshold for the number of immutable memtables to merge to a new imm.
#[serde(default = "default::storage::imm_merge_threshold")]
pub imm_merge_threshold: usize,

/// Whether to enable write conflict detection
#[serde(default = "default::storage::write_conflict_detection_enabled")]
pub write_conflict_detection_enabled: bool,
Expand Down Expand Up @@ -600,6 +604,10 @@ mod default {
1024
}

/// Default for `StorageConfig::imm_merge_threshold`: how many immutable
/// memtables accumulate in the staging version before they are merged into
/// a single larger imm.
pub fn imm_merge_threshold() -> usize {
    // Keep the default in a named constant so the magic number is self-describing.
    const DEFAULT_IMM_MERGE_THRESHOLD: usize = 4;
    DEFAULT_IMM_MERGE_THRESHOLD
}

/// Default for `StorageConfig::write_conflict_detection_enabled`: enabled in
/// builds with debug assertions, disabled in release builds.
pub fn write_conflict_detection_enabled() -> bool {
    // `cfg!(debug_assertions)` is resolved at compile time, so the default
    // follows the build profile rather than any runtime condition.
    cfg!(debug_assertions)
}
Expand Down
24 changes: 11 additions & 13 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_storage::hummock::iterator::test_utils::{
iterator_test_table_key_of, iterator_test_user_key_of,
};
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use risingwave_storage::hummock::store::memtable::ImmutableMemtable;
use risingwave_storage::hummock::store::version::{
read_filter_for_batch, read_filter_for_local, HummockReadVersion, StagingData,
StagingSstableInfo, VersionUpdate,
Expand Down Expand Up @@ -60,6 +59,7 @@ async fn test_read_version_basic() {
vec![],
TableId::from(table_id),
None,
None,
);

read_version.update(VersionUpdate::Staging(StagingData::ImmMem(imm)));
Expand All @@ -75,13 +75,11 @@ async fn test_read_version_basic() {
.staging()
.prune_overlap(0, epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter
.cloned()
.collect::<Vec<ImmutableMemtable>>();
let staging_imm = staging_imm_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.epoch() <= epoch));
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}

{
Expand All @@ -99,6 +97,7 @@ async fn test_read_version_basic() {
vec![],
TableId::from(table_id),
None,
None,
);

read_version.update(VersionUpdate::Staging(StagingData::ImmMem(imm)));
Expand All @@ -116,13 +115,11 @@ async fn test_read_version_basic() {
.staging()
.prune_overlap(0, epoch, TableId::default(), &key_range);

let staging_imm = staging_imm_iter
.cloned()
.collect::<Vec<ImmutableMemtable>>();
let staging_imm = staging_imm_iter.cloned().collect_vec();

assert_eq!(1, staging_imm.len() as u64);
assert_eq!(0, staging_sst_iter.count());
assert!(staging_imm.iter().any(|imm| imm.epoch() <= epoch));
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));
}
}

Expand All @@ -143,7 +140,7 @@ async fn test_read_version_basic() {
.imm
.iter()
.rev()
.map(|imm| imm.epoch())
.map(|imm| imm.min_epoch())
.take(3)
.rev()
.collect::<Vec<_>>();
Expand Down Expand Up @@ -229,7 +226,7 @@ async fn test_read_version_basic() {

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
assert_eq!(4, staging_imm[0].epoch());
assert_eq!(4, staging_imm[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(2, staging_ssts.len());
Expand All @@ -253,7 +250,7 @@ async fn test_read_version_basic() {

let staging_imm = staging_imm_iter.cloned().collect_vec();
assert_eq!(1, staging_imm.len());
assert_eq!(4, staging_imm[0].epoch());
assert_eq!(4, staging_imm[0].min_epoch());

let staging_ssts = staging_sst_iter.cloned().collect_vec();
assert_eq!(1, staging_ssts.len());
Expand Down Expand Up @@ -285,6 +282,7 @@ async fn test_read_filter_basic() {
vec![],
TableId::from(table_id),
None,
None,
);

read_version
Expand All @@ -311,7 +309,7 @@ async fn test_read_filter_basic() {

assert_eq!(1, staging_imm.len());
assert_eq!(0, staging_sst.len());
assert!(staging_imm.iter().any(|imm| imm.epoch() <= epoch));
assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch));

// build for local
{
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use risingwave_pb::hummock::{
CompactTask, CompactTaskProgress, CompactorWorkload, SubscribeCompactTasksResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::compact;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use sysinfo::{CpuRefreshKind, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
Expand Down
85 changes: 84 additions & 1 deletion src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@ use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::CompactorContext;
use crate::hummock::compactor::{CompactOutput, Compactor};
use crate::hummock::event_handler::uploader::UploadTaskPayload;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::{Forward, HummockIterator, OrderedMergeIteratorInner};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner, SharedBufferVersionedEntry,
};
use crate::hummock::sstable::DeleteRangeAggregatorBuilder;
use crate::hummock::store::memtable::ImmutableMemtable;
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
CachePolicy, HummockError, HummockResult, RangeTombstonesCollector, SstableBuilderOptions,
};
Expand Down Expand Up @@ -104,7 +111,7 @@ async fn compact_shared_buffer(
let tombstones = imm.get_delete_range_tombstones();
builder.add_tombstone(tombstones);
// calculate encoded bytes of key var length
(imm.get_payload().len() * 8 + imm.size()) as u64
(imm.kv_count() * 8 + imm.size()) as u64
};
compact_data_size += data_size;
size_and_start_user_keys.push((data_size, imm.start_user_key()));
Expand Down Expand Up @@ -238,6 +245,82 @@ async fn compact_shared_buffer(
}
}

/// Merge multiple batches into a larger one.
///
/// Combines the payloads, epochs, sizes, batch ids and delete-range tombstones
/// of `imms` into a single merged [`ImmutableMemtable`] for `table_id` /
/// `instance_id`. Every input imm must be non-empty and belong to `table_id`
/// (both asserted below).
///
/// NOTE(review): `items.first().unwrap()` below panics when `imms` is empty —
/// callers are presumably required to pass at least one imm; confirm at call
/// sites.
pub async fn merge_imms_in_memory(
    table_id: TableId,
    instance_id: LocalInstanceId,
    imms: Vec<ImmutableMemtable>,
    memory_tracker: Option<MemoryTracker>,
) -> HummockResult<ImmutableMemtable> {
    // Accumulators for the merged imm's metadata.
    let mut range_tombstone_list = Vec::new();
    let mut kv_count = 0;
    let mut epochs = vec![];
    let mut merged_size = 0;
    let mut merged_imm_ids = Vec::with_capacity(imms.len());

    // Consume each input imm: collect its id, min epoch, kv count, size and
    // tombstones, then turn it into a forward iterator for the merge below.
    let mut imm_iters = Vec::with_capacity(imms.len());
    for imm in imms {
        assert!(imm.kv_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        merged_imm_ids.push(imm.batch_id());
        epochs.push(imm.min_epoch());
        kv_count += imm.kv_count();
        merged_size += imm.size();
        range_tombstone_list.extend(imm.get_delete_range_tombstones());
        imm_iters.push(imm.into_forward_iter());
    }
    range_tombstone_list.sort();
    epochs.sort();

    // use merge iterator to merge input imms
    let mut mi = OrderedMergeIteratorInner::new(imm_iters);
    mi.rewind().await?;
    // Drain the merge iterator into a flat list of ((key, value), epoch) items;
    // `kv_count` is an exact upper bound, so reserve it up front.
    let mut items = Vec::with_capacity(kv_count);
    while mi.is_valid() {
        let (key, (epoch, value)) = mi.current_item();
        items.push(((key, value), epoch));
        mi.next().await?;
    }

    // Group consecutive items sharing a key into one payload entry that holds
    // all of that key's (epoch, value) versions. `pivot` is the key of the
    // group currently being built.
    // NOTE(review): assumes the merge iterator emits versions of the same key
    // in a consistent order (presumably newest epoch first) — confirm against
    // `OrderedMergeIteratorInner`.
    let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();
    let mut pivot = items.first().map(|((k, _), _)| k.clone()).unwrap();
    let mut versions: Vec<(HummockEpoch, HummockValue<Bytes>)> = Vec::new();

    for ((key, value), epoch) in items {
        // The merge iterator must yield keys in non-decreasing order.
        assert!(key >= pivot, "key should be in ascending order");
        if key == pivot {
            versions.push((epoch, value));
        } else {
            // Key changed: flush the finished group and start a new one.
            merged_payload.push((pivot, versions));
            pivot = key;
            versions = vec![(epoch, value)];
        }
    }
    // process the last key — the loop only flushes a group when the next key
    // differs, so the final group is still pending here
    if !versions.is_empty() {
        merged_payload.push((pivot, versions));
    }

    Ok(SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_payload,
            kv_count,
            merged_imm_ids,
            range_tombstone_list,
            merged_size,
            memory_tracker,
        )),
        table_id,
        instance_id,
    })
}

pub struct SharedBufferCompactRunner {
compactor: Compactor,
split_index: usize,
Expand Down
36 changes: 34 additions & 2 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_hummock_sdk::{info_in_release, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::version_update_payload::Payload;
use tokio::spawn;
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info};
use tracing::{error, info, warn};

use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
use crate::hummock::compactor::{compact, CompactorContext};
Expand Down Expand Up @@ -145,12 +145,19 @@ impl HummockEventHandler {
let write_conflict_detector =
ConflictDetector::new_from_config(&compactor_context.storage_opts);
let sstable_object_id_manager = compactor_context.sstable_object_id_manager.clone();
let upload_compactor_context = compactor_context.clone();
let uploader = HummockUploader::new(
pinned_version.clone(),
Arc::new(move |payload, task_info| {
spawn(flush_imms(payload, task_info, compactor_context.clone()))
spawn(flush_imms(
payload,
task_info,
upload_compactor_context.clone(),
))
}),
buffer_tracker,
&compactor_context.storage_opts,
compactor_context.compaction_executor.clone(),
);

Self {
Expand Down Expand Up @@ -266,6 +273,7 @@ impl HummockEventHandler {
fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) {
// todo: do some prune for version update
Self::for_each_read_version(&self.read_version_mapping, |read_version| {
info!("data_spilled. SST size {}", staging_sstable_info.imm_size());
read_version.update(VersionUpdate::Staging(StagingData::Sst(
staging_sstable_info.clone(),
)))
Expand Down Expand Up @@ -441,6 +449,26 @@ impl HummockEventHandler {
UploaderEvent::DataSpilled(staging_sstable_info) => {
self.handle_data_spilled(staging_sstable_info);
}

UploaderEvent::ImmMerged(merge_output) => {
// update read version for corresponding table shards
let read_guard = self.read_version_mapping.read();
read_guard.get(&merge_output.table_id).map_or((), |shards| {
shards.get(&merge_output.instance_id).map_or_else(
|| {
warn!(
"handle ImmMerged: table instance not found. table {}, instance {}",
&merge_output.table_id, &merge_output.instance_id
)
},
|read_version| {
read_version.write().update(VersionUpdate::Staging(
StagingData::MergedImmMem(merge_output.merged_imm),
));
},
)
});
}
},
Either::Right(event) => {
match event {
Expand Down Expand Up @@ -475,8 +503,12 @@ impl HummockEventHandler {
is_checkpoint,
} => {
self.uploader.seal_epoch(epoch);

if is_checkpoint {
self.uploader.start_sync_epoch(epoch);
} else {
// start merging task on non-checkpoint epochs sealed
self.uploader.start_merge_imms(epoch);
}
}
#[cfg(any(test, feature = "test"))]
Expand Down
4 changes: 3 additions & 1 deletion src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ impl HummockEvent {
format!("VersionUpdate {:?}", version_update_payload)
}

HummockEvent::ImmToUploader(imm) => format!("ImmToUploader {:?}", imm),
HummockEvent::ImmToUploader(imm) => {
format!("ImmToUploader {:?}", imm)
}

HummockEvent::SealEpoch {
epoch,
Expand Down
Loading

0 comments on commit 490161d

Please sign in to comment.