Skip to content

Commit

Permalink
Plumb BoundedTrie to MetricQueryResults
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitsinha54 committed Dec 23, 2024
1 parent 3c5d613 commit 724d310
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.auto.value.AutoValue;
import java.util.List;
import java.util.Set;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/** The results of a query for metrics. Allows accessing all the metrics that matched the filter. */
Expand All @@ -36,6 +37,9 @@ public abstract class MetricQueryResults {
/** Return the metric results for the sets that matched the filter. */
public abstract Iterable<MetricResult<StringSetResult>> getStringSets();

/** Return the metric results for the bounded tries that matched the filter. */
public abstract Iterable<MetricResult<Set<List<String>>>> getBoundedTries();

static <T> void printMetrics(String type, Iterable<MetricResult<T>> metrics, StringBuilder sb) {
List<MetricResult<T>> metricsList = ImmutableList.copyOf(metrics);
if (!metricsList.isEmpty()) {
Expand Down Expand Up @@ -65,6 +69,7 @@ public final String toString() {
printMetrics("Distributions", getDistributions(), sb);
printMetrics("Gauges", getGauges(), sb);
printMetrics("StringSets", getStringSets(), sb);
printMetrics("BoundedTries", getBoundedTries(), sb);
sb.append(")");
return sb.toString();
}
Expand All @@ -73,7 +78,9 @@ public static MetricQueryResults create(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
return new AutoValue_MetricQueryResults(counters, distributions, gauges, stringSets);
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<Set<List<String>>>> boundedTries) {
return new AutoValue_MetricQueryResults(
counters, distributions, gauges, stringSets, boundedTries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ public static StringSet stringSet(String namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}

/** Create a metric that accumulates and reports set of unique string values. */
public static BoundedTrie boundedTrie(Class<?> namespace, String name) {
return new DelegatingBoundedTrie(MetricName.named(namespace, name));
}

/** Create a metric that accumulates and reports set of unique string values. */
public static BoundedTrie boundedTrie(String namespace, String name) {
return new DelegatingBoundedTrie(MetricName.named(namespace, name));
}

/** Create a metric that accumulates and reports set of unique string values. */
public static StringSet stringSet(Class<?> namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
Expand Down Expand Up @@ -253,4 +263,28 @@ public MetricName getName() {
return name;
}
}

/**
* Implementation of {@link BoundedTrie} that delegates to the instance for the current context.
*/
private static class DelegatingBoundedTrie implements Metric, BoundedTrie, Serializable {
private final MetricName name;

private DelegatingBoundedTrie(MetricName name) {
this.name = name;
}

@Override
public MetricName getName() {
return name;
}

@Override
public void add(Iterable<String> values) {
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
if (container != null) {
container.getBoundedTrie(name).add(values);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;

import org.apache.beam.sdk.annotations.Internal;

/**
* Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.BoundedTrie}.
* Tests tagged with {@link UsesBoundedTrieMetrics} should be run for runners which support
* BoundedTrie.
*/
@Internal
public class UsesBoundedTrieMetrics {}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
import org.apache.beam.sdk.testing.UsesBoundedTrieMetrics;
import org.apache.beam.sdk.testing.UsesCommittedMetrics;
import org.apache.beam.sdk.testing.UsesCounterMetrics;
import org.apache.beam.sdk.testing.UsesDistributionMetrics;
Expand Down Expand Up @@ -122,6 +123,8 @@ public void startBundle() {
public void processElement(ProcessContext c) {
Distribution values = Metrics.distribution(MetricsTest.class, "input");
StringSet sources = Metrics.stringSet(MetricsTest.class, "sources");
BoundedTrie boundedTrieSources =
Metrics.boundedTrie(MetricsTest.class, "boundedTrieSources");
count.inc();
values.update(c.element());

Expand All @@ -131,6 +134,8 @@ public void processElement(ProcessContext c) {
sources.add("gcs"); // repeated should appear once
sources.add("gcs", "gcs"); // repeated should appear once
sideinputs.add("bigtable", "spanner");
boundedTrieSources.add(ImmutableList.of("ab_source", "cd_source"));
boundedTrieSources.add(ImmutableList.of("ef_source"));
}

@DoFn.FinishBundle
Expand All @@ -148,13 +153,17 @@ public void processElement(ProcessContext c) {
Distribution values = Metrics.distribution(MetricsTest.class, "input");
Gauge gauge = Metrics.gauge(MetricsTest.class, "my-gauge");
StringSet sinks = Metrics.stringSet(MetricsTest.class, "sinks");
BoundedTrie boundedTrieSinks =
Metrics.boundedTrie(MetricsTest.class, "boundedTrieSinks");
Integer element = c.element();
count.inc();
values.update(element);
gauge.set(12L);
c.output(element);
sinks.add("bq", "kafka", "kafka"); // repeated should appear once
sideinputs.add("bigtable", "sql");
boundedTrieSinks.add(ImmutableList.of("ab_sink", "cd_sink"));
boundedTrieSinks.add(ImmutableList.of("ef_sink"));
c.output(output2, element);
}
})
Expand Down Expand Up @@ -277,7 +286,8 @@ public static class CommittedMetricTests extends SharedTestBase {
UsesCounterMetrics.class,
UsesDistributionMetrics.class,
UsesGaugeMetrics.class,
UsesStringSetMetrics.class
UsesStringSetMetrics.class,
UsesBoundedTrieMetrics.class
})
@Test
public void testAllCommittedMetrics() {
Expand Down Expand Up @@ -319,6 +329,14 @@ public void testCommittedStringSetMetrics() {
assertStringSetMetrics(metrics, true);
}

@Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesBoundedTrieMetrics.class})
@Test
public void testCommittedBoundedTrieMetrics() {
PipelineResult result = runPipelineWithMetrics();
MetricQueryResults metrics = queryTestMetrics(result);
assertBoundedTrieMetrics(metrics, true);
}

@Test
@Category({NeedsRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testBoundedSourceMetrics() {
Expand Down Expand Up @@ -405,7 +423,8 @@ public static class AttemptedMetricTests extends SharedTestBase {
UsesCounterMetrics.class,
UsesDistributionMetrics.class,
UsesGaugeMetrics.class,
UsesStringSetMetrics.class
UsesStringSetMetrics.class,
UsesBoundedTrieMetrics.class
})
@Test
public void testAllAttemptedMetrics() {
Expand Down Expand Up @@ -448,6 +467,14 @@ public void testAttemptedStringSetMetrics() {
assertStringSetMetrics(metrics, false);
}

@Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesBoundedTrieMetrics.class})
@Test
public void testAttemptedBoundedTrieMetrics() {
PipelineResult result = runPipelineWithMetrics();
MetricQueryResults metrics = queryTestMetrics(result);
assertBoundedTrieMetrics(metrics, false);
}

@Test
@Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testBoundedSourceMetricsInSplit() {
Expand Down Expand Up @@ -634,6 +661,32 @@ private static void assertStringSetMetrics(MetricQueryResults metrics, boolean i
isCommitted)));
}

private static void assertBoundedTrieMetrics(MetricQueryResults metrics, boolean isCommitted) {
// TODO(https://github.com/apache/beam/issues/32001) use containsInAnyOrder once portableMetrics
// duplicate metrics issue fixed
assertThat(
metrics.getBoundedTries(),
hasItem(
metricsResultPatchStep(
"boundedTrieSources",
"MyStep1",
ImmutableSet.of(
ImmutableList.of("ab_source", "cd_source", String.valueOf(false)),
ImmutableList.of("ef_source", String.valueOf(false))),
isCommitted)));
assertThat(
metrics.getBoundedTries(),
hasItem(
metricsResult(
NAMESPACE,
"boundedTrieSinks",
"MyStep2",
ImmutableSet.of(
ImmutableList.of("ab_sink", "cd_sink", String.valueOf(false)),
ImmutableList.of("ef_sink", String.valueOf(false))),
isCommitted)));
}

private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) {
assertThat(
metrics.getDistributions(),
Expand Down Expand Up @@ -665,5 +718,6 @@ private static void assertAllMetrics(MetricQueryResults metrics, boolean isCommi
assertDistributionMetrics(metrics, isCommitted);
assertGaugeMetrics(metrics, isCommitted);
assertStringSetMetrics(metrics, isCommitted);
assertBoundedTrieMetrics(metrics, isCommitted);
}
}

0 comments on commit 724d310

Please sign in to comment.