From 724d310eba9a585b35684912ed67de239f0c8ca3 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Sun, 22 Dec 2024 21:27:49 -0800 Subject: [PATCH] Plumb BoundedTrie to MetricQueryResults --- .../beam/sdk/metrics/MetricQueryResults.java | 11 +++- .../org/apache/beam/sdk/metrics/Metrics.java | 34 +++++++++++ .../sdk/testing/UsesBoundedTrieMetrics.java | 28 +++++++++ .../apache/beam/sdk/metrics/MetricsTest.java | 58 ++++++++++++++++++- 4 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesBoundedTrieMetrics.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java index 9f60ce3d6c07..83c76c802ecd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -19,6 +19,7 @@ import com.google.auto.value.AutoValue; import java.util.List; +import java.util.Set; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** The results of a query for metrics. Allows accessing all the metrics that matched the filter. */ @@ -36,6 +37,9 @@ public abstract class MetricQueryResults { /** Return the metric results for the sets that matched the filter. */ public abstract Iterable> getStringSets(); + /** Return the metric results for the bounded tries that matched the filter. */ + public abstract Iterable>>> getBoundedTries(); + static void printMetrics(String type, Iterable> metrics, StringBuilder sb) { List> metricsList = ImmutableList.copyOf(metrics); if (!metricsList.isEmpty()) { @@ -65,6 +69,7 @@ public final String toString() { printMetrics("Distributions", getDistributions(), sb); printMetrics("Gauges", getGauges(), sb); printMetrics("StringSets", getStringSets(), sb); + printMetrics("BoundedTries", getBoundedTries(), sb); sb.append(")"); return sb.toString(); } @@ -73,7 +78,9 @@ public static MetricQueryResults create( Iterable> counters, Iterable> distributions, Iterable> gauges, - Iterable> stringSets) { - return new AutoValue_MetricQueryResults(counters, distributions, gauges, stringSets); + Iterable> stringSets, + Iterable>>> boundedTries) { + return new AutoValue_MetricQueryResults( + counters, distributions, gauges, stringSets, boundedTries); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index 6c8179006640..a975ae4a2944 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -155,6 +155,16 @@ public static StringSet stringSet(String namespace, String name) { return new DelegatingStringSet(MetricName.named(namespace, name)); } + /** Create a metric that accumulates and reports set of unique string values. */ + public static BoundedTrie boundedTrie(Class namespace, String name) { + return new DelegatingBoundedTrie(MetricName.named(namespace, name)); + } + + /** Create a metric that accumulates and reports set of unique string values. */ + public static BoundedTrie boundedTrie(String namespace, String name) { + return new DelegatingBoundedTrie(MetricName.named(namespace, name)); + } + /** Create a metric that accumulates and reports set of unique string values. */ public static StringSet stringSet(Class namespace, String name) { return new DelegatingStringSet(MetricName.named(namespace, name)); @@ -253,4 +263,28 @@ public MetricName getName() { return name; } } + + /** + * Implementation of {@link BoundedTrie} that delegates to the instance for the current context. + */ + private static class DelegatingBoundedTrie implements Metric, BoundedTrie, Serializable { + private final MetricName name; + + private DelegatingBoundedTrie(MetricName name) { + this.name = name; + } + + @Override + public MetricName getName() { + return name; + } + + @Override + public void add(Iterable values) { + MetricsContainer container = MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getBoundedTrie(name).add(values); + } + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesBoundedTrieMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesBoundedTrieMetrics.java new file mode 100644 index 000000000000..fe593531e1d3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesBoundedTrieMetrics.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.BoundedTrie}. + * Tests tagged with {@link UsesBoundedTrieMetrics} should be run for runners which support + * BoundedTrie. + */ +@Internal +public class UsesBoundedTrieMetrics {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 662c4f52628a..dd8d3a968422 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesAttemptedMetrics; +import org.apache.beam.sdk.testing.UsesBoundedTrieMetrics; import org.apache.beam.sdk.testing.UsesCommittedMetrics; import org.apache.beam.sdk.testing.UsesCounterMetrics; import org.apache.beam.sdk.testing.UsesDistributionMetrics; @@ -122,6 +123,8 @@ public void startBundle() { public void processElement(ProcessContext c) { Distribution values = Metrics.distribution(MetricsTest.class, "input"); StringSet sources = Metrics.stringSet(MetricsTest.class, "sources"); + BoundedTrie boundedTrieSources = + Metrics.boundedTrie(MetricsTest.class, "boundedTrieSources"); count.inc(); values.update(c.element()); @@ -131,6 +134,8 @@ public void processElement(ProcessContext c) { sources.add("gcs"); // repeated should appear once sources.add("gcs", "gcs"); // repeated should appear once sideinputs.add("bigtable", "spanner"); + boundedTrieSources.add(ImmutableList.of("ab_source", "cd_source")); + boundedTrieSources.add(ImmutableList.of("ef_source")); } @DoFn.FinishBundle @@ -148,6 +153,8 @@ public void processElement(ProcessContext c) { Distribution values = Metrics.distribution(MetricsTest.class, "input"); Gauge gauge = Metrics.gauge(MetricsTest.class, "my-gauge"); StringSet sinks = Metrics.stringSet(MetricsTest.class, "sinks"); + BoundedTrie boundedTrieSinks = + Metrics.boundedTrie(MetricsTest.class, "boundedTrieSinks"); Integer element = c.element(); count.inc(); values.update(element); @@ -155,6 +162,8 @@ public void processElement(ProcessContext c) { c.output(element); sinks.add("bq", "kafka", "kafka"); // repeated should appear once sideinputs.add("bigtable", "sql"); + boundedTrieSinks.add(ImmutableList.of("ab_sink", "cd_sink")); + boundedTrieSinks.add(ImmutableList.of("ef_sink")); c.output(output2, element); } }) @@ -277,7 +286,8 @@ public static class CommittedMetricTests extends SharedTestBase { UsesCounterMetrics.class, UsesDistributionMetrics.class, UsesGaugeMetrics.class, - UsesStringSetMetrics.class + UsesStringSetMetrics.class, + UsesBoundedTrieMetrics.class }) @Test public void testAllCommittedMetrics() { @@ -319,6 +329,14 @@ public void testCommittedStringSetMetrics() { assertStringSetMetrics(metrics, true); } + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesBoundedTrieMetrics.class}) + @Test + public void testCommittedBoundedTrieMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertBoundedTrieMetrics(metrics, true); + } + @Test @Category({NeedsRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testBoundedSourceMetrics() { @@ -405,7 +423,8 @@ public static class AttemptedMetricTests extends SharedTestBase { UsesCounterMetrics.class, UsesDistributionMetrics.class, UsesGaugeMetrics.class, - UsesStringSetMetrics.class + UsesStringSetMetrics.class, + UsesBoundedTrieMetrics.class }) @Test public void testAllAttemptedMetrics() { @@ -448,6 +467,14 @@ public void testAttemptedStringSetMetrics() { assertStringSetMetrics(metrics, false); } + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesBoundedTrieMetrics.class}) + @Test + public void testAttemptedBoundedTrieMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = queryTestMetrics(result); + assertBoundedTrieMetrics(metrics, false); + } + @Test @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testBoundedSourceMetricsInSplit() { @@ -634,6 +661,32 @@ private static void assertStringSetMetrics(MetricQueryResults metrics, boolean i isCommitted))); } + private static void assertBoundedTrieMetrics(MetricQueryResults metrics, boolean isCommitted) { + // TODO(https://github.com/apache/beam/issues/32001) use containsInAnyOrder once portableMetrics + // duplicate metrics issue fixed + assertThat( + metrics.getBoundedTries(), + hasItem( + metricsResultPatchStep( + "boundedTrieSources", + "MyStep1", + ImmutableSet.of( + ImmutableList.of("ab_source", "cd_source", String.valueOf(false)), + ImmutableList.of("ef_source", String.valueOf(false))), + isCommitted))); + assertThat( + metrics.getBoundedTries(), + hasItem( + metricsResult( + NAMESPACE, + "boundedTrieSinks", + "MyStep2", + ImmutableSet.of( + ImmutableList.of("ab_sink", "cd_sink", String.valueOf(false)), + ImmutableList.of("ef_sink", String.valueOf(false))), + isCommitted))); + } + private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) { assertThat( metrics.getDistributions(), @@ -665,5 +718,6 @@ private static void assertAllMetrics(MetricQueryResults metrics, boolean isCommi assertDistributionMetrics(metrics, isCommitted); assertGaugeMetrics(metrics, isCommitted); assertStringSetMetrics(metrics, isCommitted); + assertBoundedTrieMetrics(metrics, isCommitted); } }