Skip to content

Commit

Permalink
fix(storage): reduce log and fix total_cpu_num
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Mar 9, 2023
1 parent abe2015 commit 65ef720
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ pub trait CompactionPicker {
) -> Option<CompactionInput>;
}

#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
pub struct ScaleCompactorInfo {
pub running_cores: u64,
pub total_cores: u64,
Expand Down
33 changes: 20 additions & 13 deletions src/meta/src/hummock/compaction_schedule_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,7 @@ impl CompactionSchedulePolicy for RoundRobinPolicy {
fn refresh_state(&mut self) {}

fn total_cpu_core_num(&self) -> u32 {
self.compactor_map
.values()
.map(|c| c.cpu_ratio.load(Ordering::Acquire))
.sum()
self.compactor_map.values().map(|c| c.total_cpu_core).sum()
}

fn total_running_cpu_core_num(&self) -> u32 {
Expand Down Expand Up @@ -475,6 +472,8 @@ impl CompactionSchedulePolicy for ScoredPolicy {
.filter(|compactor| !compactor.is_busy())
.count();

let old_state = self.state.clone();

if idle_count == 0 {
match self.state {
CompactorState::Idle(last_update) => {
Expand Down Expand Up @@ -507,26 +506,34 @@ impl CompactionSchedulePolicy for ScoredPolicy {
}
}

tracing::info!(
"refresh_state idle_count {} total {} state {:?} suggest_scale_policy {:?}",
idle_count,
self.score_to_compactor.len(),
self.state,
self.suggest_scale_policy(),
);
if self.state != old_state {
tracing::info!(
"refresh_state idle_count {} total {} state {:?} suggest_scale_policy {:?}",
idle_count,
self.score_to_compactor.len(),
self.state,
self.suggest_scale_policy(),
);
}
}

fn total_cpu_core_num(&self) -> u32 {
self.score_to_compactor
.values()
.map(|c| c.cpu_ratio.load(Ordering::Acquire))
.map(|c| c.total_cpu_core)
.sum()
}

fn total_running_cpu_core_num(&self) -> u32 {
self.score_to_compactor
.values()
.map(|c| c.cpu_ratio.load(Ordering::Acquire) * c.total_cpu_core / 100)
.map(|c| {
let running_core =
(c.cpu_ratio.load(Ordering::Acquire) as f64 * c.total_cpu_core as f64 / 100.0)
.ceil() as u32;

running_core
})
.sum()
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,11 @@ impl CompactorManager {
) -> Receiver<MetaResult<SubscribeCompactTasksResponse>> {
let mut policy = self.policy.write();
let rx = policy.add_compactor(context_id, max_concurrent_task_number, cpu_core_num);
tracing::info!("Added compactor session {}", context_id);
tracing::info!(
"Added compactor session {} cpu_core_num {}",
context_id,
cpu_core_num
);
rx
}

Expand Down Expand Up @@ -426,7 +430,6 @@ impl CompactorManager {
) {
if let Some(compactor) = self.policy.read().get_compactor(context_id) {
compactor.cpu_ratio.store(workload.cpu, Ordering::Release);
tracing::info!("update_compactor_state cpu {}", workload.cpu,);
}

self.policy.write().refresh_state();
Expand Down
7 changes: 7 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1968,6 +1968,12 @@ where
self.metrics
.waiting_compaction_bytes
.set(info.waiting_compaction_bytes as i64);

tracing::info!(
"report_scale_compactor_info {:?} suggest_core {:?}",
info,
suggest_core
);
}

#[named]
Expand All @@ -1990,6 +1996,7 @@ where
let cg = self.get_compaction_group_config(*group_id).await;
let info = status.get_compaction_info(levels, cg.compaction_config());
global_info.add(&info);
tracing::info!("cg {} info {:?}", group_id, info);
}
}
global_info
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/service/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ where
format!("invalid hummock context {}", context_id),
));
}

let rx = self.compactor_manager.add_compactor(
context_id,
req.max_concurrent_task_number,
Expand Down

0 comments on commit 65ef720

Please sign in to comment.