Skip to content

Commit

Permalink
Add BigTable batch size and latency metrics for writes (#26699)
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloem authored May 16, 2023
1 parent 7a2c22d commit b4e785d
Showing 1 changed file with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -481,10 +484,15 @@ public Duration getOperationTimeout() {
@VisibleForTesting
static class BigtableWriterImpl implements Writer {
private Batcher<RowMutationEntry, Void> bulkMutation;
private Integer outstandingMutations = 0;
private Stopwatch stopwatch = Stopwatch.createUnstarted();
private String projectId;
private String instanceId;
private String tableId;

private Distribution bulkSize = Metrics.distribution("BigTable-" + tableId, "batchSize");
private Distribution latency = Metrics.distribution("BigTable-" + tableId, "batchLatencyMs");

BigtableWriterImpl(
BigtableDataClient client, String projectId, String instanceId, String tableId) {
this.projectId = projectId;
Expand All @@ -497,7 +505,12 @@ static class BigtableWriterImpl implements Writer {
public void flush() throws IOException {
if (bulkMutation != null) {
try {
stopwatch.start();
bulkMutation.flush();
bulkSize.update(outstandingMutations);
outstandingMutations = 0;
stopwatch.stop();
latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// We fail since flush() operation was interrupted.
Expand All @@ -510,8 +523,13 @@ public void flush() throws IOException {
public void close() throws IOException {
if (bulkMutation != null) {
try {
stopwatch.start();
bulkMutation.flush();
bulkMutation.close();
bulkSize.update(outstandingMutations);
outstandingMutations = 0;
stopwatch.stop();
latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// We fail since flush() operation was interrupted.
Expand Down Expand Up @@ -548,6 +566,7 @@ public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mu

CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();

outstandingMutations += 1;
Futures.addCallback(
new VendoredListenableFutureAdapter<>(bulkMutation.add(entry)),
new FutureCallback<MutateRowResponse>() {
Expand Down

0 comments on commit b4e785d

Please sign in to comment.