Skip to content

Commit

Permalink
chore(storage): monitor compacting task count per level (risingwavela…
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Mar 31, 2023
1 parent 188846e commit 6c99e93
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 24 deletions.
34 changes: 21 additions & 13 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ def section_compaction(outer_panels):
"num of SSTs written into next level during history compactions to next level",
[
panels.target(
f"sum({metric('storage_level_compact_write')}) / sum({metric('state_store_write_build_l0_bytes')})",
f"sum({metric('storage_level_compact_write')}) / sum({metric('compactor_write_build_l0_bytes')})",
"write amplification",
),
],
Expand All @@ -705,13 +705,21 @@ def section_compaction(outer_panels):
"L{{level_index}}"),
],
),
panels.timeseries_count(
"Compacting Task Count",
"num of compact_task",
[
panels.target(f"{metric('storage_level_compact_task_cnt')}",
"{{task}}"),
],
),
panels.timeseries_bytes_per_sec(
"KBs Read from Next Level",
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_next')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_next')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand All @@ -720,8 +728,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_curr')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_curr')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand All @@ -730,8 +738,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_sstn_curr')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_sstn_curr')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand All @@ -740,8 +748,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_write')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} write",
f"sum(rate({metric('storage_level_compact_write')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} write",
),
],
),
Expand All @@ -750,8 +758,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_write_sstn')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} write",
f"sum(rate({metric('storage_level_compact_write_sstn')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} write",
),
],
),
Expand All @@ -760,8 +768,8 @@ def section_compaction(outer_panels):
"num of SSTs read from next level during history compactions to next level",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_sstn_next')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_sstn_next')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/meta/src/hummock/level_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl LevelHandler {
.map(|task| task.task_id)
.collect_vec()
}

/// Returns an owned snapshot of the compaction tasks currently pending
/// on this level handler. Callers receive an independent copy, so the
/// handler's internal state cannot be mutated through the result.
pub fn get_pending_tasks(&self) -> Vec<RunningCompactTask> {
    self.pending_tasks.to_vec()
}
}

impl From<&LevelHandler> for risingwave_pb::hummock::LevelHandler {
Expand Down
32 changes: 30 additions & 2 deletions src/meta/src/hummock/metrics_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -89,9 +89,10 @@ pub fn trigger_sst_stat(
level_sst_size / 1024
};

let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
for idx in 0..current_version.num_levels(compaction_group_id) {
let sst_num = level_sst_cnt(idx);
let level_label = format!("{}_{}", idx, compaction_group_id);
let level_label = format!("cg{}_L{}", compaction_group_id, idx);
metrics
.level_sst_num
.with_label_values(&[&level_label])
Expand All @@ -106,9 +107,36 @@ pub fn trigger_sst_stat(
.level_compact_cnt
.with_label_values(&[&level_label])
.set(compact_cnt as i64);

let compacting_task = compact_status.level_handlers[idx].get_pending_tasks();
let mut pending_task_ids: HashSet<u64> = HashSet::default();
for task in compacting_task {
if pending_task_ids.contains(&task.task_id) {
continue;
}

if idx != 0 && idx == task.target_level as usize {
continue;
}

let key = (idx, task.target_level as usize);
let count = compacting_task_stat.entry(key).or_insert(0);
*count += 1;

pending_task_ids.insert(task.task_id);
}
}
}

tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
for ((select, target), compacting_task_count) in compacting_task_stat {
let label_str = format!("cg{} L{} -> L{}", compaction_group_id, select, target);
metrics
.level_compact_task_cnt
.with_label_values(&[&label_str])
.set(compacting_task_count as _);
}

let level_label = format!("cg{}_l0_sub", compaction_group_id);
let sst_num = current_version
.levels
Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub struct MetaMetrics {
/// The number of compactor CPU need to be scale.
pub scale_compactor_core_num: IntGauge,

pub level_compact_task_cnt: IntGaugeVec,
pub object_store_metric: Arc<ObjectStoreMetrics>,
}

Expand Down Expand Up @@ -328,6 +329,13 @@ impl MetaMetrics {
)
.unwrap();

let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
"storage_level_compact_task_cnt",
"num of compact_task organized by group and level",
&["task"],
registry
)
.unwrap();
let object_store_metric = Arc::new(ObjectStoreMetrics::new(registry.clone()));

Self {
Expand Down Expand Up @@ -365,6 +373,7 @@ impl MetaMetrics {
compact_pending_bytes,
compact_level_compression_ratio,
scale_compactor_core_num,
level_compact_task_cnt,
object_store_metric,
}
}
Expand Down
17 changes: 10 additions & 7 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Compactor {
context
.compactor_metrics
.compact_read_current_level
.with_label_values(&[group_label.as_str(), cur_level_label.as_str()])
.with_label_values(&[&group_label, &cur_level_label])
.inc_by(
select_table_infos
.iter()
Expand All @@ -140,26 +140,29 @@ impl Compactor {
context
.compactor_metrics
.compact_read_sstn_current_level
.with_label_values(&[group_label.as_str(), cur_level_label.as_str()])
.with_label_values(&[&group_label, &cur_level_label])
.inc_by(select_table_infos.len() as u64);

let sec_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::<u64>();
let next_level_label = compact_task.target_level.to_string();
context
.compactor_metrics
.compact_read_next_level
.with_label_values(&[group_label.as_str(), next_level_label.as_str()])
.with_label_values(&[&group_label, next_level_label.as_str()])
.inc_by(sec_level_read_bytes);
context
.compactor_metrics
.compact_read_sstn_next_level
.with_label_values(&[group_label.as_str(), next_level_label.as_str()])
.with_label_values(&[&group_label, next_level_label.as_str()])
.inc_by(target_table_infos.len() as u64);

let timer = context
.compactor_metrics
.compact_task_duration
.with_label_values(&[compact_task.input_ssts[0].level_idx.to_string().as_str()])
.with_label_values(&[
&group_label,
&compact_task.input_ssts[0].level_idx.to_string(),
])
.start_timer();

let (need_quota, file_counts) = estimate_state_for_compaction(&compact_task);
Expand Down Expand Up @@ -313,12 +316,12 @@ impl Compactor {
context
.compactor_metrics
.compact_write_bytes
.with_label_values(&[group_label.as_str(), level_label.as_str()])
.with_label_values(&[&group_label, level_label.as_str()])
.inc_by(compaction_write_bytes);
context
.compactor_metrics
.compact_write_sstn
.with_label_values(&[group_label.as_str(), level_label.as_str()])
.with_label_values(&[&group_label, level_label.as_str()])
.inc_by(compact_task.sorted_output_ssts.len() as u64);

if let Err(e) = context
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/monitor/compactor_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl CompactorMetrics {
exponential_buckets(0.1, 1.6, 28).unwrap() // max 52000s
);
let compact_task_duration =
register_histogram_vec_with_registry!(opts, &["level"], registry).unwrap();
register_histogram_vec_with_registry!(opts, &["group", "level"], registry).unwrap();
let opts = histogram_opts!(
"compactor_get_table_id_total_time_duration",
"Total time of compact that have been issued to state store",
Expand Down

0 comments on commit 6c99e93

Please sign in to comment.