Skip to content

Commit

Permalink
Plumb BoundedTrie to StreamingStepMetricsContainer
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitsinha54 committed Dec 22, 2024
1 parent bfe3de0 commit 4b0b070
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.dataflow.model.IntegerGauge;
import com.google.api.services.dataflow.model.StringList;
import java.util.ArrayList;
import org.apache.beam.runners.core.metrics.BoundedTrieData;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.MetricKey;
Expand Down Expand Up @@ -62,7 +63,8 @@ public enum Kind {
MEAN("MEAN"),
SUM("SUM"),
LATEST_VALUE("LATEST_VALUE"),
SET("SET");
SET("SET"),
TRIE("TRIE");

private final String kind;

Expand Down Expand Up @@ -110,6 +112,16 @@ public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSet
.setStringList(stringList);
}

public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) {
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.TRIE);

// TODO: Test this with sandbox.
return new CounterUpdate()
.setStructuredNameAndMetadata(name)
.setCumulative(false)
.set(Kind.TRIE.toString(), boundedTrieData.toProto());
}

public static CounterUpdate fromDistribution(
MetricKey key, boolean isCumulative, DistributionData update) {
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Histogram getPerWorkerHistogram(
public Iterable<CounterUpdate> extractUpdates() {
return counterUpdates()
.append(distributionUpdates())
.append(gaugeUpdates().append(stringSetUpdates()));
.append(gaugeUpdates().append(stringSetUpdates()).append(boundedTrieUpdates()));
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down Expand Up @@ -253,6 +253,20 @@ private FluentIterable<CounterUpdate> stringSetUpdates() {
.filter(Predicates.notNull());
}

private FluentIterable<CounterUpdate> boundedTrieUpdates() {
return FluentIterable.from(boundedTries.entries())
.transform(
new Function<Entry<MetricName, BoundedTrieCell>, CounterUpdate>() {
@Override
public @Nullable CounterUpdate apply(
@Nonnull Map.Entry<MetricName, BoundedTrieCell> entry) {
return MetricsToCounterUpdateConverter.fromBoundedTrie(
MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative());
}
})
.filter(Predicates.notNull());
}

private FluentIterable<CounterUpdate> distributionUpdates() {
return FluentIterable.from(distributions.entries())
.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.runners.core.metrics.BoundedTrieData;
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin;
import org.apache.beam.sdk.metrics.BoundedTrie;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
Expand All @@ -61,6 +63,7 @@
import org.apache.beam.sdk.metrics.NoOpHistogram;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsMapContaining;
Expand Down Expand Up @@ -325,6 +328,72 @@ public void testStringSetUpdateExtraction() {
assertThat(updates, containsInAnyOrder(name1Update, name2Update));
}

@Test
public void testBoundedTrieUpdateExtraction() {
BoundedTrie boundedTrie = c1.getBoundedTrie(name1);
boundedTrie.add("ab");
boundedTrie.add("cd", "ef");
boundedTrie.add("gh");
boundedTrie.add("gh");

BoundedTrieData expectedName1 = new BoundedTrieData();
expectedName1.add(ImmutableList.of("ab"));
expectedName1.add(ImmutableList.of("cd", "ef"));
expectedName1.add(ImmutableList.of("gh"));
expectedName1.add(ImmutableList.of("gh"));

CounterUpdate name1Update =
new CounterUpdate()
.setStructuredNameAndMetadata(
new CounterStructuredNameAndMetadata()
.setName(
new CounterStructuredName()
.setOrigin(Origin.USER.toString())
.setOriginNamespace("ns")
.setName("name1")
.setOriginalStepName("s1"))
.setMetadata(new CounterMetadata().setKind(Kind.TRIE.toString())))
.setCumulative(false)
.set(Kind.TRIE.toString(), expectedName1.toProto());

Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));

boundedTrie = c2.getBoundedTrie(name2);
boundedTrie.add("ij");
boundedTrie.add("kl", "mn");
boundedTrie.add("mn");

BoundedTrieData expectedName2 = new BoundedTrieData();
expectedName2.add(ImmutableList.of("ij"));
expectedName2.add(ImmutableList.of("kl", "mn"));
expectedName2.add(ImmutableList.of("mn"));

CounterUpdate name2Update =
new CounterUpdate()
.setStructuredNameAndMetadata(
new CounterStructuredNameAndMetadata()
.setName(
new CounterStructuredName()
.setOrigin(Origin.USER.toString())
.setOriginNamespace("ns")
.setName("name2")
.setOriginalStepName("s2"))
.setMetadata(new CounterMetadata().setKind(Kind.TRIE.toString())))
.setCumulative(false)
.set(Kind.TRIE.toString(), expectedName2.toProto());

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));

c1.getBoundedTrie(name1).add("op");
expectedName1.add(ImmutableList.of("op"));
name1Update.set(Kind.TRIE.toString(), expectedName1.toProto());

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));
}

@Test
public void testPerWorkerMetrics() {
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);
Expand Down

0 comments on commit 4b0b070

Please sign in to comment.