Skip to content

Commit

Permalink
This closes apache#2.
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloem committed Jun 7, 2017
2 parents de56f96 + 23bea2a commit 9c5c0c0
Showing 1 changed file with 34 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.api.client.util.ArrayMap;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.auto.value.AutoValue;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
Expand All @@ -29,6 +30,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricFiltering;
import org.apache.beam.runners.core.metrics.MetricKey;
import org.apache.beam.sdk.metrics.DistributionResult;
Expand Down Expand Up @@ -135,9 +137,13 @@ private static class DataflowMetricResultExtractor {

public void addMetricResult(
MetricKey metricKey,
com.google.api.services.dataflow.model.MetricUpdate committed,
com.google.api.services.dataflow.model.MetricUpdate attempted) {
if (committed.getDistribution() != null && attempted.getDistribution() != null) {
@Nullable com.google.api.services.dataflow.model.MetricUpdate committed,
@Nullable com.google.api.services.dataflow.model.MetricUpdate attempted) {
if (committed == null || attempted == null) {
LOG.warn(
"Unexpected metric {} did not have both a committed ({}) and tentative value ({}).",
metricKey, committed, attempted);
} else if (committed.getDistribution() != null && attempted.getDistribution() != null) {
// distribution metric
distributionResults.add(
DataflowMetricResult.create(
Expand Down Expand Up @@ -196,12 +202,14 @@ public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
}

private static class DataflowMetricQueryResultsFactory {
Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
MetricsFilter filter;
HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate> tentativeByName;
HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate> committedByName;
HashSet<MetricKey> metricHashKeys;
private DataflowPipelineJob dataflowPipelineJob;
private final Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
private final MetricsFilter filter;
private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
tentativeByName;
private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
committedByName;
private final HashSet<MetricKey> metricHashKeys;
private final DataflowPipelineJob dataflowPipelineJob;

public static DataflowMetricQueryResultsFactory create(DataflowPipelineJob dataflowPipelineJob,
Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
Expand Down Expand Up @@ -258,16 +266,24 @@ private void buildMetricsIndex() {
// Skip unmatched metrics early.
continue;
}
if (Objects.equal(update.getName().getOrigin(), "user")

if (update.getName().getOrigin() != null
&& update.getName().getOrigin().toLowerCase().equals("user")
&& update.getName().getContext().containsKey("namespace")) {
if (isMetricTentative(update)) {
assert !tentativeByName.containsKey(updateKey);
tentativeByName.put(updateKey, update);
metricHashKeys.add(updateKey);
} else {
assert !committedByName.containsKey(updateKey);
committedByName.put(updateKey, update);
metricHashKeys.add(updateKey);
// Skip non-user metrics, which should have both a "user" origin and a namespace.
continue;
}

metricHashKeys.add(updateKey);
if (isMetricTentative(update)) {
MetricUpdate previousUpdate = tentativeByName.put(updateKey, update);
if (previousUpdate != null) {
LOG.warn("Metric {} alreday had a tentative value of {}", updateKey, previousUpdate);
}
} else {
MetricUpdate previousUpdate = committedByName.put(updateKey, update);
if (previousUpdate != null) {
LOG.warn("Metric {} alreday had a committed value of {}", updateKey, previousUpdate);
}
}
}
Expand Down

0 comments on commit 9c5c0c0

Please sign in to comment.