Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't coerce counters to floats, avoid metrics handles #1144

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions lading/src/blackhole/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
};
use metrics::{counter, Counter};
use metrics::counter;
use serde::{Deserialize, Serialize};
use tower::ServiceBuilder;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -127,14 +127,13 @@ struct KinesisPutRecordBatchResponse {
#[allow(clippy::borrow_interior_mutable_const)]
async fn srv(
status: StatusCode,
bytes_received: Counter,
requests_received: Counter,
metric_labels: Vec<(String, String)>,
body_bytes: Vec<u8>,
req: Request<Body>,
headers: HeaderMap,
response_delay: Duration,
) -> Result<Response<Body>, hyper::Error> {
requests_received.increment(1);
counter!("requests_received", &metric_labels).increment(1);

let (parts, body) = req.into_parts();

Expand All @@ -143,7 +142,7 @@ async fn srv(
match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
Err(response) => Ok(response),
Ok(body) => {
bytes_received.increment(body.len() as u64);
counter!("bytes_received", &metric_labels).increment(body.len() as u64);

tokio::time::sleep(response_delay).await;

Expand Down Expand Up @@ -234,20 +233,16 @@ impl Http {
/// Function will return an error if the configuration is invalid or if
/// receiving a packet fails.
pub async fn run(self) -> Result<(), Error> {
let bytes_received = counter!("bytes_received", &self.metric_labels);
let requests_received = counter!("requests_received", &self.metric_labels);
let service = make_service_fn(|_: &AddrStream| {
let bytes_received = bytes_received.clone();
let requests_received = requests_received.clone();
let metric_labels = self.metric_labels.clone();
let body_bytes = self.body_bytes.clone();
let headers = self.headers.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |request| {
debug!("REQUEST: {:?}", request);
srv(
self.status,
bytes_received.clone(),
requests_received.clone(),
metric_labels.clone(),
body_bytes.clone(),
request,
headers.clone(),
Expand Down
5 changes: 3 additions & 2 deletions lading/src/blackhole/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server, StatusCode,
};
use metrics::counter;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tower::ServiceBuilder;
Expand Down Expand Up @@ -91,15 +92,15 @@ async fn srv(
req: Request<Body>,
labels: Arc<Vec<(String, String)>>,
) -> Result<Response<Body>, Error> {
metrics::counter!("requests_received", &*labels).increment(1);
counter!("requests_received", &*labels).increment(1);

let (parts, body) = req.into_parts();
let bytes = body.collect().await?.to_bytes();

match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
Err(response) => Ok(response),
Ok(body) => {
metrics::counter!("bytes_received", &*labels).increment(body.len() as u64);
counter!("bytes_received", &*labels).increment(body.len() as u64);

let mut okay = Response::default();
*okay.status_mut() = StatusCode::OK;
Expand Down
22 changes: 6 additions & 16 deletions lading/src/blackhole/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
};
use metrics::{counter, Counter};
use metrics::counter;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use serde::{Deserialize, Serialize};
use tokio::time::Duration;
Expand Down Expand Up @@ -97,18 +97,9 @@ impl Sqs {
///
/// None known.
pub async fn run(self) -> Result<(), Error> {
let bytes_received = counter!("bytes_received", &self.metric_labels);
let requests_received = counter!("requests_received", &self.metric_labels);
let service = make_service_fn(|_: &AddrStream| {
let bytes_received = bytes_received.clone();
let requests_received = requests_received.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
let bytes_received = bytes_received.clone();
let requests_received = requests_received.clone();
srv(req, requests_received, bytes_received)
}))
}
let metric_labels = self.metric_labels.clone();
async move { Ok::<_, hyper::Error>(service_fn(move |req| srv(req, metric_labels.clone()))) }
});
let svc = ServiceBuilder::new()
.load_shed()
Expand Down Expand Up @@ -238,13 +229,12 @@ impl DeleteMessageBatch {

async fn srv(
req: Request<Body>,
requests_received: Counter,
bytes_received: Counter,
metric_labels: Vec<(String, String)>,
) -> Result<Response<Body>, Error> {
requests_received.increment(1);
counter!("requests_received", &metric_labels).increment(1);

let bytes = req.collect().await?.to_bytes();
bytes_received.increment(bytes.len() as u64);
counter!("bytes_received", &metric_labels).increment(bytes.len() as u64);

let action: Action = serde_qs::from_bytes(&bytes)?;

Expand Down
9 changes: 3 additions & 6 deletions lading/src/blackhole/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ impl Tcp {

async fn handle_connection(socket: TcpStream, labels: &'static [(String, String)]) {
let mut stream = ReaderStream::new(socket);
let bytes_received = counter!("bytes_received", labels);
let message_received = counter!("message_received", labels);

while let Some(msg) = stream.next().await {
message_received.increment(1);
counter!("message_received", labels).increment(1);
if let Ok(msg) = msg {
bytes_received.increment(msg.len() as u64);
counter!("bytes_received", labels).increment(msg.len() as u64);
}
}
}
Expand All @@ -91,7 +89,6 @@ impl Tcp {
.await
.map_err(Error::Io)?;

let connection_accepted = counter!("connection_accepted", &self.metric_labels);
let labels: &'static _ = Box::new(self.metric_labels.clone()).leak();

let shutdown_wait = self.shutdown.recv();
Expand All @@ -100,7 +97,7 @@ impl Tcp {
tokio::select! {
conn = listener.accept() => {
let (socket, _) = conn.map_err(Error::Io)?;
connection_accepted.increment(1);
counter!("connection_accepted", &self.metric_labels).increment(1);
tokio::spawn(
Self::handle_connection(socket, labels)
);
Expand Down
7 changes: 2 additions & 5 deletions lading/src/blackhole/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,14 @@ impl Udp {
.map_err(Error::Io)?;
let mut buf = [0; 65536];

let bytes_received = counter!("bytes_received", &self.metric_labels);
let packet_received = counter!("packet_received", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
tokio::select! {
packet = socket.recv_from(&mut buf) => {
let (bytes, _) = packet.map_err(Error::Io)?;
packet_received.increment(1);
bytes_received.increment(bytes as u64);
counter!("packet_received", &self.metric_labels).increment(1);
counter!("bytes_received", &self.metric_labels).increment(bytes as u64);
}
() = &mut shutdown_wait => {
info!("shutdown signal received");
Expand Down
4 changes: 1 addition & 3 deletions lading/src/blackhole/unix_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,13 @@ impl UnixDatagram {
let socket = net::UnixDatagram::bind(&self.path).map_err(Error::Io)?;
let mut buf = [0; 65536];

let bytes_received = counter!("bytes_received", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
tokio::select! {
res = socket.recv(&mut buf) => {
let n: usize = res.map_err(Error::Io)?;
bytes_received.increment(n as u64);
counter!("bytes_received", &self.metric_labels).increment(n as u64);
}
() = &mut shutdown_wait => {
info!("shutdown signal received");
Expand Down
9 changes: 3 additions & 6 deletions lading/src/blackhole/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ impl UnixStream {
pub async fn run(self) -> Result<(), Error> {
let listener = net::UnixListener::bind(&self.path).map_err(Error::Io)?;

let connection_accepted = counter!("connection_accepted", &self.metric_labels);
let labels: &'static _ = Box::new(self.metric_labels.clone()).leak();

let shutdown_wait = self.shutdown.recv();
Expand All @@ -85,7 +84,7 @@ impl UnixStream {
tokio::select! {
conn = listener.accept() => {
let (socket, _) = conn.map_err(Error::Io)?;
connection_accepted.increment(1);
counter!("connection_accepted", &self.metric_labels).increment(1);
tokio::spawn(
Self::handle_connection(socket, labels)
);
Expand All @@ -100,13 +99,11 @@ impl UnixStream {

async fn handle_connection(socket: net::UnixStream, labels: &'static [(String, String)]) {
let mut stream = ReaderStream::new(socket);
let bytes_received = counter!("bytes_received", labels);
let message_received = counter!("message_received", labels);

while let Some(msg) = stream.next().await {
message_received.increment(1);
counter!("message_received", labels).increment(1);
if let Ok(msg) = msg {
bytes_received.increment(msg.len() as u64);
counter!("bytes_received", labels).increment(msg.len() as u64);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions lading/src/captures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ impl CaptureManager {
// TODO we're allocating the same small strings over and over most likely
labels.insert(lbl.key().into(), lbl.value().into());
}
let value: f64 = f64::from_bits(counter.get_inner().load(Ordering::Relaxed));
let value: u64 = counter.get_inner().load(Ordering::Relaxed);
let line = json::Line {
run_id: self.run_id,
time: now_ms,
fetch_index: self.fetch_index,
metric_name: key.name().into(),
metric_kind: json::MetricKind::Counter,
value: json::LineValue::Float(value),
value: json::LineValue::Int(value),
labels,
};
lines.push(line);
Expand Down
3 changes: 1 addition & 2 deletions lading/src/generator/file_gen/traditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ impl Child {
let (snd, rcv) = mpsc::channel(1024);
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;
let bytes_written = counter!("bytes_written");

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
Expand All @@ -270,7 +269,7 @@ impl Child {

{
fp.write_all(&blk.bytes).await?;
bytes_written.increment(total_bytes);
counter!("bytes_written").increment(total_bytes);
total_bytes_written += total_bytes;
}

Expand Down
13 changes: 4 additions & 9 deletions lading/src/generator/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,6 @@ impl Grpc {
thread::Builder::new().spawn(|| block_cache.spin(snd))?;
let rpc_path = self.rpc_path;

let requests_sent = counter!("requests_sent", &self.metric_labels);
let bytes_written = counter!("bytes_written", &self.metric_labels);
let request_ok = counter!("request_ok", &self.metric_labels);
let response_bytes = counter!("response_bytes", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
Expand All @@ -283,7 +278,7 @@ impl Grpc {
tokio::select! {
_ = self.throttle.wait_for(total_bytes) => {
let block_length = blk.bytes.len();
requests_sent.increment(1);
counter!("requests_sent", &self.metric_labels).increment(1);
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
let res = Self::req(
&mut client,
Expand All @@ -294,9 +289,9 @@ impl Grpc {

match res {
Ok(res) => {
bytes_written.increment(block_length as u64);
request_ok.increment(1);
response_bytes.increment(res.into_inner() as u64);
counter!("bytes_written", &self.metric_labels).increment(block_length as u64);
counter!("request_ok", &self.metric_labels).increment(1);
counter!("response_bytes", &self.metric_labels).increment(res.into_inner() as u64);
}
Err(err) => {
let mut error_labels = self.metric_labels.clone();
Expand Down
4 changes: 1 addition & 3 deletions lading/src/generator/passthru_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ impl PassthruFile {
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;

let bytes_written = counter!("bytes_written", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
let mut current_file = None;
Expand Down Expand Up @@ -180,7 +178,7 @@ impl PassthruFile {
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
match current_file.write_all(&blk.bytes).await {
Ok(()) => {
bytes_written.increment(u64::from(blk.total_bytes.get()));
counter!("bytes_written", &self.metric_labels).increment(u64::from(blk.total_bytes.get()));
}
Err(err) => {
warn!("write failed: {}", err);
Expand Down
6 changes: 2 additions & 4 deletions lading/src/generator/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ impl Tcp {
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;

let bytes_written = counter!("bytes_written", &self.metric_labels);
let packets_sent = counter!("packets_sent", &self.metric_labels);
let mut current_connection = None;

let shutdown_wait = self.shutdown.recv();
Expand Down Expand Up @@ -183,8 +181,8 @@ impl Tcp {
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
match connection.write_all(&blk.bytes).await {
Ok(()) => {
bytes_written.increment(u64::from(blk.total_bytes.get()));
packets_sent.increment(1);
counter!("bytes_written", &self.metric_labels).increment(u64::from(blk.total_bytes.get()));
counter!("packets_sent", &self.metric_labels).increment(1);
}
Err(err) => {
trace!("write failed: {}", err);
Expand Down
7 changes: 2 additions & 5 deletions lading/src/generator/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,6 @@ impl Udp {
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;

let bytes_written = counter!("bytes_written", &self.metric_labels);
let packets_sent = counter!("packets_sent", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
Expand Down Expand Up @@ -192,8 +189,8 @@ impl Udp {
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
match sock.send_to(&blk.bytes, self.addr).await {
Ok(bytes) => {
bytes_written.increment(bytes as u64);
packets_sent.increment(1);
counter!("bytes_written", &self.metric_labels).increment(bytes as u64);
counter!("packets_sent", &self.metric_labels).increment(1);
connection = Some(sock);
}
Err(err) => {
Expand Down
6 changes: 2 additions & 4 deletions lading/src/generator/unix_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ impl Child {
let (snd, rcv) = mpsc::channel(1024);
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;
let bytes_written = counter!("bytes_written", &self.metric_labels);
let packets_sent = counter!("packets_sent", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
Expand All @@ -258,8 +256,8 @@ impl Child {
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
match socket.send(&blk.bytes).await {
Ok(bytes) => {
bytes_written.increment(bytes as u64);
packets_sent.increment(1);
counter!("bytes_written", &self.metric_labels).increment(bytes as u64);
counter!("packets_sent", &self.metric_labels).increment(1);
}
Err(err) => {
debug!("write failed: {}", err);
Expand Down
Loading
Loading