diff --git a/proto/hummock.proto b/proto/hummock.proto index 271663808d05..9b0d8e9df70c 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -71,6 +71,13 @@ message GroupMetaChange { repeated uint32 table_ids_remove = 2; } +message GroupTableChange { + repeated uint32 table_ids = 1; + uint64 target_group_id = 2; + uint64 origin_group_id = 3; + uint64 new_sst_start_id = 4; +} + message GroupDestroy {} message GroupDelta { @@ -79,6 +86,7 @@ message GroupDelta { GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; GroupMetaChange group_meta_change = 4; + GroupTableChange group_table_change = 5; } } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 92e11420043a..19129fd789b9 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -26,10 +26,11 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::CompactionGroupId; use risingwave_pb::hummock::group_delta::DeltaType; +use risingwave_pb::hummock::hummock_version_delta::GroupDeltas; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::{ - CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy, - GroupMetaChange, + compact_task, CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy, + GroupMetaChange, GroupTableChange, }; use tokio::sync::{OnceCell, RwLock}; @@ -61,7 +62,7 @@ impl HummockManager { ) -> Result> { let compaction_group_manager = RwLock::new(CompactionGroupManager { compaction_groups: BTreeMap::new(), - provided_default_config_for_test: config, + default_config: config, }); compaction_group_manager .write() @@ -421,11 +422,24 @@ impl HummockManager { /// Splits a compaction group into two. The new one will contain `table_ids`. /// Returns the newly created compaction group id. - #[named] pub async fn split_compaction_group( &self, parent_group_id: CompactionGroupId, table_ids: &[StateTableId], + ) -> Result { + self.move_state_table_to_compaction_group(parent_group_id, table_ids, None, false) + .await + } + + /// move some table to another compaction-group. Create a new compaction group if it does not + /// exist. + #[named] + pub async fn move_state_table_to_compaction_group( + &self, + parent_group_id: CompactionGroupId, + table_ids: &[StateTableId], + target_group_id: Option, + allow_split_by_table: bool, ) -> Result { if table_ids.is_empty() { return Ok(parent_group_id); @@ -453,120 +467,176 @@ impl HummockManager { parent_group_id ))); } + if let Some(compaction_group_id) = target_group_id { + if !versioning.check_branched_sst_in_target_group( + &table_ids, + &parent_group_id, + &compaction_group_id, + ) { + return Err(Error::CompactionGroup(format!( + "invalid split attempt for group {}: we shall wait some time for parent group and target group could compact stale sst files", + parent_group_id + ))); + } + } let mut new_version_delta = BTreeMapEntryTransaction::new_insert( &mut versioning.hummock_version_deltas, current_version.id + 1, build_version_delta_after_version(current_version), ); - - // Remove tables from parent group. - for table_id in &table_ids { - let group_deltas = &mut new_version_delta - .group_deltas - .entry(parent_group_id) - .or_default() - .group_deltas; - group_deltas.push(GroupDelta { - delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { - table_ids_remove: vec![*table_id], - ..Default::default() - })), - }); - } - - // Add tables to new group. - let new_group_id = self - .env - .id_gen_manager() - .generate::<{ IdCategory::CompactionGroup }>() - .await?; let new_sst_start_id = self .env .id_gen_manager() .generate_interval::<{ IdCategory::HummockSstableId }>( - versioning.current_version.count_new_ssts_in_group_split( + current_version.count_new_ssts_in_group_split( parent_group_id, - &HashSet::from_iter(table_ids.iter().cloned()), + HashSet::from_iter(table_ids.clone()), ), ) .await?; - let group_deltas = &mut new_version_delta - .group_deltas - .entry(new_group_id) - .or_default() - .group_deltas; - let config = self - .compaction_group_manager - .read() - .await - .get_compaction_group_config(new_group_id) - .compaction_config - .as_ref() - .clone(); - group_deltas.push(GroupDelta { - delta_type: Some(DeltaType::GroupConstruct(GroupConstruct { - group_config: Some(config), - group_id: new_group_id, - parent_group_id, - table_ids, - new_sst_start_id, - })), - }); + let mut new_group = None; + let target_compaction_group_id = match target_group_id { + Some(compaction_group_id) => { + match current_version.levels.get(&compaction_group_id) { + Some(group) => { + for table_id in &table_ids { + if group.member_table_ids.contains(table_id) { + return Err(Error::CompactionGroup(format!( + "table {} already exist in group {}", + *table_id, compaction_group_id, + ))); + } + } + } + None => { + return Err(Error::CompactionGroup(format!( + "target group {} does not exist", + compaction_group_id, + ))); + } + } + let group_deltas = &mut new_version_delta + .group_deltas + .entry(compaction_group_id) + .or_default() + .group_deltas; + group_deltas.push(GroupDelta { + delta_type: Some(DeltaType::GroupTableChange(GroupTableChange { + table_ids: table_ids.to_vec(), + origin_group_id: parent_group_id, + target_group_id: compaction_group_id, + new_sst_start_id, + })), + }); + compaction_group_id + } + None => { + // All NewCompactionGroup pairs are mapped to one new compaction group. + let new_compaction_group_id = self + .env + .id_gen_manager() + .generate::<{ IdCategory::CompactionGroup }>() + .await?; + let mut config = self + .compaction_group_manager + .read() + .await + .get_default_compaction_group_config(); + config.split_by_state_table = allow_split_by_table; + + new_version_delta.group_deltas.insert( + new_compaction_group_id, + GroupDeltas { + group_deltas: vec![GroupDelta { + delta_type: Some(DeltaType::GroupConstruct(GroupConstruct { + group_config: Some(config.clone()), + group_id: new_compaction_group_id, + parent_group_id, + new_sst_start_id, + table_ids: table_ids.to_vec(), + })), + }], + }, + ); + new_group = Some((new_compaction_group_id, config)); + new_version_delta.group_deltas.insert( + parent_group_id, + GroupDeltas { + group_deltas: vec![GroupDelta { + delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { + table_ids_remove: table_ids.to_vec(), + ..Default::default() + })), + }], + }, + ); + new_compaction_group_id + } + }; let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts); let mut trx = Transaction::default(); new_version_delta.apply_to_txn(&mut trx)?; - self.env.meta_store().txn(trx).await?; + if let Some((new_compaction_group_id, config)) = new_group { + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let insert = BTreeMapEntryTransaction::new_insert( + &mut compaction_group_manager.compaction_groups, + new_compaction_group_id, + CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: Arc::new(config), + }, + ); + insert.apply_to_txn(&mut trx)?; + self.env.meta_store().txn(trx).await?; + insert.commit(); + } else { + self.env.meta_store().txn(trx).await?; + } let sst_split_info = versioning .current_version .apply_version_delta(&new_version_delta); // Updates SST split info - for (object_id, sst_id, parent_old_sst_id, parent_new_sst_id) in sst_split_info { + for (object_id, sst_id, _parent_old_sst_id, parent_new_sst_id) in sst_split_info { match branched_ssts.get_mut(object_id) { Some(mut entry) => { - let p = entry.get_mut(&parent_group_id).unwrap(); - let parent_pos = p.iter().position(|id| *id == parent_old_sst_id).unwrap(); if let Some(parent_new_sst_id) = parent_new_sst_id { - p[parent_pos] = parent_new_sst_id; + entry.insert(parent_group_id, parent_new_sst_id); } else { - p.remove(parent_pos); - if p.is_empty() { - entry.remove(&parent_group_id); - } + entry.remove(&parent_group_id); } - entry.entry(new_group_id).or_default().push(sst_id); + entry.insert(target_compaction_group_id, sst_id); } None => { - branched_ssts.insert( - object_id, - if let Some(parent_new_sst_id) = parent_new_sst_id { - [ - (parent_group_id, vec![parent_new_sst_id]), - (new_group_id, vec![sst_id]), - ] - .into_iter() - .collect() - } else { - [(new_group_id, vec![sst_id])].into_iter().collect() - }, - ); + let mut groups = HashMap::from_iter([(target_compaction_group_id, sst_id)]); + if let Some(parent_new_sst_id) = parent_new_sst_id { + groups.insert(parent_group_id, parent_new_sst_id); + } + branched_ssts.insert(object_id, groups); } } } new_version_delta.commit(); branched_ssts.commit_memory(); self.notify_last_version_delta(versioning); - - Ok(new_group_id) + // Don't trigger compactions if we enable deterministic compaction + if !self.env.opts.compaction_deterministic_test { + // commit_epoch may contains SSTs from any compaction group + self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim); + self.try_send_compaction_request( + target_compaction_group_id, + compact_task::TaskType::SpaceReclaim, + ); + } + Ok(target_compaction_group_id) } } #[derive(Default)] pub(super) struct CompactionGroupManager { compaction_groups: BTreeMap, - /// Provided default config, only used in test. - provided_default_config_for_test: CompactionConfig, + default_config: CompactionConfig, } impl CompactionGroupManager { @@ -602,14 +672,20 @@ impl CompactionGroupManager { compaction_group_ids .iter() .map(|id| { - let group = self.compaction_groups.get(id).cloned().unwrap_or_else(|| { - CompactionGroup::new(*id, self.provided_default_config_for_test.clone()) - }); + let group = self + .compaction_groups + .get(id) + .cloned() + .unwrap_or_else(|| CompactionGroup::new(*id, self.default_config.clone())); (*id, group) }) .collect() } + fn get_default_compaction_group_config(&self) -> CompactionConfig { + self.default_config.clone() + } + async fn update_compaction_config( &mut self, compaction_group_ids: &[CompactionGroupId], @@ -621,10 +697,7 @@ impl CompactionGroupManager { if !compaction_groups.contains_key(compaction_group_id) { compaction_groups.insert( *compaction_group_id, - CompactionGroup::new( - *compaction_group_id, - self.provided_default_config_for_test.clone(), - ), + CompactionGroup::new(*compaction_group_id, self.default_config.clone()), ); } let group = compaction_groups.get(compaction_group_id).unwrap(); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 31edd26dc883..d990c25f2d1e 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -29,7 +29,7 @@ use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ add_new_sub_level, build_initial_compaction_group_levels, build_version_delta_after_version, - get_compaction_group_ids, get_member_table_ids, try_get_compaction_group_id_by_table_id, + get_compaction_group_ids, try_get_compaction_group_id_by_table_id, BranchedSstInfo, HummockVersionExt, HummockVersionUpdateExt, }; use risingwave_hummock_sdk::{ @@ -787,6 +787,7 @@ where .values() .map(|v| v.minimal_pinned_snapshot) .fold(max_committed_epoch, std::cmp::min); + (versioning_guard.current_version.clone(), watermark) }; if current_version.levels.get(&compaction_group_id).is_none() { @@ -837,26 +838,13 @@ where start_time.elapsed() ); } else { - let all_table_ids = get_member_table_ids(¤t_version); // to get all relational table_id from sst_info - let table_ids = compact_task - .input_ssts - .iter() - .flat_map(|level| { - level - .table_infos - .iter() - .flat_map(|sst_info| sst_info.table_ids.iter().cloned()) - .collect_vec() - }) - .collect::>(); - for table_id in table_ids { - // to found exist table_id from - if all_table_ids.contains(&table_id) { - compact_task.existing_table_ids.push(table_id); - } - } - + compact_task.existing_table_ids = current_version + .levels + .get(&compaction_group_id) + .unwrap() + .member_table_ids + .clone(); compact_task.table_options = table_id_to_option .into_iter() .filter_map(|(table_id, table_option)| { @@ -1045,19 +1033,14 @@ where fn is_compact_task_expired( compact_task: &CompactTask, - branched_ssts: &BTreeMap< - HummockSstableObjectId, - BTreeMap>, - >, + branched_ssts: &BTreeMap, ) -> bool { for input_level in compact_task.get_input_ssts() { for table_info in input_level.get_table_infos() { if let Some(mp) = branched_ssts.get(&table_info.object_id) { if mp .get(&compact_task.compaction_group_id) - .map_or(true, |sst_id_vec| { - !sst_id_vec.iter().contains(&table_info.sst_id) - }) + .map_or(true, |sst_id| *sst_id != table_info.sst_id) { return true; } @@ -1159,9 +1142,11 @@ where } } let is_success = if let TaskStatus::Success = compact_task.task_status() { - let is_expired = !current_version - .get_levels() - .contains_key(&compact_task.compaction_group_id) + let is_expired = current_version + .levels + .get(&compact_task.compaction_group_id) + .map(|group| group.member_table_ids != compact_task.existing_table_ids) + .unwrap_or(true) || Self::is_compact_task_expired(compact_task, &versioning.branched_ssts); if is_expired { compact_task.set_task_status(TaskStatus::InvalidGroupCanceled); @@ -1376,7 +1361,7 @@ where None => false, }; if !is_sst_belong_to_group_declared { - let mut group_table_ids: BTreeMap<_, Vec<_>> = BTreeMap::new(); + let mut group_table_ids: BTreeMap<_, Vec> = BTreeMap::new(); for table_id in sst.get_table_ids() { match try_get_compaction_group_id_by_table_id( &versioning.current_version, @@ -1418,15 +1403,14 @@ where let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts); let mut branch_sstables = Vec::with_capacity(new_sst_id_number); for (sst, group_table_ids) in incorrect_ssts { - let mut branch_groups = BTreeMap::new(); - for (group_id, match_ids) in group_table_ids { + let mut branch_groups = HashMap::new(); + for (group_id, _match_ids) in group_table_ids { let mut branch_sst = sst.clone(); branch_sst.sst_id = new_sst_id; - branch_sst.table_ids = match_ids; branch_sstables.push(ExtendedSstableInfo::with_compaction_group( group_id, branch_sst, )); - branch_groups.insert(group_id, vec![new_sst_id]); + branch_groups.insert(group_id, new_sst_id); new_sst_id += 1; } if !branch_groups.is_empty() { @@ -1640,24 +1624,31 @@ where let checkpoint_version_copy = versioning_guard.checkpoint_version.clone(); let hummock_version_deltas_copy = versioning_guard.hummock_version_deltas.clone(); let version_stats_copy = versioning_guard.version_stats.clone(); + let branched_ssts = versioning_guard.branched_ssts.clone(); ( - compact_statuses_copy, - compact_task_assignment_copy, - pinned_versions_copy, - pinned_snapshots_copy, - checkpoint_version_copy, - hummock_version_deltas_copy, - version_stats_copy, + ( + compact_statuses_copy, + compact_task_assignment_copy, + pinned_versions_copy, + pinned_snapshots_copy, + checkpoint_version_copy, + hummock_version_deltas_copy, + version_stats_copy, + ), + branched_ssts, ) }; - let mem_state = get_state(compaction_guard.borrow(), versioning_guard.borrow()); + let (mem_state, branched_ssts) = + get_state(compaction_guard.borrow(), versioning_guard.borrow()); self.load_meta_store_state_impl( compaction_guard.borrow_mut(), versioning_guard.borrow_mut(), ) .await .expect("Failed to load state from meta store"); - let loaded_state = get_state(compaction_guard.borrow(), versioning_guard.borrow()); + let (loaded_state, load_branched_ssts) = + get_state(compaction_guard.borrow(), versioning_guard.borrow()); + assert_eq!(branched_ssts, load_branched_ssts); assert_eq!( mem_state, loaded_state, "hummock in-mem state is inconsistent with meta store state", @@ -1671,6 +1662,13 @@ where read_lock!(self, versioning).await.current_version.clone() } + /// Gets branched sstable infos + /// Should not be called inside [`HummockManager`], because it requests locks internally. + #[named] + pub async fn get_branched_ssts_info(&self) -> BTreeMap { + read_lock!(self, versioning).await.branched_ssts.clone() + } + /// Get version deltas from meta store #[cfg_attr(coverage, no_coverage)] pub async fn list_version_deltas( @@ -2099,27 +2097,20 @@ where } fn drop_sst( - branched_ssts: &mut BTreeMapTransaction< - '_, - HummockSstableObjectId, - BTreeMap>, - >, + branched_ssts: &mut BTreeMapTransaction<'_, HummockSstableObjectId, BranchedSstInfo>, group_id: CompactionGroupId, object_id: HummockSstableObjectId, sst_id: HummockSstableId, ) -> bool { match branched_ssts.get_mut(object_id) { Some(mut entry) => { - let sst_id_vec = entry.get_mut(&group_id).unwrap(); - sst_id_vec.remove(sst_id_vec.iter().position(|id| *id == sst_id).unwrap()); - if sst_id_vec.is_empty() { - entry.remove(&group_id); - if entry.is_empty() { - branched_ssts.remove(object_id); - true - } else { - false - } + // if group_id not exist, it would not pass the stale check before. + let removed_sst_id = entry.get(&group_id).unwrap(); + assert_eq!(*removed_sst_id, sst_id); + entry.remove(&group_id); + if entry.is_empty() { + branched_ssts.remove(object_id); + true } else { false } @@ -2130,11 +2121,7 @@ fn drop_sst( fn gen_version_delta<'a>( txn: &mut BTreeMapTransaction<'a, HummockVersionId, HummockVersionDelta>, - branched_ssts: &mut BTreeMapTransaction< - 'a, - HummockSstableObjectId, - BTreeMap>, - >, + branched_ssts: &mut BTreeMapTransaction<'a, HummockSstableObjectId, BranchedSstInfo>, old_version: &HummockVersion, compact_task: &CompactTask, trivial_move: bool, diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index cd5c77c24f4a..0181cf0b5be7 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -21,13 +21,13 @@ use itertools::Itertools; use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - get_compaction_group_ids, get_compaction_group_ssts, HummockVersionExt, + get_compaction_group_ids, get_compaction_group_ssts, BranchedSstInfo, HummockVersionExt, }; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::table_stats::{to_prost_table_stats_map, TableStats, TableStatsMap}; use risingwave_hummock_sdk::{ - CompactionGroupId, ExtendedSstableInfo, HummockContextId, HummockEpoch, HummockSstableId, - HummockSstableObjectId, HummockVersionId, LocalSstableInfo, FIRST_VERSION_ID, + CompactionGroupId, ExtendedSstableInfo, HummockContextId, HummockEpoch, HummockSstableObjectId, + HummockVersionId, LocalSstableInfo, FIRST_VERSION_ID, }; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; @@ -37,7 +37,9 @@ use risingwave_pb::hummock::{ KeyRange, SstableInfo, }; -use crate::hummock::compaction::{default_level_selector, ManualCompactionOption}; +use crate::hummock::compaction::{ + default_level_selector, LevelSelector, ManualCompactionOption, SpaceReclaimCompactionSelector, +}; use crate::hummock::error::Error; use crate::hummock::test_utils::*; use crate::hummock::{ @@ -58,6 +60,34 @@ fn pin_snapshots_epoch(pin_snapshots: &[HummockPinnedSnapshot]) -> Vec { .collect_vec() } +fn gen_sstable_info(sst_id: u64, idx: usize, table_ids: Vec) -> SstableInfo { + SstableInfo { + sst_id, + key_range: Some(KeyRange { + left: iterator_test_key_of_epoch(1, idx, 1), + right: iterator_test_key_of_epoch(1, idx, 1), + right_exclusive: false, + }), + table_ids, + object_id: sst_id, + min_epoch: 20, + max_epoch: 20, + ..Default::default() + } +} + +fn gen_extend_sstable_info( + sst_id: u64, + group_id: u64, + idx: usize, + table_ids: Vec, +) -> ExtendedSstableInfo { + ExtendedSstableInfo { + compaction_group_id: group_id, + sst_info: gen_sstable_info(sst_id, idx, table_ids), + table_stats: Default::default(), + } +} fn get_compaction_group_object_ids( version: &HummockVersion, group_id: CompactionGroupId, @@ -1387,7 +1417,7 @@ async fn test_split_compaction_group_on_commit() { async fn get_branched_ssts( hummock_manager: &HummockManager, -) -> BTreeMap>> { +) -> BTreeMap { hummock_manager .versioning .read(&["", "", ""]) @@ -1528,7 +1558,7 @@ async fn test_split_compaction_group_on_demand_basic() { .get(&new_group_id) .cloned() .unwrap(), - vec![object_id], + object_id, "trivial adjust should also generate a new SST id" ); } @@ -1597,7 +1627,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() { assert_eq!(branched_ssts.len(), 1); assert_eq!(branched_ssts.get(&10).unwrap().len(), 2); let sst_ids = branched_ssts.get(&10).unwrap().get(&2).cloned().unwrap(); - assert_ne!(sst_ids, vec![10]); + assert_ne!(sst_ids, 10); assert_ne!( branched_ssts .get(&10) @@ -1607,15 +1637,6 @@ async fn test_split_compaction_group_on_demand_non_trivial() { .unwrap(), sst_ids, ); - assert_ne!( - branched_ssts - .get(&10) - .unwrap() - .get(&new_group_id) - .cloned() - .unwrap(), - vec![10], - ); } async fn get_manual_compact_task( @@ -1685,8 +1706,8 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { assert_eq!(compaction_task.target_level, base_level as u32); compaction_task.sorted_output_ssts = vec![ SstableInfo { - object_id: sst_1.sst_info.get_object_id() + 1, - sst_id: sst_1.sst_info.get_object_id() + 1, + object_id: 11, + sst_id: 11, table_ids: vec![100, 101], key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 1, 1), @@ -1696,8 +1717,8 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ..Default::default() }, SstableInfo { - object_id: sst_1.sst_info.get_object_id() + 2, - sst_id: sst_1.sst_info.get_object_id() + 2, + object_id: 12, + sst_id: 12, table_ids: vec![100], key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 2, 2), @@ -1739,10 +1760,24 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { .len(), 1 ); + + let branched_ssts = hummock_manager.get_branched_ssts_info().await; + // object-11 and object-12 + assert_eq!(branched_ssts.len(), 2); + let info = branched_ssts.get(&11).unwrap(); + assert_eq!( + info.keys().sorted().cloned().collect_vec(), + vec![2, new_group_id] + ); + assert_eq!( + current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0] + .object_id, + sst_1.sst_info.get_object_id() + 1, + ); assert_eq!( current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0] .table_ids, - vec![101] + vec![100, 101] ); assert_eq!( current_version @@ -1758,7 +1793,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { .levels[base_level - 1] .table_infos[0] .table_ids, - vec![100] + vec![100, 101] ); assert_eq!( current_version @@ -1858,3 +1893,135 @@ async fn test_compaction_task_expiration_due_to_split_group() { "version should change because compaction task has succeeded" ); } + +#[tokio::test] +async fn test_move_tables_between_compaction_group() { + let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; + let context_id = worker_node.id; + + hummock_manager + .register_table_ids(&[(100, 2)]) + .await + .unwrap(); + hummock_manager + .register_table_ids(&[(101, 2)]) + .await + .unwrap(); + hummock_manager + .register_table_ids(&[(102, 2)]) + .await + .unwrap(); + let sst_1 = gen_extend_sstable_info(10, 2, 1, vec![100, 101, 102]); + hummock_manager + .commit_epoch(30, vec![sst_1.clone()], HashMap::from([(10, context_id)])) + .await + .unwrap(); + // Construct data via manual compaction + let mut compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let base_level: usize = 6; + assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); + assert_eq!(compaction_task.target_level, base_level as u32); + compaction_task.sorted_output_ssts = vec![ + gen_sstable_info(11, 1, vec![100]), + gen_sstable_info(12, 2, vec![100, 101]), + gen_sstable_info(13, 3, vec![101, 102]), + ]; + compaction_task.task_status = TaskStatus::Success.into(); + assert!(hummock_manager + .report_compact_task(context_id, &mut compaction_task, None) + .await + .unwrap()); + let sst_2 = gen_extend_sstable_info(14, 2, 1, vec![101, 102]); + hummock_manager + .commit_epoch(31, vec![sst_2.clone()], HashMap::from([(14, context_id)])) + .await + .unwrap(); + let current_version = hummock_manager.get_current_version().await; + assert_eq!( + current_version.get_compaction_group_levels(2).levels[base_level - 1] + .table_infos + .len(), + 3 + ); + + hummock_manager + .split_compaction_group(2, &[100]) + .await + .unwrap(); + let current_version = hummock_manager.get_current_version().await; + let new_group_id = current_version.levels.keys().max().cloned().unwrap(); + assert_eq!( + current_version.get_compaction_group_levels(2).levels[base_level - 1] + .table_infos + .len(), + 2 + ); + + let level = ¤t_version + .get_compaction_group_levels(new_group_id) + .levels[base_level - 1]; + assert_eq!(level.table_infos[0].table_ids, vec![100]); + assert_eq!(level.table_infos[1].table_ids, vec![100, 101]); + assert_eq!(level.table_infos.len(), 2); + let branched_ssts = hummock_manager.get_branched_ssts_info().await; + // object-id 11 and 12. + assert_eq!(branched_ssts.len(), 2); + let info = branched_ssts.get(&12).unwrap(); + let groups = info.keys().sorted().cloned().collect_vec(); + assert_eq!(groups, vec![2, new_group_id]); + let ret = hummock_manager + .move_state_table_to_compaction_group(2, &[101], Some(new_group_id), false) + .await; + // we can not move table-101 since sst-12 has been moved to new-group. If we move sst-12 to + // new-group, some of its data may be expired and it would return error result. + assert!(ret.is_err()); + let mut selector: Box = Box::::default(); + let mut compaction_task = hummock_manager + .get_compact_task(2, &mut selector) + .await + .unwrap() + .unwrap(); + assert_eq!(compaction_task.existing_table_ids, vec![101, 102]); + assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); + assert_eq!(compaction_task.input_ssts[0].table_infos[0].object_id, 12); + compaction_task.sorted_output_ssts = vec![gen_sstable_info(20, 2, vec![101])]; + compaction_task.task_status = TaskStatus::Success.into(); + hummock_manager + .assign_compaction_task(&compaction_task, context_id) + .await + .unwrap(); + let ret = hummock_manager + .report_compact_task(context_id, &mut compaction_task, None) + .await + .unwrap(); + assert!(ret); + let branched_ssts = hummock_manager.get_branched_ssts_info().await; + // there is still left one sst for object-12 in branched-sst. + assert_eq!(branched_ssts.len(), 2); + hummock_manager + .move_state_table_to_compaction_group(2, &[101], Some(new_group_id), false) + .await + .unwrap(); + let current_version = hummock_manager.get_current_version().await; + assert_eq!( + current_version + .get_compaction_group_levels(new_group_id) + .levels[base_level - 1] + .table_infos + .len(), + 4 + ); + assert_eq!( + current_version.get_compaction_group_levels(2).levels[base_level - 1] + .table_infos + .len(), + 1 + ); + let branched_ssts = hummock_manager.get_branched_ssts_info().await; + assert_eq!(branched_ssts.len(), 5); + let info = branched_ssts.get(&14).unwrap(); + assert_eq!( + info.keys().cloned().sorted().collect_vec(), + vec![2, new_group_id] + ); +} diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 48c02cb01acc..2d089cfb4ef9 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -13,14 +13,17 @@ // limitations under the License. use std::cmp; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::ops::RangeBounds; use function_name::named; use itertools::Itertools; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ + get_compaction_group_ids, BranchedSstInfo, HummockVersionExt, +}; +use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockContextId, HummockSstableId, HummockSstableObjectId, HummockVersionId, + CompactionGroupId, HummockContextId, HummockSstableObjectId, HummockVersionId, }; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::write_limits::WriteLimit; @@ -83,7 +86,7 @@ pub struct Versioning { pub branched_ssts: BTreeMap< // SST object id HummockSstableObjectId, - BTreeMap>, + BranchedSstInfo, >, /// `version_safe_points` is similar to `pinned_versions` expect for being a transient state. /// Hummock versions GE than min(safe_point) should not be GCed. @@ -149,6 +152,46 @@ impl Versioning { // deleted. } } + + /// If there is some sst in the target group which is just split but we have not compact it, we + /// can not split or move state-table to those group, because it may cause data overlap. + pub fn check_branched_sst_in_target_group( + &self, + table_ids: &[StateTableId], + source_group_id: &CompactionGroupId, + target_group_id: &CompactionGroupId, + ) -> bool { + for groups in self.branched_ssts.values() { + if groups.contains_key(target_group_id) && groups.contains_key(source_group_id) { + return false; + } + } + let mut found_sstable_repeated = false; + let moving_table_ids: HashSet<&u32> = HashSet::from_iter(table_ids); + if let Some(group) = self.current_version.levels.get(target_group_id) { + let target_member_table_ids: HashSet = + HashSet::from_iter(group.member_table_ids.clone()); + self.current_version.level_iter(*source_group_id, |level| { + for sst in &level.table_infos { + if sst + .table_ids + .iter() + .all(|table_id| !moving_table_ids.contains(table_id)) + { + continue; + } + for table_id in &sst.table_ids { + if target_member_table_ids.contains(table_id) { + found_sstable_repeated = true; + return false; + } + } + } + true + }); + } + !found_sstable_repeated + } } impl HummockManager diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 5e3976eeb31f..8c6d025facf7 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -89,6 +89,13 @@ where .add_compactor(context_id, u64::MAX, 16); temp_compactor = true; } + let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await); + register_sstable_infos_to_compaction_group( + hummock_manager, + &test_tables_2, + StaticCompactionGroupId::StateDefault.into(), + ) + .await; let compactor = hummock_manager.get_idle_compactor().await.unwrap(); let mut selector = default_level_selector(); let mut compact_task = hummock_manager @@ -104,19 +111,13 @@ where if temp_compactor { assert_eq!(compactor.context_id(), context_id); } - let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await); - register_sstable_infos_to_compaction_group( - hummock_manager, - &test_tables_2, - StaticCompactionGroupId::StateDefault.into(), - ) - .await; compact_task.sorted_output_ssts = test_tables_2.clone(); compact_task.set_task_status(TaskStatus::Success); - hummock_manager + let ret = hummock_manager .report_compact_task(context_id, &mut compact_task, None) .await .unwrap(); + assert!(ret); if temp_compactor { hummock_manager .compactor_manager_ref_for_test() diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 89a8a16e8795..214f0c75620c 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -231,8 +231,18 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { c.bench_function("bench_merge_iterator", |b| { b.to_async(&runtime).iter(|| { let sub_iters = vec![ - ConcatSstableIterator::new(level1.clone(), KeyRange::inf(), sstable_store.clone()), - ConcatSstableIterator::new(level2.clone(), KeyRange::inf(), sstable_store.clone()), + ConcatSstableIterator::new( + vec![0], + level1.clone(), + KeyRange::inf(), + sstable_store.clone(), + ), + ConcatSstableIterator::new( + vec![0], + level2.clone(), + KeyRange::inf(), + sstable_store.clone(), + ), ]; let iter = UnorderedMergeIteratorInner::for_compactor(sub_iters); let sstable_store1 = sstable_store.clone(); diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 9426925d3469..46f06521e80f 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -20,8 +20,8 @@ use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::hummock_version_delta::GroupDeltas; use risingwave_pb::hummock::{ - CompactionConfig, GroupConstruct, GroupDestroy, GroupMetaChange, HummockVersion, - HummockVersionDelta, Level, LevelType, OverlappingLevel, SstableInfo, + CompactionConfig, GroupConstruct, GroupDestroy, GroupMetaChange, GroupTableChange, + HummockVersion, HummockVersionDelta, Level, LevelType, OverlappingLevel, SstableInfo, }; use super::StateTableId; @@ -38,6 +38,7 @@ pub struct GroupDeltasSummary { pub group_construct: Option, pub group_destroy: Option, pub group_meta_changes: Vec, + pub group_table_change: Option, } pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary { @@ -49,6 +50,8 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary let mut group_construct = None; let mut group_destroy = None; let mut group_meta_changes = vec![]; + let mut group_table_change = None; + for group_delta in &group_deltas.group_deltas { match group_delta.get_delta_type().unwrap() { DeltaType::IntraLevel(intra_level) => { @@ -73,6 +76,9 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary DeltaType::GroupMetaChange(meta_delta) => { group_meta_changes.push(meta_delta.clone()); } + DeltaType::GroupTableChange(meta_delta) => { + group_table_change = Some(meta_delta.clone()); + } } } @@ -85,6 +91,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary group_construct, group_destroy, group_meta_changes, + group_table_change, } } @@ -106,25 +113,25 @@ pub trait HummockVersionExt { fn get_object_ids(&self) -> Vec; } +pub type BranchedSstInfo = HashMap; + pub trait HummockVersionUpdateExt { fn count_new_ssts_in_group_split( - &mut self, + &self, parent_group_id: CompactionGroupId, - member_table_ids: &HashSet, + member_table_ids: HashSet, ) -> u64; fn init_with_parent_group( &mut self, parent_group_id: CompactionGroupId, group_id: CompactionGroupId, - member_table_ids: &HashSet, + member_table_ids: HashSet, new_sst_start_id: u64, ) -> Vec; fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) -> Vec; fn build_compaction_group_info(&self) -> HashMap; - fn build_branched_sst_info( - &self, - ) -> BTreeMap>>; + fn build_branched_sst_info(&self) -> BTreeMap; } impl HummockVersionExt for HummockVersion { @@ -206,9 +213,9 @@ pub type SstSplitInfo = ( impl HummockVersionUpdateExt for HummockVersion { fn count_new_ssts_in_group_split( - &mut self, + &self, parent_group_id: CompactionGroupId, - member_table_ids: &HashSet, + member_table_ids: HashSet, ) -> u64 { self.levels .get(&parent_group_id) @@ -246,7 +253,7 @@ impl HummockVersionUpdateExt for HummockVersion { &mut self, parent_group_id: CompactionGroupId, group_id: CompactionGroupId, - member_table_ids: &HashSet, + member_table_ids: HashSet, new_sst_start_id: u64, ) -> Vec { let mut new_sst_id = new_sst_start_id; @@ -260,107 +267,69 @@ impl HummockVersionUpdateExt for HummockVersion { .levels .get_many_mut([&parent_group_id, &group_id]) .unwrap(); - let remove_sst_stat_from_level = |level: &mut Level, sst: &SstableInfo| { - level.total_file_size -= sst.file_size; - level.uncompressed_file_size -= sst.uncompressed_file_size; - }; if let Some(ref mut l0) = parent_levels.l0 { for sub_level in &mut l0.sub_levels { - let mut insert_table_infos = vec![]; - for sst_info in &mut sub_level.table_infos { - if sst_info - .get_table_ids() - .iter() - .any(|table_id| member_table_ids.contains(table_id)) - { - let is_trivial = sst_info - .get_table_ids() - .iter() - .all(|table_id| member_table_ids.contains(table_id)); - let mut branch_table_info = sst_info.clone(); - branch_table_info.sst_id = new_sst_id; - new_sst_id += 1; - let parent_old_sst_id = sst_info.get_sst_id(); - split_id_vers.push(( - branch_table_info.get_object_id(), - branch_table_info.get_sst_id(), - parent_old_sst_id, - if is_trivial { - None - } else { - sst_info.sst_id = new_sst_id; - new_sst_id += 1; - Some(sst_info.get_sst_id()) - }, - )); - branch_table_info.table_ids = sst_info - .table_ids - .drain_filter(|table_id| member_table_ids.contains(table_id)) - .collect_vec(); - insert_table_infos.push(branch_table_info); + let target_l0 = cur_levels.l0.as_mut().unwrap(); + let mut target_level_idx = target_l0.sub_levels.len(); + for (idx, other) in target_l0.sub_levels.iter_mut().enumerate() { + if other.sub_level_id == sub_level.sub_level_id { + target_level_idx = idx; } } // Remove SST from sub level may result in empty sub level. It will be purged // whenever another compaction task is finished. - let removed = sub_level + let insert_table_infos = split_sst_info_for_level( + sub_level, + &mut split_id_vers, + &member_table_ids, + &mut new_sst_id, + ); + sub_level .table_infos .drain_filter(|sst_info| sst_info.table_ids.is_empty()) - .collect_vec(); - for removed_sst in removed { - remove_sst_stat_from_level(sub_level, &removed_sst); - } - add_new_sub_level( - cur_levels.l0.as_mut().unwrap(), - sub_level.get_sub_level_id(), + .for_each(|sst_info| { + sub_level.total_file_size -= sst_info.file_size; + sub_level.uncompressed_file_size -= sst_info.uncompressed_file_size; + l0.total_file_size -= sst_info.file_size; + l0.uncompressed_file_size -= sst_info.uncompressed_file_size; + }); + add_ssts_to_sub_level( + target_l0, + target_level_idx, + sub_level.sub_level_id, sub_level.level_type(), insert_table_infos, ); } } for (z, level) in parent_levels.levels.iter_mut().enumerate() { - for sst_info in &mut level.table_infos { - if sst_info - .get_table_ids() - .iter() - .any(|table_id| member_table_ids.contains(table_id)) - { - let is_trivial = sst_info - .get_table_ids() - .iter() - .all(|table_id| member_table_ids.contains(table_id)); - let mut branch_table_info = sst_info.clone(); - branch_table_info.sst_id = new_sst_id; - new_sst_id += 1; - let parent_old_sst_id = sst_info.get_sst_id(); - split_id_vers.push(( - branch_table_info.get_object_id(), - branch_table_info.get_sst_id(), - parent_old_sst_id, - if is_trivial { - None - } else { - sst_info.sst_id = new_sst_id; - new_sst_id += 1; - Some(sst_info.get_sst_id()) - }, - )); - branch_table_info.table_ids = sst_info - .table_ids - .drain_filter(|table_id| member_table_ids.contains(table_id)) - .collect_vec(); - cur_levels.levels[z].total_file_size += branch_table_info.file_size; - cur_levels.levels[z].uncompressed_file_size += - branch_table_info.uncompressed_file_size; - cur_levels.levels[z].table_infos.push(branch_table_info); - } - } - let removed = level + let insert_table_infos = split_sst_info_for_level( + level, + &mut split_id_vers, + &member_table_ids, + &mut new_sst_id, + ); + cur_levels.levels[z].total_file_size += insert_table_infos + .iter() + .map(|sst| sst.file_size) + .sum::(); + cur_levels.levels[z].uncompressed_file_size += insert_table_infos + .iter() + .map(|sst| sst.uncompressed_file_size) + .sum::(); + cur_levels.levels[z].table_infos.extend(insert_table_infos); + cur_levels.levels[z].table_infos.sort_by(|sst1, sst2| { + let a = sst1.key_range.as_ref().unwrap(); + let b = sst2.key_range.as_ref().unwrap(); + a.compare(b) + }); + level .table_infos .drain_filter(|sst_info| sst_info.table_ids.is_empty()) - .collect_vec(); - for removed_sst in removed { - remove_sst_stat_from_level(level, &removed_sst); - } + .for_each(|sst_info| { + level.total_file_size -= sst_info.file_size; + level.uncompressed_file_size -= sst_info.uncompressed_file_size; + }); } split_id_vers } @@ -381,9 +350,24 @@ impl HummockVersionUpdateExt for HummockVersion { sst_split_info.extend(self.init_with_parent_group( parent_group_id, *compaction_group_id, - &HashSet::from_iter(group_construct.get_table_ids().iter().cloned()), + HashSet::from_iter(group_construct.table_ids.clone()), group_construct.get_new_sst_start_id(), )); + } else if let Some(group_change) = &summary.group_table_change { + sst_split_info.extend(self.init_with_parent_group( + group_change.origin_group_id, + group_change.target_group_id, + HashSet::from_iter(group_change.table_ids.clone()), + group_change.new_sst_start_id, + )); + + let levels = self + .levels + .get_mut(&group_change.origin_group_id) + .expect("compaction group should exist"); + levels + .member_table_ids + .drain_filter(|t| group_change.table_ids.contains(t)); } let has_destroy = summary.group_destroy.is_some(); let levels = self @@ -398,6 +382,7 @@ impl HummockVersionUpdateExt for HummockVersion { levels .member_table_ids .drain_filter(|t| group_meta_delta.table_ids_remove.contains(t)); + levels.member_table_ids.sort(); } assert!( @@ -456,29 +441,26 @@ impl HummockVersionUpdateExt for HummockVersion { ret } - fn build_branched_sst_info( - &self, - ) -> BTreeMap>> { - let mut ret: BTreeMap<_, BTreeMap<_, Vec<_>>> = BTreeMap::new(); - for compaction_group_id in self.get_levels().keys() { - self.level_iter(*compaction_group_id, |level| { - for table_info in level.get_table_infos() { + fn build_branched_sst_info(&self) -> BTreeMap { + let mut ret: BTreeMap<_, _> = BTreeMap::new(); + for (compaction_group_id, group) in &self.levels { + let mut levels = vec![]; + levels.extend(group.l0.as_ref().unwrap().sub_levels.iter()); + levels.extend(group.levels.iter()); + for level in levels { + for table_info in &level.table_infos { + if table_info.sst_id == table_info.object_id { + continue; + } let object_id = table_info.get_object_id(); - ret.entry(object_id) - .or_default() - .entry(*compaction_group_id) - .or_default() - .push(table_info.get_sst_id()); + let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default(); + if let Some(exist_sst_id) = entry.get(compaction_group_id) { + panic!("we do not allow more than one sst with the same object id in one grou. object-id: {}, duplicated sst id: {:?} and {}", object_id, exist_sst_id, table_info.sst_id); + } + entry.insert(*compaction_group_id, table_info.sst_id); } - true - }); - } - ret.retain(|object_id, v| { - v.len() != 1 || { - let sst_id_vec = v.values().next().unwrap(); - sst_id_vec.len() != 1 || sst_id_vec[0] != *object_id } - }); + } ret } } @@ -604,6 +586,51 @@ pub fn build_initial_compaction_group_levels( } } +fn split_sst_info_for_level( + level: &mut Level, + split_id_vers: &mut Vec, + member_table_ids: &HashSet, + new_sst_id: &mut u64, +) -> Vec { + // Remove SST from sub level may result in empty sub level. It will be purged + // whenever another compaction task is finished. + let mut removed = vec![]; + let mut insert_table_infos = vec![]; + for sst_info in &mut level.table_infos { + let removed_table_ids = sst_info + .table_ids + .iter() + .filter(|table_id| member_table_ids.contains(table_id)) + .cloned() + .collect_vec(); + if !removed_table_ids.is_empty() { + let is_trivial = removed_table_ids.len() == sst_info.table_ids.len(); + let mut branch_table_info = sst_info.clone(); + branch_table_info.sst_id = *new_sst_id; + *new_sst_id += 1; + let parent_old_sst_id = sst_info.get_sst_id(); + split_id_vers.push(( + branch_table_info.get_object_id(), + branch_table_info.get_sst_id(), + parent_old_sst_id, + if is_trivial { + None + } else { + sst_info.sst_id = *new_sst_id; + *new_sst_id += 1; + Some(sst_info.get_sst_id()) + }, + )); + if is_trivial { + sst_info.table_ids.clear(); + removed.push(sst_info.clone()); + } + insert_table_infos.push(branch_table_info); + } + } + insert_table_infos +} + pub fn try_get_compaction_group_id_by_table_id( version: &HummockVersion, table_id: StateTableId, @@ -680,6 +707,37 @@ pub fn new_sub_level( } } +pub fn add_ssts_to_sub_level( + l0: &mut OverlappingLevel, + sub_level_idx: usize, + insert_sub_level_id: u64, + level_type: LevelType, + insert_table_infos: Vec, +) { + if sub_level_idx < l0.sub_levels.len() { + insert_table_infos.iter().for_each(|sst| { + l0.sub_levels[sub_level_idx].total_file_size += sst.file_size; + l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size; + l0.total_file_size += sst.file_size; + l0.uncompressed_file_size += sst.uncompressed_file_size; + }); + l0.sub_levels[sub_level_idx] + .table_infos + .extend(insert_table_infos); + if l0.sub_levels[sub_level_idx].level_type == LevelType::Nonoverlapping as i32 { + l0.sub_levels[sub_level_idx] + .table_infos + .sort_by(|sst1, sst2| { + let a = sst1.key_range.as_ref().unwrap(); + let b = sst2.key_range.as_ref().unwrap(); + a.compare(b) + }); + } + return; + } + add_new_sub_level(l0, insert_sub_level_id, level_type, insert_table_infos); +} + pub fn add_new_sub_level( l0: &mut OverlappingLevel, insert_sub_level_id: u64, diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 32835a6da2fc..4a6bc7012a5b 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -52,6 +52,8 @@ pub type HummockCompactionTaskId = u64; pub type CompactionGroupId = u64; pub const INVALID_VERSION_ID: HummockVersionId = 0; pub const FIRST_VERSION_ID: HummockVersionId = 1; +pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56; +pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56; #[macro_export] /// This is wrapper for `info` log. diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index ae7ba44ec927..6dd6851a8cb8 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -729,7 +729,6 @@ pub(crate) mod tests { .await .unwrap() .unwrap(); - compact_task.existing_table_ids.push(2); let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); @@ -903,7 +902,6 @@ pub(crate) mod tests { .unwrap() .unwrap(); - compact_task.existing_table_ids.push(existing_table_id); let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); let retention_seconds_expire_second = 1; @@ -1092,7 +1090,6 @@ pub(crate) mod tests { kv_count, ); - compact_task.existing_table_ids.push(existing_table_id); let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); // compact_task.table_options = diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 2064b0844888..61a81ea3c1f4 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -53,17 +53,6 @@ impl CompactorRunner { .map(|table| table.file_size) .sum::(); - let stats_target_table_ids: HashSet = task - .input_ssts - .iter() - .flat_map(|i| { - i.table_infos - .iter() - .flat_map(|t| t.table_ids.clone()) - .collect_vec() - }) - .collect(); - let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into(); options.capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size); options.compression_algorithm = match task.compression_algorithm { @@ -89,7 +78,7 @@ impl CompactorRunner { cache_policy: CachePolicy::NotFill, gc_delete_keys: task.gc_delete_keys, watermark: task.watermark, - stats_target_table_ids: Some(stats_target_table_ids), + stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type(), split_by_table: task.split_by_state_table, }, @@ -181,6 +170,7 @@ impl CompactorRunner { .cloned() .collect_vec(); table_iters.push(ConcatSstableIterator::new( + self.compact_task.existing_table_ids.clone(), tables, self.compactor.task_config.key_range.clone(), self.sstable_store.clone(), @@ -192,6 +182,7 @@ impl CompactorRunner { continue; } table_iters.push(ConcatSstableIterator::new( + self.compact_task.existing_table_ids.clone(), vec![table_info.clone()], self.compactor.task_config.key_range.clone(), self.sstable_store.clone(), diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index fc1bee3f96e1..070546f0a5be 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -13,11 +13,13 @@ // limitations under the License. use std::cmp::Ordering; +use std::collections::HashSet; use std::future::Future; use std::sync::atomic::AtomicU64; use std::sync::{atomic, Arc}; use std::time::Instant; +use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::KeyComparator; @@ -45,6 +47,7 @@ struct SstableStreamIterator { /// For key sanity check of divided SST and debugging sstable_info: SstableInfo, + existing_table_ids: HashSet, } impl SstableStreamIterator { @@ -64,6 +67,7 @@ impl SstableStreamIterator { /// The iterator reads at most `max_block_count` from the stream. pub fn new( sstable_info: &SstableInfo, + existing_table_ids: HashSet, block_stream: BlockStream, max_block_count: usize, stats: &StoreLocalStatistic, @@ -73,6 +77,7 @@ impl SstableStreamIterator { block_iter: None, remaining_blocks: max_block_count, stats_ptr: stats.remote_io_time.clone(), + existing_table_ids, sstable_info: sstable_info.clone(), } } @@ -80,10 +85,8 @@ impl SstableStreamIterator { async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> { while let Some(block_iter) = self.block_iter.as_mut() { if self - .sstable_info - .get_table_ids() - .binary_search(&block_iter.table_id().table_id) - .is_ok() + .existing_table_ids + .contains(&block_iter.table_id().table_id) { return Ok(()); } else { @@ -111,6 +114,7 @@ impl SstableStreamIterator { if !block_iter.is_valid() { // `seek_key` is larger than everything in the first block. self.next_block().await?; + } else { } } @@ -214,7 +218,9 @@ pub struct ConcatSstableIterator { cur_idx: usize, /// All non-overlapping tables. - tables: Vec, + sstables: Vec, + + existing_table_ids: HashSet, sstable_store: SstableStoreRef, @@ -226,7 +232,8 @@ impl ConcatSstableIterator { /// arranged in ascending order when it serves as a forward iterator, /// and arranged in descending order when it serves as a backward iterator. pub fn new( - tables: Vec, + existing_table_ids: Vec, + sst_infos: Vec, key_range: KeyRange, sstable_store: SstableStoreRef, ) -> Self { @@ -234,7 +241,8 @@ impl ConcatSstableIterator { key_range, sstable_iter: None, cur_idx: 0, - tables, + sstables: sst_infos, + existing_table_ids: HashSet::from_iter(existing_table_ids), sstable_store, stats: StoreLocalStatistic::default(), } @@ -243,7 +251,7 @@ impl ConcatSstableIterator { /// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given. async fn seek_idx( &mut self, - mut idx: usize, + idx: usize, seek_key: Option>, ) -> HummockResult<()> { self.sstable_iter.take(); @@ -257,14 +265,25 @@ impl ConcatSstableIterator { (None, true) => None, (None, false) => Some(FullKey::decode(&self.key_range.left)), }; - - while idx < self.tables.len() { - let table_info = &self.tables[idx]; - let table = self + self.cur_idx = idx; + while self.cur_idx < self.sstables.len() { + let table_info = &self.sstables[self.cur_idx]; + let mut found = table_info + .table_ids + .iter() + .any(|table_id| self.existing_table_ids.contains(table_id)); + if !found { + self.cur_idx += 1; + seek_key = None; + continue; + } + let sstable = self .sstable_store .sstable(table_info, &mut self.stats) .await?; - let block_metas = &table.value().meta.block_metas; + let stats_ptr = self.stats.remote_io_time.clone(); + let now = Instant::now(); + let block_metas = &sstable.value().meta.block_metas; let mut start_index = match seek_key { None => 0, Some(seek_key) => { @@ -286,32 +305,24 @@ impl ConcatSstableIterator { ) != Ordering::Greater }) }; - while start_index < end_index { let start_block_table_id = block_metas[start_index].table_id(); - if table_info - .get_table_ids() - .binary_search(&start_block_table_id.table_id) - .is_ok() + if self + .existing_table_ids + .contains(&block_metas[start_index].table_id().table_id) { break; - } else { - start_index += - &block_metas[(start_index + 1)..].partition_point(|block_meta| { - block_meta.table_id() == start_block_table_id - }) + 1; } + start_index += &block_metas[(start_index + 1)..] + .partition_point(|block_meta| block_meta.table_id() == start_block_table_id) + + 1; } - - let found = if end_index <= start_index { - false + if start_index >= end_index { + found = false; } else { - let stats_ptr = self.stats.remote_io_time.clone(); - let now = Instant::now(); - let block_stream = self .sstable_store - .get_stream(table.value(), Some(start_index)) + .get_stream(sstable.value(), Some(start_index)) .await?; // Determine time needed to open stream. @@ -320,6 +331,7 @@ impl ConcatSstableIterator { let mut sstable_iter = SstableStreamIterator::new( table_info, + self.existing_table_ids.clone(), block_stream, end_index - start_index, &self.stats, @@ -328,17 +340,14 @@ impl ConcatSstableIterator { if sstable_iter.is_valid() { self.sstable_iter = Some(sstable_iter); - true } else { - false + found = false; } - }; - self.cur_idx = idx; - + } if found { return Ok(()); } else { - idx += 1; + self.cur_idx += 1; seek_key = None; } } @@ -396,7 +405,7 @@ impl HummockIterator for ConcatSstableIterator { Ordering::Greater => key, } }; - let table_idx = self.tables.partition_point(|table| { + let table_idx = self.sstables.partition_point(|table| { // We use the maximum key of an SST for the search. That way, we guarantee that the // resulting SST contains either that key or the next-larger KV-pair. Subsequently, // we avoid calling `seek_idx()` twice if the determined SST does not contain `key`. @@ -456,8 +465,12 @@ mod tests { test_key_of(start_index).encode().into(), test_key_of(end_index).encode().into(), ); - let mut iter = - ConcatSstableIterator::new(table_infos.clone(), kr.clone(), sstable_store.clone()); + let mut iter = ConcatSstableIterator::new( + vec![0], + table_infos.clone(), + kr.clone(), + sstable_store.clone(), + ); iter.seek(FullKey::decode(&kr.left)).await.unwrap(); for idx in start_index..end_index { @@ -476,16 +489,24 @@ mod tests { test_key_of(30000).encode().into(), test_key_of(40000).encode().into(), ); - let mut iter = - ConcatSstableIterator::new(table_infos.clone(), kr.clone(), sstable_store.clone()); + let mut iter = ConcatSstableIterator::new( + vec![0], + table_infos.clone(), + kr.clone(), + sstable_store.clone(), + ); iter.seek(FullKey::decode(&kr.left)).await.unwrap(); assert!(!iter.is_valid()); let kr = KeyRange::new( test_key_of(start_index).encode().into(), test_key_of(40000).encode().into(), ); - let mut iter = - ConcatSstableIterator::new(table_infos.clone(), kr.clone(), sstable_store.clone()); + let mut iter = ConcatSstableIterator::new( + vec![0], + table_infos.clone(), + kr.clone(), + sstable_store.clone(), + ); iter.seek(FullKey::decode(&kr.left)).await.unwrap(); for idx in start_index..30000 { let key = iter.key(); @@ -504,8 +525,12 @@ mod tests { test_key_of(0).encode().into(), test_key_of(40000).encode().into(), ); - let mut iter = - ConcatSstableIterator::new(table_infos.clone(), kr.clone(), sstable_store.clone()); + let mut iter = ConcatSstableIterator::new( + vec![0], + table_infos.clone(), + kr.clone(), + sstable_store.clone(), + ); iter.seek(test_key_of(10000).to_ref()).await.unwrap(); assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref()); iter.seek(test_key_of(10001).to_ref()).await.unwrap(); @@ -524,8 +549,12 @@ mod tests { test_key_of(6000).encode().into(), test_key_of(16000).encode().into(), ); - let mut iter = - ConcatSstableIterator::new(table_infos.clone(), kr.clone(), sstable_store.clone()); + let mut iter = ConcatSstableIterator::new( + vec![0], + table_infos.clone(), + kr.clone(), + sstable_store.clone(), + ); iter.seek(test_key_of(17000).to_ref()).await.unwrap(); assert!(!iter.is_valid()); iter.seek(test_key_of(1).to_ref()).await.unwrap(); @@ -555,10 +584,14 @@ mod tests { test_key_of(0).encode().into(), test_key_of(40000).encode().into(), ); - let mut iter = - ConcatSstableIterator::new(table_infos.clone(), kr.clone(), sstable_store.clone()); + let mut iter = ConcatSstableIterator::new( + vec![0], + table_infos.clone(), + kr.clone(), + sstable_store.clone(), + ); let sst = sstable_store - .sstable(&iter.tables[0], &mut iter.stats) + .sstable(&iter.sstables[0], &mut iter.stats) .await .unwrap(); let block_metas = &sst.value().meta.block_metas; @@ -591,8 +624,12 @@ mod tests { next_full_key(&block_1_smallest_key).into(), prev_full_key(&block_2_smallest_key).into(), ); - let mut iter = - ConcatSstableIterator::new(table_infos.clone(), kr.clone(), sstable_store.clone()); + let mut iter = ConcatSstableIterator::new( + vec![0], + table_infos.clone(), + kr.clone(), + sstable_store.clone(), + ); // Use block_2_smallest_key as seek key and result in invalid iterator. let seek_key = FullKey::decode(&block_2_smallest_key); assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);