diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 9c64d2a56abe..540a3b5e1ac3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -66,9 +66,12 @@ import org.apache.beam.runners.core.metrics.ServiceCallMetric; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.io.range.ByteKeyRange; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.FutureCallback; @@ -481,10 +484,15 @@ public Duration getOperationTimeout() { @VisibleForTesting static class BigtableWriterImpl implements Writer { private Batcher bulkMutation; + private Integer outstandingMutations = 0; + private Stopwatch stopwatch = Stopwatch.createUnstarted(); private String projectId; private String instanceId; private String tableId; + private Distribution bulkSize = Metrics.distribution("BigTable-" + tableId, "batchSize"); + private Distribution latency = Metrics.distribution("BigTable-" + tableId, "batchLatencyMs"); + BigtableWriterImpl( BigtableDataClient client, String projectId, String instanceId, String tableId) { this.projectId = projectId; @@ -497,7 +505,12 @@ static class BigtableWriterImpl implements Writer { public void flush() throws IOException { if (bulkMutation != null) { try { + stopwatch.start(); bulkMutation.flush(); + bulkSize.update(outstandingMutations); + outstandingMutations = 0; + stopwatch.stop(); + latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // We fail since flush() operation was interrupted. @@ -510,8 +523,13 @@ public void flush() throws IOException { public void close() throws IOException { if (bulkMutation != null) { try { + stopwatch.start(); bulkMutation.flush(); bulkMutation.close(); + bulkSize.update(outstandingMutations); + outstandingMutations = 0; + stopwatch.stop(); + latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // We fail since flush() operation was interrupted. @@ -548,6 +566,7 @@ public CompletionStage writeRecord(KV result = new CompletableFuture<>(); + outstandingMutations += 1; Futures.addCallback( new VendoredListenableFutureAdapter<>(bulkMutation.add(entry)), new FutureCallback() {