Skip to content

Commit

Permalink
Enforce a size limit on StringSetData (apache#32650)
Browse files Browse the repository at this point in the history
* Enforce a size limit on StringSetData

* Make StringSetData set mutable. This avoids
  copy and create new ImutableSet every time

* adjust warning log
  • Loading branch information
Abacn committed Oct 8, 2024
1 parent 64854bb commit b4af134
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -101,11 +100,15 @@ public void add(String value) {
if (this.setValue.get().stringSet().contains(value)) {
return;
}
update(StringSetData.create(ImmutableSet.of(value)));
add(new String[] {value});
}

@Override
public void add(String... values) {
update(StringSetData.create(ImmutableSet.copyOf(values)));
StringSetData original;
do {
original = setValue.get();
} while (!setValue.compareAndSet(original, original.addAll(values)));
dirty.afterModification();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,73 @@

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Data describing the StringSet. The {@link StringSetData} hold an immutable copy of the set from
* which it was initially created. This should retain enough detail that it can be combined with
* other {@link StringSetData}.
* Data describing the StringSet. The {@link StringSetData} hold a copy of the set from which it was
* initially created. This should retain enough detail that it can be combined with other {@link
* StringSetData}.
*
* <p>The underlying set is mutable for {@link #addAll} operation, otherwise a copy set will be
* generated.
*
* <p>The summation of all string length for a {@code StringSetData} cannot exceed 1 MB. Further
* addition of elements are dropped.
*/
@AutoValue
public abstract class StringSetData implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StringSetData.class);
// 1 MB
@VisibleForTesting static final long STRING_SET_SIZE_LIMIT = 1_000_000L;

public abstract Set<String> stringSet();

public abstract long stringSize();

/** Returns a {@link StringSetData} which is made from an immutable copy of the given set. */
public static StringSetData create(Set<String> set) {
return new AutoValue_StringSetData(ImmutableSet.copyOf(set));
if (set.isEmpty()) {
return empty();
}
HashSet<String> combined = new HashSet<>();
long stringSize = addUntilCapacity(combined, 0L, set);
return new AutoValue_StringSetData(combined, stringSize);
}

/** Returns a {@link StringSetData} which is made from the given set in place. */
private static StringSetData createInPlace(HashSet<String> set, long stringSize) {
return new AutoValue_StringSetData(set, stringSize);
}

/** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */
public static StringSetData empty() {
return EmptyStringSetData.INSTANCE;
}

/**
* Add strings into this {@code StringSetData} and return the result {@code StringSetData}. Reuse
* the original StringSetData's set. As a result, current StringSetData will become invalid.
*
* <p>>Should only be used by {@link StringSetCell#add}.
*/
public StringSetData addAll(String... strings) {
HashSet<String> combined;
if (this.stringSet() instanceof HashSet) {
combined = (HashSet<String>) this.stringSet();
} else {
combined = new HashSet<>(this.stringSet());
}
long stringSize = addUntilCapacity(combined, this.stringSize(), Arrays.asList(strings));
return StringSetData.createInPlace(combined, stringSize);
}

/**
* Combines this {@link StringSetData} with other, both original StringSetData are left intact.
*/
Expand All @@ -54,30 +95,54 @@ public StringSetData combine(StringSetData other) {
} else if (other.stringSet().isEmpty()) {
return this;
} else {
ImmutableSet.Builder<String> combined = ImmutableSet.builder();
combined.addAll(this.stringSet());
combined.addAll(other.stringSet());
return StringSetData.create(combined.build());
HashSet<String> combined = new HashSet<>(this.stringSet());
long stringSize = addUntilCapacity(combined, this.stringSize(), other.stringSet());
return StringSetData.createInPlace(combined, stringSize);
}
}

/**
* Combines this {@link StringSetData} with others, all original StringSetData are left intact.
*/
public StringSetData combine(Iterable<StringSetData> others) {
Set<String> combined =
StreamSupport.stream(others.spliterator(), true)
.flatMap(other -> other.stringSet().stream())
.collect(Collectors.toSet());
combined.addAll(this.stringSet());
return StringSetData.create(combined);
HashSet<String> combined = new HashSet<>(this.stringSet());
long stringSize = this.stringSize();
for (StringSetData other : others) {
stringSize = addUntilCapacity(combined, stringSize, other.stringSet());
}
return StringSetData.createInPlace(combined, stringSize);
}

/** Returns a {@link StringSetResult} representing this {@link StringSetData}. */
public StringSetResult extractResult() {
return StringSetResult.create(stringSet());
}

/** Add strings into set until reach capacity. Return the all string size of added set. */
private static long addUntilCapacity(
HashSet<String> combined, long currentSize, Iterable<String> others) {
if (currentSize > STRING_SET_SIZE_LIMIT) {
// already at capacity
return currentSize;
}
for (String string : others) {
if (combined.add(string)) {
currentSize += string.length();

// check capacity both before insert and after insert one, so the warning only emit once.
if (currentSize > STRING_SET_SIZE_LIMIT) {
LOG.warn(
"StringSet metrics reaches capacity. Further incoming elements won't be recorded."
+ " Current size: {}, last element size: {}.",
currentSize,
string.length());
break;
}
}
}
return currentSize;
}

/** Empty {@link StringSetData}, representing no values reported and is immutable. */
public static class EmptyStringSetData extends StringSetData {

Expand All @@ -91,6 +156,11 @@ public Set<String> stringSet() {
return ImmutableSet.of();
}

@Override
public long stringSize() {
return 0L;
}

/** Return a {@link StringSetResult#empty()} which is immutable empty set. */
@Override
public StringSetResult extractResult() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -81,6 +82,14 @@ public void testStringSetDataEmptyIsImmutable() {
assertThrows(UnsupportedOperationException.class, () -> empty.stringSet().add("aa"));
}

@Test
public void testStringSetDataEmptyCanAdd() {
ImmutableSet<String> contents = ImmutableSet.of("ab", "cd");
StringSetData stringSetData = StringSetData.empty();
stringSetData = stringSetData.addAll(contents.toArray(new String[] {}));
assertEquals(stringSetData.stringSet(), contents);
}

@Test
public void testEmptyExtract() {
assertTrue(StringSetData.empty().extractResult().getStringSet().isEmpty());
Expand All @@ -94,9 +103,26 @@ public void testExtract() {
}

@Test
public void testExtractReturnsImmutable() {
StringSetData stringSetData = StringSetData.create(ImmutableSet.of("ab", "cd"));
// check that immutable copy is returned
assertThrows(UnsupportedOperationException.class, () -> stringSetData.stringSet().add("aa"));
public void testStringSetAddUntilCapacity() {
StringSetData combined = StringSetData.empty();
@SuppressWarnings("InlineMeInliner") // Inline representation is Java11+ only
String commonPrefix = Strings.repeat("*", 1000);
long stringSize = 0;
for (int i = 0; i < 1000; ++i) {
String s = commonPrefix + i;
stringSize += s.length();
combined = combined.addAll(s);
}
assertTrue(combined.stringSize() < stringSize);
assertTrue(combined.stringSize() > StringSetData.STRING_SET_SIZE_LIMIT);
}

@Test
public void testStringSetAddSizeTrackedCorrectly() {
StringSetData combined = StringSetData.empty();
combined = combined.addAll("a", "b", "c", "b");
assertEquals(3, combined.stringSize());
combined = combined.addAll("c", "d", "e");
assertEquals(5, combined.stringSize());
}
}
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/metrics/cells.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ cdef class GaugeCell(MetricCell):


cdef class StringSetCell(MetricCell):
cdef readonly set data
cdef readonly object data

cdef inline bint _update(self, value) except -1

Expand Down
Loading

0 comments on commit b4af134

Please sign in to comment.