Skip to content

Commit

Permalink
Plumb BoundedTrie to MetricsContainerStepMap and make combine return …
Browse files Browse the repository at this point in the history
…reference
  • Loading branch information
rohitsinha54 committed Dec 23, 2024
1 parent 724d310 commit c0bb3f1
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@
public class BoundedTrieCell implements BoundedTrie, MetricCell<BoundedTrieData> {

private final DirtyState dirty = new DirtyState();
private final BoundedTrieData value;
private BoundedTrieData value;
private final MetricName name;

public BoundedTrieCell(MetricName name) {
this.name = name;
this.value = new BoundedTrieData();
}

public void update(BoundedTrieData other) {
this.value.combine(other);
public synchronized void update(BoundedTrieData other) {
this.value = this.value.combine(other);
dirty.afterModification();
}

@Override
public void reset() {
public synchronized void reset() {
value.clear();
dirty.reset();
}
Expand All @@ -60,7 +60,7 @@ public DirtyState getDirty() {
}

@Override
public BoundedTrieData getCumulative() {
public synchronized BoundedTrieData getCumulative() {
return value.getCumulative();
}

Expand All @@ -86,13 +86,13 @@ public int hashCode() {
}

@Override
public void add(Iterable<String> values) {
public synchronized void add(Iterable<String> values) {
this.value.add(values);
dirty.afterModification();
}

@Override
public void add(String... values) {
public synchronized void add(String... values) {
add(Arrays.asList(values));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public synchronized BoundedTrieData getCumulative() {
*
* @return The set of paths.
*/
public synchronized Set<List<String>> getResult() {
public synchronized Set<List<String>> extractResult() {
if (this.root == null) {
if (this.singleton == null) {
return ImmutableSet.of();
Expand Down Expand Up @@ -195,10 +195,11 @@ public synchronized void add(Iterable<String> segments) {
* copy.
*
* @param other The other {@link BoundedTrieData} to combine with.
* @return The combined {@link BoundedTrieData}.
*/
public synchronized void combine(@Nonnull BoundedTrieData other) {
public synchronized BoundedTrieData combine(@Nonnull BoundedTrieData other) {
if (other.root == null && other.singleton == null) {
return;
return this;
}
// other can be modified in some different thread, and we need to atomically access
// its fields to combine correctly. Furthermore, simply doing this under synchronized(other)
Expand All @@ -207,19 +208,17 @@ public synchronized void combine(@Nonnull BoundedTrieData other) {
// while some other thread is performing `other.combiner(this)` and waiting to get a
// lock on `this` object.
BoundedTrieData otherDeepCopy = other.getCumulative();
if (this.root != null || this.singleton != null) {
// after this we are guaranteed to have non-null otherDeepCopy.root
otherDeepCopy.root = otherDeepCopy.asTrie();
otherDeepCopy.singleton = null;
otherDeepCopy.root.merge(this.asTrie());
otherDeepCopy.bound = Math.min(this.bound, otherDeepCopy.bound);
while (otherDeepCopy.root.getSize() > otherDeepCopy.bound) {
otherDeepCopy.root.trim();
}
if (this.root == null && this.singleton == null) {
return otherDeepCopy;
}
otherDeepCopy.root = otherDeepCopy.asTrie();
otherDeepCopy.singleton = null;
otherDeepCopy.root.merge(this.asTrie());
otherDeepCopy.bound = Math.min(this.bound, otherDeepCopy.bound);
while (otherDeepCopy.root.getSize() > otherDeepCopy.bound) {
otherDeepCopy.root.trim();
}
this.root = otherDeepCopy.root;
this.singleton = otherDeepCopy.singleton;
this.bound = otherDeepCopy.bound;
return otherDeepCopy;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.core.metrics;

import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
Expand All @@ -42,16 +44,19 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<Set<List<String>>>> boundedTries;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<Set<List<String>>>> boundedTries) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.boundedTries = boundedTries;
}

@Override
Expand All @@ -62,6 +67,8 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public static MetricResults asMetricResults(
Map<MetricKey, MetricResult<DistributionData>> distributions = new HashMap<>();
Map<MetricKey, MetricResult<GaugeData>> gauges = new HashMap<>();
Map<MetricKey, MetricResult<StringSetData>> sets = new HashMap<>();
Map<MetricKey, MetricResult<BoundedTrieData>> boundedTries = new HashMap<>();

attemptedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -146,6 +147,8 @@ public static MetricResults asMetricResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeAttemptedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
mergeAttemptedResults(
boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine);
});
committedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -155,6 +158,8 @@ public static MetricResults asMetricResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeCommittedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
mergeCommittedResults(
boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine);
});

return new DefaultMetricResults(
Expand All @@ -167,6 +172,9 @@ public static MetricResults asMetricResults(
.collect(toList()),
sets.values().stream()
.map(result -> result.transform(StringSetData::extractResult))
.collect(toList()),
boundedTries.values().stream()
.map(result -> result.transform(BoundedTrieData::extractResult))
.collect(toList()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testDeltaAndCumulative() {
Arrays.asList("a", "a", String.valueOf(false)),
Arrays.asList("b", "d", String.valueOf(false)),
Arrays.asList("b", "c", String.valueOf(false))),
cumulative.getResult());
cumulative.extractResult());

assertThat(cell.getDirty().beforeCommit(), equalTo(true));
cell.getDirty().afterCommit();
Expand All @@ -57,7 +57,7 @@ public void testDeltaAndCumulative() {
cell.add("b", "a");
BoundedTrieData newCumulative = cell.getCumulative();
assertEquals(
newCumulative.getResult(),
newCumulative.extractResult(),
ImmutableSet.of(
Arrays.asList("a", "a", String.valueOf(false)),
Arrays.asList("b", "d", String.valueOf(false)),
Expand All @@ -66,7 +66,7 @@ public void testDeltaAndCumulative() {

// but not previously obtained cumulative value
assertEquals(
cumulative.getResult(),
cumulative.extractResult(),
ImmutableSet.of(
Arrays.asList("a", "a", String.valueOf(false)),
Arrays.asList("b", "d", String.valueOf(false)),
Expand Down
Loading

0 comments on commit c0bb3f1

Please sign in to comment.