From c9a93f1520c3deeafe35b2b2bfd2352d12356ad4 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 11 Sep 2023 12:29:50 +0300 Subject: [PATCH 1/3] Report `tracing_unbounded` channel size to prometheus --- substrate/client/utils/src/metrics.rs | 26 ++++++++++++++++++++++---- substrate/client/utils/src/mpsc.rs | 21 ++++++++++++++------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/substrate/client/utils/src/metrics.rs b/substrate/client/utils/src/metrics.rs index 6bbdbe2e2e59..5b256f2c1256 100644 --- a/substrate/client/utils/src/metrics.rs +++ b/substrate/client/utils/src/metrics.rs @@ -24,7 +24,10 @@ use prometheus::{ Error as PrometheusError, Registry, }; -use prometheus::{core::GenericCounterVec, Opts}; +use prometheus::{ + core::{GenericCounterVec, GenericGaugeVec}, + Opts, +}; lazy_static! { pub static ref TOKIO_THREADS_TOTAL: GenericCounter = @@ -36,11 +39,25 @@ lazy_static! { } lazy_static! { - pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec = GenericCounterVec::new( - Opts::new("substrate_unbounded_channel_len", "Items in each mpsc::unbounded instance"), - &["entity", "action"] // 'name of channel, send|received|dropped + pub static ref UNBOUNDED_CHANNELS_COUNTER: GenericCounterVec = GenericCounterVec::new( + Opts::new( + "substrate_unbounded_channel_len", + "Items sent/received/dropped on each mpsc::unbounded instance" + ), + &["entity", "action"], // name of channel, send|received|dropped ).expect("Creating of statics doesn't fail. qed"); + pub static SENT_LABEL: &'static str = "send"; + pub static RECEIVED_LABEL: &'static str = "received"; + pub static DROPPED_LABEL: &'static str = "dropped"; + + pub static ref UNBOUNDED_CHANNELS_SIZE: GenericGaugeVec = GenericGaugeVec::new( + Opts::new( + "substrate_unbounded_channel_size", + "Size (number of messages to be processed) of each mpsc::unbounded instance", + ), + &["entity"], // name of channel + ).expect("Creating of statics doesn't fail. qed"); } /// Register the statics to report to registry @@ -48,6 +65,7 @@ pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> { registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?; registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?; registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?; + registry.register(Box::new(UNBOUNDED_CHANNELS_SIZE.clone()))?; Ok(()) } diff --git a/substrate/client/utils/src/mpsc.rs b/substrate/client/utils/src/mpsc.rs index 039e03f9e618..bf2faf66fa4d 100644 --- a/substrate/client/utils/src/mpsc.rs +++ b/substrate/client/utils/src/mpsc.rs @@ -20,7 +20,9 @@ pub use async_channel::{TryRecvError, TrySendError}; -use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; +use crate::metrics::{ + DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, +}; use async_channel::{Receiver, Sender}; use futures::{ stream::{FusedStream, Stream}, @@ -102,7 +104,8 @@ impl TracingUnboundedSender { /// Proxy function to `async_channel::Sender::try_send`. pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { self.inner.try_send(msg).map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc(); + UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(self.inner.len()); if self.inner.len() >= self.queue_size_warning && self.warning_fired @@ -140,7 +143,8 @@ impl TracingUnboundedReceiver { /// that discounts the messages taken out. pub fn try_recv(&mut self) -> Result { self.inner.try_recv().map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc(); + UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(self.inner.len()); s }) } @@ -155,14 +159,16 @@ impl Drop for TracingUnboundedReceiver { fn drop(&mut self) { // Close the channel to prevent any further messages to be sent into the channel self.close(); - // the number of messages about to be dropped + // The number of messages about to be dropped let count = self.inner.len(); - // discount the messages + // Discount the messages if count > 0 { UNBOUNDED_CHANNELS_COUNTER - .with_label_values(&[self.name, "dropped"]) + .with_label_values(&[self.name, DROPPED_LABEL]) .inc_by(count.saturated_into()); } + // Reset the size metric to 0 + UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0); // Drain all the pending messages in the channel since they can never be accessed, // this can be removed once https://github.com/smol-rs/async-channel/issues/23 is // resolved @@ -180,7 +186,8 @@ impl Stream for TracingUnboundedReceiver { match Pin::new(&mut s.inner).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc(); + UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(self.inner.len()); } Poll::Ready(msg) }, From 54a1220d51767931154686f4ca34e954b23eb7cf Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 11 Sep 2023 12:40:46 +0300 Subject: [PATCH 2/3] minor: rustfmt --- substrate/client/utils/src/mpsc.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/substrate/client/utils/src/mpsc.rs b/substrate/client/utils/src/mpsc.rs index bf2faf66fa4d..98deff31a27b 100644 --- a/substrate/client/utils/src/mpsc.rs +++ b/substrate/client/utils/src/mpsc.rs @@ -20,9 +20,7 @@ pub use async_channel::{TryRecvError, TrySendError}; -use crate::metrics::{ - DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, -}; +use crate::metrics::{DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER}; use async_channel::{Receiver, Sender}; use futures::{ stream::{FusedStream, Stream}, From daf4e23681fe28a3001097286c9645a38c11b63e Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 11 Sep 2023 12:47:33 +0300 Subject: [PATCH 3/3] Fix compilation --- substrate/client/utils/src/metrics.rs | 9 ++++----- substrate/client/utils/src/mpsc.rs | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/substrate/client/utils/src/metrics.rs b/substrate/client/utils/src/metrics.rs index 5b256f2c1256..308e90cb2537 100644 --- a/substrate/client/utils/src/metrics.rs +++ b/substrate/client/utils/src/metrics.rs @@ -46,11 +46,6 @@ lazy_static! { ), &["entity", "action"], // name of channel, send|received|dropped ).expect("Creating of statics doesn't fail. qed"); - - pub static SENT_LABEL: &'static str = "send"; - pub static RECEIVED_LABEL: &'static str = "received"; - pub static DROPPED_LABEL: &'static str = "dropped"; - pub static ref UNBOUNDED_CHANNELS_SIZE: GenericGaugeVec = GenericGaugeVec::new( Opts::new( "substrate_unbounded_channel_size", @@ -60,6 +55,10 @@ lazy_static! { ).expect("Creating of statics doesn't fail. qed"); } +pub static SENT_LABEL: &'static str = "send"; +pub static RECEIVED_LABEL: &'static str = "received"; +pub static DROPPED_LABEL: &'static str = "dropped"; + /// Register the statics to report to registry pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> { registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?; diff --git a/substrate/client/utils/src/mpsc.rs b/substrate/client/utils/src/mpsc.rs index 98deff31a27b..c24a5bd8904a 100644 --- a/substrate/client/utils/src/mpsc.rs +++ b/substrate/client/utils/src/mpsc.rs @@ -20,7 +20,9 @@ pub use async_channel::{TryRecvError, TrySendError}; -use crate::metrics::{DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER}; +use crate::metrics::{ + DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE, +}; use async_channel::{Receiver, Sender}; use futures::{ stream::{FusedStream, Stream}, @@ -103,7 +105,9 @@ impl TracingUnboundedSender { pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { self.inner.try_send(msg).map(|s| { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc(); - UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(self.inner.len()); + UNBOUNDED_CHANNELS_SIZE + .with_label_values(&[self.name]) + .set(self.inner.len().saturated_into()); if self.inner.len() >= self.queue_size_warning && self.warning_fired @@ -142,7 +146,9 @@ impl TracingUnboundedReceiver { pub fn try_recv(&mut self) -> Result { self.inner.try_recv().map(|s| { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc(); - UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(self.inner.len()); + UNBOUNDED_CHANNELS_SIZE + .with_label_values(&[self.name]) + .set(self.inner.len().saturated_into()); s }) } @@ -185,7 +191,9 @@ impl Stream for TracingUnboundedReceiver { Poll::Ready(msg) => { if msg.is_some() { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc(); - UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(self.inner.len()); + UNBOUNDED_CHANNELS_SIZE + .with_label_values(&[s.name]) + .set(s.inner.len().saturated_into()); } Poll::Ready(msg) },