diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index dbedc51528a5..dd5b2da34ae5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -27,6 +27,7 @@ import com.google.api.services.dataflow.model.IntegerGauge; import com.google.api.services.dataflow.model.StringList; import java.util.ArrayList; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.core.metrics.DistributionData; import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.MetricKey; @@ -62,7 +63,8 @@ public enum Kind { MEAN("MEAN"), SUM("SUM"), LATEST_VALUE("LATEST_VALUE"), - SET("SET"); + SET("SET"), + TRIE("TRIE"); private final String kind; @@ -110,6 +112,16 @@ public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSet .setStringList(stringList); } + public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) { + CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.TRIE); + + // TODO: Test this with sandbox. + return new CounterUpdate() + .setStructuredNameAndMetadata(name) + .setCumulative(false) + .set(Kind.TRIE.toString(), boundedTrieData.toProto()); + } + public static CounterUpdate fromDistribution( MetricKey key, boolean isCumulative, DistributionData update) { CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 7c5aadefb9ac..a1ad1357a074 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -197,7 +197,7 @@ public Histogram getPerWorkerHistogram( public Iterable extractUpdates() { return counterUpdates() .append(distributionUpdates()) - .append(gaugeUpdates().append(stringSetUpdates())); + .append(gaugeUpdates().append(stringSetUpdates()).append(boundedTrieUpdates())); } private FluentIterable counterUpdates() { @@ -253,6 +253,20 @@ private FluentIterable stringSetUpdates() { .filter(Predicates.notNull()); } + private FluentIterable boundedTrieUpdates() { + return FluentIterable.from(boundedTries.entries()) + .transform( + new Function, CounterUpdate>() { + @Override + public @Nullable CounterUpdate apply( + @Nonnull Map.Entry entry) { + return MetricsToCounterUpdateConverter.fromBoundedTrie( + MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative()); + } + }) + .filter(Predicates.notNull()); + } + private FluentIterable distributionUpdates() { return FluentIterable.from(distributions.entries()) .transform( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 37c5ad261280..b34d657cb55a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -50,8 +50,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; @@ -61,6 +63,7 @@ import org.apache.beam.sdk.metrics.NoOpHistogram; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.hamcrest.collection.IsEmptyIterable; import org.hamcrest.collection.IsMapContaining; @@ -325,6 +328,72 @@ public void testStringSetUpdateExtraction() { assertThat(updates, containsInAnyOrder(name1Update, name2Update)); } + @Test + public void testBoundedTrieUpdateExtraction() { + BoundedTrie boundedTrie = c1.getBoundedTrie(name1); + boundedTrie.add("ab"); + boundedTrie.add("cd", "ef"); + boundedTrie.add("gh"); + boundedTrie.add("gh"); + + BoundedTrieData expectedName1 = new BoundedTrieData(); + expectedName1.add(ImmutableList.of("ab")); + expectedName1.add(ImmutableList.of("cd", "ef")); + expectedName1.add(ImmutableList.of("gh")); + expectedName1.add(ImmutableList.of("gh")); + + CounterUpdate name1Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name1") + .setOriginalStepName("s1")) + .setMetadata(new CounterMetadata().setKind(Kind.TRIE.toString()))) + .setCumulative(false) + .set(Kind.TRIE.toString(), expectedName1.toProto()); + + Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update)); + + boundedTrie = c2.getBoundedTrie(name2); + boundedTrie.add("ij"); + boundedTrie.add("kl", "mn"); + boundedTrie.add("mn"); + + BoundedTrieData expectedName2 = new BoundedTrieData(); + expectedName2.add(ImmutableList.of("ij")); + expectedName2.add(ImmutableList.of("kl", "mn")); + expectedName2.add(ImmutableList.of("mn")); + + CounterUpdate name2Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name2") + .setOriginalStepName("s2")) + .setMetadata(new CounterMetadata().setKind(Kind.TRIE.toString()))) + .setCumulative(false) + .set(Kind.TRIE.toString(), expectedName2.toProto()); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + + c1.getBoundedTrie(name1).add("op"); + expectedName1.add(ImmutableList.of("op")); + name1Update.set(Kind.TRIE.toString(), expectedName1.toProto()); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + } + @Test public void testPerWorkerMetrics() { StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);