Skip to content

Commit

Permalink
Thread stats (#85)
Browse files Browse the repository at this point in the history
Add more detailed thread and GC stats to summary.
  • Loading branch information
msbarry authored Feb 24, 2022
1 parent aaaaa2f commit 0d727c4
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import javax.management.NotificationEmitter;
import javax.management.openmbean.CompositeData;
Expand Down Expand Up @@ -109,9 +110,37 @@ public record ThreadState(
String name, Duration cpuTime, Duration userTime, Duration waiting, Duration blocking, long id
) {

private static long zeroIfUnsupported(LongSupplier supplier) {
try {
return supplier.getAsLong();
} catch (UnsupportedOperationException e) {
return 0;
}
}

public ThreadState(ThreadMXBean threadMXBean, ThreadInfo thread) {
this(
thread.getThreadName(),
Duration.ofNanos(zeroIfUnsupported(() -> threadMXBean.getThreadCpuTime(thread.getThreadId()))),
Duration.ofNanos(zeroIfUnsupported(() -> threadMXBean.getThreadUserTime(thread.getThreadId()))),
Duration.ofMillis(zeroIfUnsupported(thread::getWaitedTime)),
Duration.ofMillis(zeroIfUnsupported(thread::getBlockedTime)),
thread.getThreadId());
}

public static final ThreadState DEFAULT = new ThreadState("", Duration.ZERO, Duration.ZERO, Duration.ZERO,
Duration.ZERO, -1);

/** Adds up the timers in two {@code ThreadState} instances */
public ThreadState plus(ThreadState other) {
return new ThreadState("<multiple threads>",
cpuTime.plus(other.cpuTime),
userTime.plus(other.userTime),
waiting.plus(other.waiting),
blocking.plus(other.blocking),
-1
);
}
}

/** Returns the amount of time this JVM has spent in any kind of garbage collection since startup. */
Expand Down Expand Up @@ -143,16 +172,14 @@ public static Map<Long, ThreadState> getThreadStats() {
Map<Long, ThreadState> threadState = new TreeMap<>();
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
for (ThreadInfo thread : threadMXBean.dumpAllThreads(false, false)) {
threadState.put(thread.getThreadId(),
new ThreadState(
thread.getThreadName(),
Duration.ofNanos(threadMXBean.getThreadCpuTime(thread.getThreadId())),
Duration.ofNanos(threadMXBean.getThreadUserTime(thread.getThreadId())),
Duration.ofMillis(thread.getWaitedTime()),
Duration.ofMillis(thread.getBlockedTime()),
thread.getThreadId()
));
threadState.put(thread.getThreadId(), new ThreadState(threadMXBean, thread));
}
return threadState;
}

public static ThreadState getCurrentThreadState() {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo thread = threadMXBean.getThreadInfo(Thread.currentThread().getId());
return new ThreadState(threadMXBean, thread);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,29 @@
* LOGGER.log("Expensive work took " + end.minus(start));
* }</pre>
*/
public record ProcessTime(Duration wall, Optional<Duration> cpu) {
public record ProcessTime(Duration wall, Optional<Duration> cpu, Duration gc) {

/** Takes a snapshot of current wall and CPU time of this JVM. */
public static ProcessTime now() {
return new ProcessTime(Duration.ofNanos(System.nanoTime()), ProcessInfo.getProcessCpuTime());
return new ProcessTime(Duration.ofNanos(System.nanoTime()), ProcessInfo.getProcessCpuTime(),
ProcessInfo.getGcTime());
}

/** Returns the amount of time elapsed between {@code other} and {@code this}. */
ProcessTime minus(ProcessTime other) {
return new ProcessTime(wall.minus(other.wall), cpu.flatMap(thisCpu -> other.cpu.map(thisCpu::minus)));
return new ProcessTime(
wall.minus(other.wall),
cpu.flatMap(thisCpu -> other.cpu.map(thisCpu::minus)),
gc.minus(other.gc)
);
}

public String toString(Locale locale) {
Format format = Format.forLocale(locale);
Optional<String> deltaCpu = cpu.map(format::seconds);
Optional<String> deltaCpu = cpu.map(format::duration);
String avgCpus = cpu.map(cpuTime -> " avg:" + format.decimal(cpuTime.toNanos() * 1d / wall.toNanos()))
.orElse("");
return format.seconds(wall) + " cpu:" + deltaCpu.orElse("-") + avgCpus;
return format.duration(wall) + " cpu:" + deltaCpu.orElse("-") + " gc:" + format.duration(gc) + avgCpus;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public List<MetricFamilySamples> collect() {
List<MetricFamilySamples> result = new ArrayList<>();
for (var entry : timers.all().entrySet()) {
String name = entry.getKey();
Timer timer = entry.getValue();
Timer timer = entry.getValue().timer();
result.add(gaugeMetric(name + "_running", timer.running() ? 1 : 0));
ProcessTime time = timer.elapsed();
result.add(gaugeMetric(name + "_elapsed_time_seconds", time.wall().toNanos() / NANOSECONDS_PER_SECOND));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static Stats prometheusPushGateway(String destination, String job, Duration inte
default void printSummary() {
Format format = Format.defaultInstance();
Logger LOGGER = LoggerFactory.getLogger(getClass());
System.out.println();
LOGGER.info("-".repeat(40));
timers().printSummary();
LOGGER.info("-".repeat(40));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package com.onthegomap.planetiler.stats;

import com.onthegomap.planetiler.util.Format;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,23 +20,99 @@
public class Timers {

private static final Logger LOGGER = LoggerFactory.getLogger(Stats.InMemory.class);
private final Map<String, Timer> timers = Collections.synchronizedMap(new LinkedHashMap<>());
private static final Format FORMAT = Format.defaultInstance();
private final Map<String, Stage> timers = Collections.synchronizedMap(new LinkedHashMap<>());
private final AtomicReference<Stage> currentStage = new AtomicReference<>();

public void printSummary() {
int maxLength = (int) all().keySet().stream().mapToLong(String::length).max().orElse(0);
for (var entry : all().entrySet()) {
LOGGER.info("\t" + entry.getKey() + "\t" + entry.getValue().elapsed());
String name = entry.getKey();
var elapsed = entry.getValue().timer.elapsed();
LOGGER.info("\t" + Format.padRight(name, maxLength) + " " + elapsed);
for (String detail : getStageDetails(name, true)) {
LOGGER.info("\t " + detail);
}
}
}

private List<String> getStageDetails(String name, boolean pad) {
List<String> resultList = new ArrayList<>();
Stage stage = timers.get(name);
var elapsed = stage.timer.elapsed();
List<String> threads = stage.threadStats.stream().map(d -> d.prefix).distinct().toList();
int maxLength = !pad ? 0 : (int) (threads.stream()
.map(n -> n.replace(name + "_", ""))
.mapToLong(String::length)
.max().orElse(0)) + 1;
for (String thread : threads) {
StringBuilder result = new StringBuilder();
List<ThreadInfo> threadStates = stage.threadStats.stream()
.filter(t -> t.prefix.equals(thread))
.toList();
int num = threadStates.size();
ProcessInfo.ThreadState sum = threadStates.stream()
.map(d -> d.state)
.reduce(ProcessInfo.ThreadState.DEFAULT, ProcessInfo.ThreadState::plus);
double totalNanos = elapsed.wall().multipliedBy(num).toNanos();
result.append(Format.padRight(thread.replace(name + "_", ""), maxLength))
.append(Format.padLeft(Integer.toString(num), 2))
.append("x(")
.append(FORMAT.percent(sum.cpuTime().toNanos() / totalNanos))
.append(" cpu:")
.append(FORMAT.duration(sum.cpuTime().dividedBy(num)));

Duration systemTime = sum.cpuTime().minus(sum.userTime()).dividedBy(num);
if (systemTime.compareTo(Duration.ofSeconds(1)) > 0) {
result.append(" sys:").append(FORMAT.duration(systemTime));
}
Duration blockTime = sum.blocking().dividedBy(num);
if (blockTime.compareTo(Duration.ofSeconds(1)) > 0) {
result.append(" block:").append(FORMAT.duration(blockTime));
}
Duration waitTime = sum.waiting().dividedBy(num);
if (waitTime.compareTo(Duration.ofSeconds(1)) > 0) {
result.append(" wait:").append(FORMAT.duration(waitTime));
}
Duration totalThreadElapsedTime = threadStates.stream().map(d -> d.elapsed)
.reduce(Duration::plus)
.orElse(Duration.ZERO)
.dividedBy(num);
Duration doneTime = elapsed.wall().minus(totalThreadElapsedTime);
if (doneTime.compareTo(Duration.ofSeconds(1)) > 0) {
result.append(" done:").append(FORMAT.duration(doneTime));
}
result.append(")");
resultList.add(result.toString());
}
return resultList;
}

public Finishable startTimer(String name) {
Timer timer = Timer.start();
timers.put(name, timer);
Stage stage = new Stage(timer);
timers.put(name, stage);
Stage last = currentStage.getAndSet(stage);
System.out.println();
LOGGER.info("Starting...");
return () -> LOGGER.info("Finished in " + timers.get(name).stop() + System.lineSeparator());
return () -> {
LOGGER.info("Finished in " + timers.get(name).timer.stop());
for (var details : getStageDetails(name, true)) {
LOGGER.info(" " + details);
}
currentStage.set(last);
};
}

public void finishedWorker(String prefix, Duration elapsed) {
Stage stage = currentStage.get();
if (stage != null) {
stage.threadStats.add(new ThreadInfo(ProcessInfo.getCurrentThreadState(), prefix, elapsed));
}
}

/** Returns a snapshot of all timers currently running. Will not reflect timers that start after it's called. */
public Map<String, Timer> all() {
public Map<String, Stage> all() {
synchronized (timers) {
return new LinkedHashMap<>(timers);
}
Expand All @@ -41,4 +123,13 @@ public interface Finishable {

void stop();
}

record ThreadInfo(ProcessInfo.ThreadState state, String prefix, Duration elapsed) {}

record Stage(Timer timer, List<ThreadInfo> threadStats) {

Stage(Timer timer) {
this(timer, new CopyOnWriteArrayList<>());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ public String seconds(Duration duration) {
return decimal(seconds < 1 ? seconds : Math.round(seconds)) + "s";
}

/** Returns a duration formatted like "1h2m" or "2m3s". */
public String duration(Duration duration) {
Duration simplified;
double seconds = duration.toNanos() * 1d / Duration.ofSeconds(1).toNanos();
if (seconds < 1) {
return decimal(seconds) + "s";
} else {
simplified = Duration.ofSeconds(Math.round(seconds));
}
return simplified.toString().replace("PT", "").toLowerCase(Locale.ROOT);
}

/** Returns Java code that can re-create {@code string}: {@code null} if null, or {@code "contents"} if not empty. */
public static String quote(String string) {
if (string == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public Worker(String prefix, Stats stats, int threads, RunnableThatThrows task)
String id = Thread.currentThread().getName();
LOGGER.trace("Starting worker");
try {
long start = System.nanoTime();
task.run();
stats.timers().finishedWorker(prefix, Duration.ofNanos(System.nanoTime() - start));
} catch (Throwable e) {
System.err.println("Worker " + id + " died");
throwRuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.onthegomap.planetiler.stats;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import org.junit.jupiter.api.Test;

public class ProcessInfoTest {
Expand All @@ -21,4 +23,17 @@ public void testCPU() {
public void testThreads() {
assertFalse(ProcessInfo.getThreadStats().isEmpty());
}

@Test
public void testAdd() {
var a = new ProcessInfo.ThreadState("", Duration.ofSeconds(1), Duration.ofSeconds(2), Duration.ofSeconds(3),
Duration.ofSeconds(4), -1);
var b = new ProcessInfo.ThreadState("", Duration.ofSeconds(5), Duration.ofSeconds(6), Duration.ofSeconds(7),
Duration.ofSeconds(8), -1);
var sum = a.plus(b);
assertEquals(Duration.ofSeconds(6), sum.cpuTime());
assertEquals(Duration.ofSeconds(8), sum.userTime());
assertEquals(Duration.ofSeconds(10), sum.waiting());
assertEquals(Duration.ofSeconds(12), sum.blocking());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testTimer() {
assertContainsStat("^planetiler_task1_elapsed_time_seconds [0-9\\.]+$", stats);
assertContainsStat("^planetiler_task1_cpu_time_seconds [0-9\\.]+$", stats);

assertFalse(stats.timers().all().get("task1").running());
assertFalse(stats.timers().all().get("task1").timer().running());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public void testTimers() {
timers.printSummary();

var finish = timers.startTimer("task2");
assertTrue(timers.all().get("task2").running());
assertTrue(timers.all().get("task2").timer().running());
finish.stop();
assertFalse(timers.all().get("task2").running());
assertFalse(timers.all().get("task2").timer().running());

timers.printSummary();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.onthegomap.planetiler.util;

import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.Locale;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
Expand Down Expand Up @@ -73,4 +75,22 @@ public void testPad(String in, Integer size, String out) {
public void testFormatDecimal(Double in, String out, Locale locale) {
assertEquals(out, Format.forLocale(locale).decimal(in));
}

@ParameterizedTest
@CsvSource({
"0,0s,en",
"0.1,0.1s,en",
"0.1,'0,1s',it",
"0.999,1s,en",
"1.1,1s,en",
"59,59s,en",
"60,1m,en",
"61.1,1m1s,en",
"3599,59m59s,en",
"3600,1h,en",
"3601,1h1s,en",
})
public void testFormatDuration(double seconds, String out, Locale locale) {
assertEquals(out, Format.forLocale(locale).duration(Duration.ofNanos((long) (seconds * NANOSECONDS_PER_SECOND))));
}
}

0 comments on commit 0d727c4

Please sign in to comment.