Skip to content

Commit

Permalink
finish audit
Browse files Browse the repository at this point in the history
Signed-off-by: Brian L. Troutwine <[email protected]>
  • Loading branch information
blt committed Dec 12, 2024
1 parent 8a2d1cd commit eb38edd
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 59 deletions.
17 changes: 6 additions & 11 deletions lading/src/blackhole/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
};
use metrics::{counter, Counter};
use metrics::counter;
use serde::{Deserialize, Serialize};
use tower::ServiceBuilder;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -127,14 +127,13 @@ struct KinesisPutRecordBatchResponse {
#[allow(clippy::borrow_interior_mutable_const)]
async fn srv(
status: StatusCode,
bytes_received: Counter,
requests_received: Counter,
metric_labels: Vec<(String, String)>,
body_bytes: Vec<u8>,
req: Request<Body>,
headers: HeaderMap,
response_delay: Duration,
) -> Result<Response<Body>, hyper::Error> {
requests_received.increment(1);
counter!("requests_received", &metric_labels).increment(1);

let (parts, body) = req.into_parts();

Expand All @@ -143,7 +142,7 @@ async fn srv(
match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
Err(response) => Ok(response),
Ok(body) => {
bytes_received.increment(body.len() as u64);
counter!("bytes_received", &metric_labels).increment(body.len() as u64);

tokio::time::sleep(response_delay).await;

Expand Down Expand Up @@ -234,20 +233,16 @@ impl Http {
/// Function will return an error if the configuration is invalid or if
/// receiving a packet fails.
pub async fn run(self) -> Result<(), Error> {
let bytes_received = counter!("bytes_received", &self.metric_labels);
let requests_received = counter!("requests_received", &self.metric_labels);
let service = make_service_fn(|_: &AddrStream| {
let bytes_received = bytes_received.clone();
let requests_received = requests_received.clone();
let metric_labels = self.metric_labels.clone();
let body_bytes = self.body_bytes.clone();
let headers = self.headers.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |request| {
debug!("REQUEST: {:?}", request);
srv(
self.status,
bytes_received.clone(),
requests_received.clone(),
metric_labels.clone(),
body_bytes.clone(),
request,
headers.clone(),
Expand Down
5 changes: 3 additions & 2 deletions lading/src/blackhole/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server, StatusCode,
};
use metrics::counter;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tower::ServiceBuilder;
Expand Down Expand Up @@ -91,15 +92,15 @@ async fn srv(
req: Request<Body>,
labels: Arc<Vec<(String, String)>>,
) -> Result<Response<Body>, Error> {
metrics::counter!("requests_received", &*labels).increment(1);
counter!("requests_received", &*labels).increment(1);

let (parts, body) = req.into_parts();
let bytes = body.collect().await?.to_bytes();

match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
Err(response) => Ok(response),
Ok(body) => {
metrics::counter!("bytes_received", &*labels).increment(body.len() as u64);
counter!("bytes_received", &*labels).increment(body.len() as u64);

let mut okay = Response::default();
*okay.status_mut() = StatusCode::OK;
Expand Down
22 changes: 6 additions & 16 deletions lading/src/blackhole/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
};
use metrics::{counter, Counter};
use metrics::counter;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use serde::{Deserialize, Serialize};
use tokio::time::Duration;
Expand Down Expand Up @@ -97,18 +97,9 @@ impl Sqs {
///
/// None known.
pub async fn run(self) -> Result<(), Error> {
let bytes_received = counter!("bytes_received", &self.metric_labels);
let requests_received = counter!("requests_received", &self.metric_labels);
let service = make_service_fn(|_: &AddrStream| {
let bytes_received = bytes_received.clone();
let requests_received = requests_received.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
let bytes_received = bytes_received.clone();
let requests_received = requests_received.clone();
srv(req, requests_received, bytes_received)
}))
}
let metric_labels = self.metric_labels.clone();
async move { Ok::<_, hyper::Error>(service_fn(move |req| srv(req, metric_labels.clone()))) }
});
let svc = ServiceBuilder::new()
.load_shed()
Expand Down Expand Up @@ -238,13 +229,12 @@ impl DeleteMessageBatch {

async fn srv(
req: Request<Body>,
requests_received: Counter,
bytes_received: Counter,
metric_labels: Vec<(String, String)>,
) -> Result<Response<Body>, Error> {
requests_received.increment(1);
counter!("requests_received", &metric_labels).increment(1);

let bytes = req.collect().await?.to_bytes();
bytes_received.increment(bytes.len() as u64);
counter!("bytes_received", &metric_labels).increment(bytes.len() as u64);

let action: Action = serde_qs::from_bytes(&bytes)?;

Expand Down
9 changes: 3 additions & 6 deletions lading/src/blackhole/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ impl Tcp {

async fn handle_connection(socket: TcpStream, labels: &'static [(String, String)]) {
let mut stream = ReaderStream::new(socket);
let bytes_received = counter!("bytes_received", labels);
let message_received = counter!("message_received", labels);

while let Some(msg) = stream.next().await {
message_received.increment(1);
counter!("message_received", labels).increment(1);
if let Ok(msg) = msg {
bytes_received.increment(msg.len() as u64);
counter!("bytes_received", labels).increment(msg.len() as u64);
}
}
}
Expand All @@ -91,7 +89,6 @@ impl Tcp {
.await
.map_err(Error::Io)?;

let connection_accepted = counter!("connection_accepted", &self.metric_labels);
let labels: &'static _ = Box::new(self.metric_labels.clone()).leak();

let shutdown_wait = self.shutdown.recv();
Expand All @@ -100,7 +97,7 @@ impl Tcp {
tokio::select! {
conn = listener.accept() => {
let (socket, _) = conn.map_err(Error::Io)?;
connection_accepted.increment(1);
counter!("connection_accepted", &self.metric_labels).increment(1);
tokio::spawn(
Self::handle_connection(socket, labels)
);
Expand Down
7 changes: 2 additions & 5 deletions lading/src/blackhole/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,14 @@ impl Udp {
.map_err(Error::Io)?;
let mut buf = [0; 65536];

let bytes_received = counter!("bytes_received", &self.metric_labels);
let packet_received = counter!("packet_received", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
tokio::select! {
packet = socket.recv_from(&mut buf) => {
let (bytes, _) = packet.map_err(Error::Io)?;
packet_received.increment(1);
bytes_received.increment(bytes as u64);
counter!("packet_received", &self.metric_labels).increment(1);
counter!("bytes_received", &self.metric_labels).increment(bytes as u64);
}
() = &mut shutdown_wait => {
info!("shutdown signal received");
Expand Down
4 changes: 1 addition & 3 deletions lading/src/blackhole/unix_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,13 @@ impl UnixDatagram {
let socket = net::UnixDatagram::bind(&self.path).map_err(Error::Io)?;
let mut buf = [0; 65536];

let bytes_received = counter!("bytes_received", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
tokio::select! {
res = socket.recv(&mut buf) => {
let n: usize = res.map_err(Error::Io)?;
bytes_received.increment(n as u64);
counter!("bytes_received", &self.metric_labels).increment(n as u64);
}
() = &mut shutdown_wait => {
info!("shutdown signal received");
Expand Down
9 changes: 3 additions & 6 deletions lading/src/blackhole/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ impl UnixStream {
pub async fn run(self) -> Result<(), Error> {
let listener = net::UnixListener::bind(&self.path).map_err(Error::Io)?;

let connection_accepted = counter!("connection_accepted", &self.metric_labels);
let labels: &'static _ = Box::new(self.metric_labels.clone()).leak();

let shutdown_wait = self.shutdown.recv();
Expand All @@ -85,7 +84,7 @@ impl UnixStream {
tokio::select! {
conn = listener.accept() => {
let (socket, _) = conn.map_err(Error::Io)?;
connection_accepted.increment(1);
counter!("connection_accepted", &self.metric_labels).increment(1);
tokio::spawn(
Self::handle_connection(socket, labels)
);
Expand All @@ -100,13 +99,11 @@ impl UnixStream {

async fn handle_connection(socket: net::UnixStream, labels: &'static [(String, String)]) {
let mut stream = ReaderStream::new(socket);
let bytes_received = counter!("bytes_received", labels);
let message_received = counter!("message_received", labels);

while let Some(msg) = stream.next().await {
message_received.increment(1);
counter!("message_received", labels).increment(1);
if let Ok(msg) = msg {
bytes_received.increment(msg.len() as u64);
counter!("bytes_received", labels).increment(msg.len() as u64);
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions lading/src/generator/passthru_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ impl PassthruFile {
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;

let bytes_written = counter!("bytes_written", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
let mut current_file = None;
Expand Down Expand Up @@ -180,7 +178,7 @@ impl PassthruFile {
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
match current_file.write_all(&blk.bytes).await {
Ok(()) => {
bytes_written.increment(u64::from(blk.total_bytes.get()));
counter!("bytes_written", &self.metric_labels).increment(u64::from(blk.total_bytes.get()));
}
Err(err) => {
warn!("write failed: {}", err);
Expand Down
6 changes: 3 additions & 3 deletions lading/src/target_metrics/expvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ impl Expvar {
let val = json.pointer(var_name).and_then(serde_json::Value::as_f64);
if let Some(val) = val {
trace!("expvar: {} = {}", var_name, val);
let handle = gauge!(
gauge!(
format!("target/{name}", name = var_name.trim_start_matches('/'),),
&all_labels
);
handle.set(val);
)
.set(val);
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions lading/src/target_metrics/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ pub(crate) async fn scrape_metrics(
}
};

let handle = gauge!(format!("target/{name}"), &all_labels.unwrap_or_default());
handle.set(value);
gauge!(format!("target/{name}"), &all_labels.unwrap_or_default()).set(value);
}
Some(MetricType::Counter) => {
let value: f64 = match value.parse() {
Expand All @@ -296,8 +295,7 @@ pub(crate) async fn scrape_metrics(
};

trace!("counter: {name} = {value}");
let handle = counter!(format!("target/{name}"), &all_labels.unwrap_or_default());
handle.absolute(value);
counter!(format!("target/{name}"), &all_labels.unwrap_or_default()).absolute(value);
}
Some(_) => {
trace!("unsupported metric type: {name} = {value}");
Expand Down

0 comments on commit eb38edd

Please sign in to comment.