From a3eacfe5cdc7d29d70331e924e9fc52cebef752f Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 4 May 2017 14:56:14 -0700 Subject: [PATCH 1/2] Add querying for distribution metrics in Java --- .../org/apache/beam/examples/WordCount.java | 4 + pom.xml | 2 +- .../runners/dataflow/DataflowMetrics.java | 265 ++++++++++++------ .../runners/dataflow/DataflowMetricsTest.java | 58 +++- 4 files changed, 236 insertions(+), 93 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index bfa7eb320b5f..2d568ce0c2e0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -88,9 +89,12 @@ public class WordCount { */ static class ExtractWordsFn extends DoFn { private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); + private final Distribution lineLenDist = Metrics.distribution( + ExtractWordsFn.class, "lineLenDistro"); @ProcessElement public void processElement(ProcessContext c) { + lineLenDist.update(c.element().length()); if (c.element().trim().isEmpty()) { emptyLines.inc(); } diff --git a/pom.xml b/pom.xml index 805a8d64e9c8..beca15352374 100644 --- a/pom.xml +++ b/pom.xml @@ -111,7 +111,7 @@ v1-rev6-1.22.0 0.1.0 v2-rev8-1.22.0 - v1b3-rev196-1.22.0 + v1b3-rev198-1.20.0 0.5.160222 1.4.0 1.3.0 diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 
f038e3f8fc6e..01cc0325fb10 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -19,6 +19,7 @@ import static com.google.common.base.MoreObjects.firstNonNull; +import com.google.api.client.util.ArrayMap; import com.google.api.services.dataflow.model.JobMetrics; import com.google.auto.value.AutoValue; import com.google.common.base.Objects; @@ -72,34 +73,6 @@ public DataflowMetrics(DataflowPipelineJob dataflowPipelineJob, DataflowClient d this.dataflowPipelineJob = dataflowPipelineJob; } - /** - * Build an immutable map that serves as a hash key for a metric update. - * @return a {@link MetricKey} that can be hashed and used to identify a metric. - */ - private MetricKey metricHashKey( - com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { - String fullStepName = metricUpdate.getName().getContext().get("step"); - fullStepName = (dataflowPipelineJob.transformStepNames != null - ? dataflowPipelineJob.transformStepNames - .inverse().get(fullStepName).getFullName() : fullStepName); - return MetricKey.create( - fullStepName, - MetricName.named( - metricUpdate.getName().getContext().get("namespace"), - metricUpdate.getName().getName())); - } - - /** - * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative - * update or not. - * @return true if update is tentative, false otherwise - */ - private boolean isMetricTentative( - com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { - return (metricUpdate.getName().getContext().containsKey("tentative") - && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true")); - } - /** * Take a list of metric updates coming from the Dataflow service, and format it into a * Metrics API MetricQueryResults instance. 
@@ -109,65 +82,8 @@ private boolean isMetricTentative( private MetricQueryResults populateMetricQueryResults( List metricUpdates, MetricsFilter filter) { - // Separate metric updates by name and by tentative/committed. - HashMap - tentativeByName = new HashMap<>(); - HashMap - committedByName = new HashMap<>(); - HashSet metricHashKeys = new HashSet<>(); - - // If the Context of the metric update does not have a namespace, then these are not - // actual metrics counters. - for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) { - if (Objects.equal(update.getName().getOrigin(), "user") && isMetricTentative(update) - && update.getName().getContext().containsKey("namespace")) { - tentativeByName.put(metricHashKey(update), update); - metricHashKeys.add(metricHashKey(update)); - } else if (Objects.equal(update.getName().getOrigin(), "user") - && update.getName().getContext().containsKey("namespace") - && !isMetricTentative(update)) { - committedByName.put(metricHashKey(update), update); - metricHashKeys.add(metricHashKey(update)); - } - } - // Create the lists with the metric result information. - ImmutableList.Builder> counterResults = ImmutableList.builder(); - ImmutableList.Builder> distributionResults = - ImmutableList.builder(); - ImmutableList.Builder> gaugeResults = ImmutableList.builder(); - for (MetricKey metricKey : metricHashKeys) { - if (!MetricFiltering.matches(filter, metricKey)) { - // Skip unmatched metrics early. - continue; - } - - // This code is not robust to evolutions in the types of metrics that can be returned, so - // wrap it in a try-catch and log errors. - try { - String metricName = metricKey.metricName().name(); - if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") - || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) { - // Skip distribution metrics, as these are not yet properly supported. - LOG.warn("Distribution metrics are not yet supported. 
You can see them in the Dataflow" - + " User Interface"); - continue; - } - - String namespace = metricKey.metricName().namespace(); - String step = metricKey.stepName(); - Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue(); - Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue(); - counterResults.add( - DataflowMetricResult.create( - MetricName.named(namespace, metricName), step, committed, attempted)); - } catch (Exception e) { - LOG.warn("Error handling metric {} for filter {}, skipping result.", metricKey, filter); - } - } - return DataflowMetricQueryResults.create( - counterResults.build(), - distributionResults.build(), - gaugeResults.build()); + return DataflowMetricQueryResultsFactory.create(dataflowPipelineJob, metricUpdates, filter) + .build(); } private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) { @@ -206,6 +122,181 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) { return result; } + private static class DataflowMetricResultExtractor { + private final ImmutableList.Builder> counterResults; + private final ImmutableList.Builder> distributionResults; + private final ImmutableList.Builder> gaugeResults; + + DataflowMetricResultExtractor() { + counterResults = ImmutableList.builder(); + distributionResults = ImmutableList.builder(); + gaugeResults = ImmutableList.builder(); + } + + public void addMetricResult( + MetricKey metricKey, + com.google.api.services.dataflow.model.MetricUpdate committed, + com.google.api.services.dataflow.model.MetricUpdate attempted) { + if (committed.getDistribution() != null && attempted.getDistribution() != null) { + // distribution metric + distributionResults.add( + DataflowMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + getDistributionValue(committed), + getDistributionValue(attempted))); + } else if (committed.getScalar() != null && attempted.getScalar() != null) { + // counter metric + 
counterResults.add( + DataflowMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + getCounterValue(committed), + getCounterValue(attempted))); + } else { + // This is exceptionally unexpected. We expect matching user metrics to only have the + // value types provided by the Metrics API. + LOG.warn("Unexpected metric type. Please report JOB ID to Dataflow Support. Metric key: " + + metricKey.toString()); + } + } + + private Long getCounterValue(com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + if (metricUpdate.getScalar() == null) { + return 0L; + } + return ((Number) metricUpdate.getScalar()).longValue(); + } + + private DistributionResult getDistributionValue( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + if (metricUpdate.getDistribution() == null) { + return DistributionResult.ZERO; + } + ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution(); + Long count = ((Number) distributionMap.get("count")).longValue(); + Long min = ((Number) distributionMap.get("min")).longValue(); + Long max = ((Number) distributionMap.get("max")).longValue(); + Long mean = ((Number) distributionMap.get("mean")).longValue(); + // TODO: Switch to use sum when it's available in the service. 
+ return DistributionResult.create(count * mean, count, min, max); + } + + public Iterable> getDistributionResults() { + return distributionResults.build(); + } + + public Iterable> getCounterResults() { + return counterResults.build(); + } + + public Iterable> getGaugeResults() { + return gaugeResults.build(); + } + } + + private static class DataflowMetricQueryResultsFactory { + Iterable metricUpdates; + MetricsFilter filter; + HashMap tentativeByName; + HashMap committedByName; + HashSet metricHashKeys; + private DataflowPipelineJob dataflowPipelineJob; + + public static DataflowMetricQueryResultsFactory create(DataflowPipelineJob dataflowPipelineJob, + Iterable metricUpdates, + MetricsFilter filter) { + return new DataflowMetricQueryResultsFactory(dataflowPipelineJob, metricUpdates, filter); + } + + private DataflowMetricQueryResultsFactory(DataflowPipelineJob dataflowPipelineJob, + Iterable metricUpdates, + MetricsFilter filter) { + this.dataflowPipelineJob = dataflowPipelineJob; + this.metricUpdates = metricUpdates; + this.filter = filter; + + tentativeByName = new HashMap<>(); + committedByName = new HashMap<>(); + metricHashKeys = new HashSet<>(); + } + + /** + * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative + * update or not. + * @return true if update is tentative, false otherwise + */ + private boolean isMetricTentative( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + return (metricUpdate.getName().getContext().containsKey("tentative") + && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true")); + } + + /** + * Build an immutable map that serves as a hash key for a metric update. + * @return a {@link MetricKey} that can be hashed and used to identify a metric. 
+ */ + private MetricKey getMetricHashKey( + com.google.api.services.dataflow.model.MetricUpdate metricUpdate) { + String fullStepName = metricUpdate.getName().getContext().get("step"); + fullStepName = (dataflowPipelineJob.transformStepNames != null + ? dataflowPipelineJob.transformStepNames + .inverse().get(fullStepName).getFullName() : fullStepName); + return MetricKey.create( + fullStepName, + MetricName.named( + metricUpdate.getName().getContext().get("namespace"), + metricUpdate.getName().getName())); + } + + private void buildMetricsIndex() { + // If the Context of the metric update does not have a namespace, then these are not + // actual metrics counters. + for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) { + MetricKey updateKey = getMetricHashKey(update); + if (!MetricFiltering.matches(filter, updateKey)) { + // Skip unmatched metrics early. + continue; + } + if (Objects.equal(update.getName().getOrigin(), "user") + && update.getName().getContext().containsKey("namespace")) { + if (isMetricTentative(update)) { + assert !tentativeByName.containsKey(updateKey); + tentativeByName.put(updateKey, update); + metricHashKeys.add(updateKey); + } else { + assert !committedByName.containsKey(updateKey); + committedByName.put(updateKey, update); + metricHashKeys.add(updateKey); + } + } + } + } + + public MetricQueryResults build() { + buildMetricsIndex(); + + DataflowMetricResultExtractor extractor = new DataflowMetricResultExtractor(); + for (MetricKey metricKey : metricHashKeys) { + String metricName = metricKey.metricName().name(); + if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") + || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) { + // Skip distribution metrics, as these are not yet properly supported. + // TODO: remove this when distributions stop being broken up for the UI. 
+ continue; + } + + extractor.addMetricResult(metricKey, + committedByName.get(metricKey), + tentativeByName.get(metricKey)); + } + return DataflowMetricQueryResults.create( + extractor.getCounterResults(), + extractor.getDistributionResults(), + extractor.getGaugeResults()); + } + } + @AutoValue abstract static class DataflowMetricQueryResults implements MetricQueryResults { public static MetricQueryResults create( diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 85a0979b3c69..32e5ff9d3b0a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; @@ -28,6 +29,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.client.util.ArrayMap; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.JobMetrics; @@ -40,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; +import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricQueryResults; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Before; @@ -129,11 +132,8 @@ public void testCachingMetricUpdates() throws IOException { verify(dataflowClient, times(1)).getJobMetrics(JOB_ID); } - private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step, - long scalar, boolean tentative) { - MetricUpdate update = new MetricUpdate(); - update.setScalar(new BigDecimal(scalar)); - + private MetricUpdate setStructuredName(MetricUpdate update, String name, String namespace, + String step, boolean tentative) { MetricStructuredName structuredName = new MetricStructuredName(); structuredName.setName(name); structuredName.setOrigin("user"); @@ -148,6 +148,27 @@ private MetricUpdate makeCounterMetricUpdate(String name, String namespace, Stri return update; } + private MetricUpdate makeDistributionMetricUpdate(String name, String namespace, String step, + Long sum, Long count, Long min, Long max, boolean tentative) { + MetricUpdate update = new MetricUpdate(); + ArrayMap distribution = ArrayMap.create(); + distribution.add("count", new BigDecimal(count)); + distribution.add("mean", new BigDecimal(sum / count)); + distribution.add("sum", new BigDecimal(sum)); + distribution.add("min", new BigDecimal(min)); + distribution.add("max", new BigDecimal(max)); + update.setDistribution(distribution); + return setStructuredName(update, name, namespace, step, tentative); + } + + private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step, + long scalar, boolean tentative) { + MetricUpdate update = new MetricUpdate(); + update.setScalar(new BigDecimal(scalar)); + return setStructuredName(update, name, namespace, step, tentative); + + } + @Test public void testSingleCounterUpdates() throws IOException { JobMetrics jobMetrics = new JobMetrics(); @@ -202,6 +223,33 @@ public void testIgnoreDistributionButGetCounterUpdates() throws IOException { committedMetricsResult("counterNamespace", "counterName", "s2", 
1233L))); } + @Test + public void testDistributionUpdates() throws IOException { + JobMetrics jobMetrics = new JobMetrics(); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + when(job.getState()).thenReturn(State.RUNNING); + job.jobId = JOB_ID; + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. + jobMetrics.setMetrics(ImmutableList.of( + makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2", + 18L, 2L, 2L, 16L, false), + makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2", + 18L, 2L, 2L, 16L, true))); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); + assertThat(result.distributions(), contains( + attemptedMetricsResult("distributionNamespace", "distributionName", "s2", + DistributionResult.create(18, 2, 2, 16)))); + assertThat(result.distributions(), contains( + committedMetricsResult("distributionNamespace", "distributionName", "s2", + DistributionResult.create(18, 2, 2, 16)))); + } + @Test public void testMultipleCounterUpdates() throws IOException { JobMetrics jobMetrics = new JobMetrics(); From a322b5e1a55b078e520536ab5c0ae1ba053fdeb3 Mon Sep 17 00:00:00 2001 From: bchambers Date: Tue, 6 Jun 2017 15:43:10 -0700 Subject: [PATCH 2/2] fixup! 
Remove assert (does nothing) and just log when encountering unexpected cases --- .../runners/dataflow/DataflowMetrics.java | 52 ++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 01cc0325fb10..9a453f59ce08 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -21,6 +21,7 @@ import com.google.api.client.util.ArrayMap; import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricUpdate; import com.google.auto.value.AutoValue; import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; @@ -29,6 +30,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.runners.core.metrics.MetricFiltering; import org.apache.beam.runners.core.metrics.MetricKey; import org.apache.beam.sdk.metrics.DistributionResult; @@ -135,9 +137,13 @@ private static class DataflowMetricResultExtractor { public void addMetricResult( MetricKey metricKey, - com.google.api.services.dataflow.model.MetricUpdate committed, - com.google.api.services.dataflow.model.MetricUpdate attempted) { - if (committed.getDistribution() != null && attempted.getDistribution() != null) { + @Nullable com.google.api.services.dataflow.model.MetricUpdate committed, + @Nullable com.google.api.services.dataflow.model.MetricUpdate attempted) { + if (committed == null || attempted == null) { + LOG.warn( + "Unexpected metric {} did not have both a committed ({}) and tentative value ({}).", + metricKey, committed, attempted); + } else if (committed.getDistribution() 
!= null && attempted.getDistribution() != null) { // distribution metric distributionResults.add( DataflowMetricResult.create( @@ -196,12 +202,14 @@ public Iterable> getGaugeResults() { } private static class DataflowMetricQueryResultsFactory { - Iterable metricUpdates; - MetricsFilter filter; - HashMap tentativeByName; - HashMap committedByName; - HashSet metricHashKeys; - private DataflowPipelineJob dataflowPipelineJob; + private final Iterable metricUpdates; + private final MetricsFilter filter; + private final HashMap + tentativeByName; + private final HashMap + committedByName; + private final HashSet metricHashKeys; + private final DataflowPipelineJob dataflowPipelineJob; public static DataflowMetricQueryResultsFactory create(DataflowPipelineJob dataflowPipelineJob, Iterable metricUpdates, @@ -258,16 +266,24 @@ private void buildMetricsIndex() { // Skip unmatched metrics early. continue; } - if (Objects.equal(update.getName().getOrigin(), "user") - && update.getName().getContext().containsKey("namespace")) { + + if (update.getName().getOrigin() == null + || !update.getName().getOrigin().toLowerCase().equals("user") + || !update.getName().getContext().containsKey("namespace")) { - if (isMetricTentative(update)) { - assert !tentativeByName.containsKey(updateKey); - tentativeByName.put(updateKey, update); - metricHashKeys.add(updateKey); - } else { - assert !committedByName.containsKey(updateKey); - committedByName.put(updateKey, update); - metricHashKeys.add(updateKey); + // Skip non-user metrics, which should have both a "user" origin and a namespace.
+ continue; + } + + metricHashKeys.add(updateKey); + if (isMetricTentative(update)) { + MetricUpdate previousUpdate = tentativeByName.put(updateKey, update); + if (previousUpdate != null) { + LOG.warn("Metric {} already had a tentative value of {}", updateKey, previousUpdate); + } + } else { + MetricUpdate previousUpdate = committedByName.put(updateKey, update); + if (previousUpdate != null) { + LOG.warn("Metric {} already had a committed value of {}", updateKey, previousUpdate); } } }