Skip to content

Commit

Permalink
Address comments, perform deep copies and support synchronization for…
Browse files Browse the repository at this point in the history
… mutable BoundedTrieData
  • Loading branch information
rohitsinha54 committed Dec 19, 2024
1 parent 38ae8e9 commit c9f106b
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.metrics.BoundedTrie;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand All @@ -33,44 +31,37 @@
* In that case retrieving the underlying cell and reporting directly to it avoids a step of
* indirection.
*/
// TODO: Write multi-threaded test in MetricContainerImp for this Cell class too.
public class BoundedTrieCell implements BoundedTrie, MetricCell<BoundedTrieData> {

private final DirtyState dirty = new DirtyState();
private final AtomicReference<BoundedTrieData> setValue =
new AtomicReference<>(BoundedTrieData.empty());
private final BoundedTrieData value;
private final MetricName name;

/**
* Generally, runners should construct instances using the methods in {@link
* MetricsContainerImpl}, unless they need to define their own version of {@link
* MetricsContainer}. These constructors are *only* public so runners can instantiate.
*/
public BoundedTrieCell(MetricName name) {
this.name = name;
this.value = new BoundedTrieData();
}

public void update(BoundedTrieCell other) {
this.value.combine(other.value);
dirty.afterModification();
}

@Override
public void reset() {
setValue.set(BoundedTrieData.empty());
value.clear();
dirty.reset();
}

void update(BoundedTrieData data) {
BoundedTrieData original;
do {
original = setValue.get();
} while (!setValue.compareAndSet(original, original.combine(data)));
dirty.afterModification();
}

@Override
public DirtyState getDirty() {
return dirty;
}

@Override
public BoundedTrieData getCumulative() {
return setValue.get();
return value.getCumulative();
}

@Override
Expand All @@ -83,23 +74,20 @@ public boolean equals(@Nullable Object object) {
if (object instanceof BoundedTrieCell) {
BoundedTrieCell boundedTrieCell = (BoundedTrieCell) object;
return Objects.equals(dirty, boundedTrieCell.dirty)
&& Objects.equals(setValue.get(), boundedTrieCell.setValue.get())
&& Objects.equals(value, boundedTrieCell.value)
&& Objects.equals(name, boundedTrieCell.name);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(dirty, setValue.get(), name);
return Objects.hash(dirty, value, name);
}

@Override
public void add(Iterable<String> values) {
BoundedTrieData original;
do {
original = setValue.get();
} while (!setValue.compareAndSet(original, original.add(values)));
this.value.add(values);
dirty.afterModification();
}

Expand Down
Loading

0 comments on commit c9f106b

Please sign in to comment.