Skip to content

Commit

Permalink
Don't coerce counters to floats, avoid metrics handles
Browse files Browse the repository at this point in the history
This commit corrects a bug where counter values -- integer -- were
converted to floats leading to odd capture data. I have also adjusted
our use of metrics to always use the macro, avoiding metric handles
that might cause a metric to not expire when it should.

On this last point I have only audited the generators.

Signed-off-by: Brian L. Troutwine <[email protected]>
  • Loading branch information
blt committed Dec 12, 2024
1 parent f503e5b commit 8a2d1cd
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 31 deletions.
4 changes: 2 additions & 2 deletions lading/src/captures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ impl CaptureManager {
// TODO we're allocating the same small strings over and over most likely
labels.insert(lbl.key().into(), lbl.value().into());
}
let value: f64 = f64::from_bits(counter.get_inner().load(Ordering::Relaxed));
let value: u64 = counter.get_inner().load(Ordering::Relaxed);
let line = json::Line {
run_id: self.run_id,
time: now_ms,
fetch_index: self.fetch_index,
metric_name: key.name().into(),
metric_kind: json::MetricKind::Counter,
value: json::LineValue::Float(value),
value: json::LineValue::Int(value),
labels,
};
lines.push(line);
Expand Down
3 changes: 1 addition & 2 deletions lading/src/generator/file_gen/traditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ impl Child {
let (snd, rcv) = mpsc::channel(1024);
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;
let bytes_written = counter!("bytes_written");

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
Expand All @@ -270,7 +269,7 @@ impl Child {

{
fp.write_all(&blk.bytes).await?;
bytes_written.increment(total_bytes);
counter!("bytes_written").increment(total_bytes);
total_bytes_written += total_bytes;
}

Expand Down
13 changes: 4 additions & 9 deletions lading/src/generator/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,6 @@ impl Grpc {
thread::Builder::new().spawn(|| block_cache.spin(snd))?;
let rpc_path = self.rpc_path;

let requests_sent = counter!("requests_sent", &self.metric_labels);
let bytes_written = counter!("bytes_written", &self.metric_labels);
let request_ok = counter!("request_ok", &self.metric_labels);
let response_bytes = counter!("response_bytes", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
Expand All @@ -283,7 +278,7 @@ impl Grpc {
tokio::select! {
_ = self.throttle.wait_for(total_bytes) => {
let block_length = blk.bytes.len();
requests_sent.increment(1);
counter!("requests_sent", &self.metric_labels).increment(1);
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
let res = Self::req(
&mut client,
Expand All @@ -294,9 +289,9 @@ impl Grpc {

match res {
Ok(res) => {
bytes_written.increment(block_length as u64);
request_ok.increment(1);
response_bytes.increment(res.into_inner() as u64);
counter!("bytes_written", &self.metric_labels).increment(block_length as u64);
counter!("request_ok", &self.metric_labels).increment(1);
counter!("response_bytes", &self.metric_labels).increment(res.into_inner() as u64);
}
Err(err) => {
let mut error_labels = self.metric_labels.clone();
Expand Down
6 changes: 2 additions & 4 deletions lading/src/generator/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ impl Tcp {
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;

let bytes_written = counter!("bytes_written", &self.metric_labels);
let packets_sent = counter!("packets_sent", &self.metric_labels);
let mut current_connection = None;

let shutdown_wait = self.shutdown.recv();
Expand Down Expand Up @@ -183,8 +181,8 @@ impl Tcp {
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
match connection.write_all(&blk.bytes).await {
Ok(()) => {
bytes_written.increment(u64::from(blk.total_bytes.get()));
packets_sent.increment(1);
counter!("bytes_written", &self.metric_labels).increment(u64::from(blk.total_bytes.get()));
counter!("packets_sent", &self.metric_labels).increment(1);
}
Err(err) => {
trace!("write failed: {}", err);
Expand Down
7 changes: 2 additions & 5 deletions lading/src/generator/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,6 @@ impl Udp {
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;

let bytes_written = counter!("bytes_written", &self.metric_labels);
let packets_sent = counter!("packets_sent", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
loop {
Expand Down Expand Up @@ -192,8 +189,8 @@ impl Udp {
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
match sock.send_to(&blk.bytes, self.addr).await {
Ok(bytes) => {
bytes_written.increment(bytes as u64);
packets_sent.increment(1);
counter!("bytes_written", &self.metric_labels).increment(bytes as u64);
counter!("packets_sent", &self.metric_labels).increment(1);
connection = Some(sock);
}
Err(err) => {
Expand Down
6 changes: 2 additions & 4 deletions lading/src/generator/unix_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ impl Child {
let (snd, rcv) = mpsc::channel(1024);
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;
let bytes_written = counter!("bytes_written", &self.metric_labels);
let packets_sent = counter!("packets_sent", &self.metric_labels);

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
Expand All @@ -258,8 +256,8 @@ impl Child {
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
match socket.send(&blk.bytes).await {
Ok(bytes) => {
bytes_written.increment(bytes as u64);
packets_sent.increment(1);
counter!("bytes_written", &self.metric_labels).increment(bytes as u64);
counter!("packets_sent", &self.metric_labels).increment(1);
}
Err(err) => {
debug!("write failed: {}", err);
Expand Down
7 changes: 2 additions & 5 deletions lading/src/generator/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ impl UnixStream {
let mut rcv: PeekableReceiver<Block> = PeekableReceiver::new(rcv);
thread::Builder::new().spawn(|| block_cache.spin(snd))?;

let bytes_written = counter!("bytes_written", &self.metric_labels);
let packets_sent = counter!("packets_sent", &self.metric_labels);

let mut current_connection = None;

let shutdown_wait = self.shutdown.recv();
Expand Down Expand Up @@ -206,8 +203,8 @@ impl UnixStream {
// if the readiness event is a false positive.
match stream.try_write(&blk.bytes[blk_offset..]) {
Ok(bytes) => {
bytes_written.increment(bytes as u64);
packets_sent.increment(1);
counter!("bytes_written", &self.metric_labels).increment(bytes as u64);
counter!("packets_sent", &self.metric_labels).increment(1);
blk_offset += bytes;
}
Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => {
Expand Down

0 comments on commit 8a2d1cd

Please sign in to comment.