Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Occasionally compact combined state on write.
Browse files Browse the repository at this point in the history
This is needed to avoid arbitrarily large blowup
for many-element windows.  Now, with a fixed
probability (currently 0.2%) when writing the state,
instead of doing a blind write we read in all
accumulated values, combine, and replace everything
with the final combined value.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=109239045
  • Loading branch information
robertwb authored and davorbonaci committed Dec 3, 2015
1 parent d81e750 commit 0c637d3
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder;
import com.google.cloud.dataflow.sdk.util.state.ValueState;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
Expand All @@ -45,6 +46,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -102,6 +104,35 @@ public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> cod
private final boolean useStateFamilies;
private final Supplier<StateSampler.ScopedState> scopedReadStateSupplier;

@VisibleForTesting
static final ThreadLocal<Supplier<Boolean>> COMPACT_NOW =
new ThreadLocal() {
public Supplier<Boolean> initialValue() {
return new Supplier<Boolean>() {
/* The rate at which, on average, this will return true. */
static final double RATE = 0.002;
Random random = new Random();
long counter = nextSample();

private long nextSample() {
// Use geometric distribution to find next true value.
// This lets us avoid invoking random.nextDouble() on every call.
return (long) Math.floor(Math.log(random.nextDouble()) / Math.log(1 - RATE));
}

public Boolean get() {
counter--;
if (counter < 0) {
counter = nextSample();
return true;
} else {
return false;
}
}
};
}
};

public WindmillStateInternals(String prefix, boolean useStateFamilies,
WindmillStateReader reader, Supplier<StateSampler.ScopedState> scopedReadStateSupplier) {
this.prefix = prefix;
Expand Down Expand Up @@ -569,6 +600,11 @@ public void clear() {
@Override
public void persist(Windmill.WorkItemCommitRequest.Builder commitBuilder) throws IOException {
if (hasLocalAdditions) {
// TODO: Take into account whether it's in the cache.
if (COMPACT_NOW.get().get()) {
// Implicitly clears the bag and combines local and persisted accumulators.
localAdditionsAccum = getAccum().read();
}
bag.add(combineFn.compact(localAdditionsAccum));
localAdditionsAccum = combineFn.createAccumulator();
hasLocalAdditions = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.dataflow.sdk.util.state.ValueState;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class WindmillStateInternalsTest {
private static final StateTag<CombiningValueState<Integer, Integer>> COMBINING_ADDR =
StateTags.combiningValueFromInputInternal(
"combining", VarIntCoder.of(), new Sum.SumIntegerFn());
private static final ByteString COMBINING_KEY = key(NAMESPACE, "combining");
private final Coder<int[]> accumCoder =
new Sum.SumIntegerFn().getAccumulatorCoder(null, VarIntCoder.of());

Expand All @@ -81,11 +83,11 @@ public class WindmillStateInternalsTest {
@Mock
private Supplier<StateSampler.ScopedState> readStateSupplier;

private ByteString key(StateNamespace namespace, String addrId) {
private static ByteString key(StateNamespace namespace, String addrId) {
return key("", namespace, addrId);
}

private ByteString key(String prefix, StateNamespace namespace, String addrId) {
private static ByteString key(String prefix, StateNamespace namespace, String addrId) {
return ByteString.copyFromUtf8(prefix + namespace.stringKey() + "+u" + addrId);
}

Expand Down Expand Up @@ -286,7 +288,7 @@ public void testCombiningAddBeforeRead() throws Exception {
CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

SettableFuture<Iterable<int[]>> future = SettableFuture.create();
when(mockReader.listFuture(key(NAMESPACE, "combining"), STATE_FAMILY, accumCoder))
when(mockReader.listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder))
.thenReturn(future);

StateContents<Integer> result = value.get();
Expand Down Expand Up @@ -326,10 +328,10 @@ public void testCombiningIsEmpty() throws Exception {
CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

SettableFuture<Iterable<int[]>> future = SettableFuture.create();
when(mockReader.listFuture(key(NAMESPACE, "combining"), STATE_FAMILY, accumCoder))
when(mockReader.listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder))
.thenReturn(future);
StateContents<Boolean> result = value.isEmpty();
Mockito.verify(mockReader).listFuture(key(NAMESPACE, "combining"), STATE_FAMILY, accumCoder);
Mockito.verify(mockReader).listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder);

waitAndSet(future, Arrays.asList(new int[] {29}), 200);
assertThat(result.read(), Matchers.is(false));
Expand All @@ -342,7 +344,7 @@ public void testCombiningIsEmptyAfterClear() throws Exception {
value.clear();
StateContents<Boolean> result = value.isEmpty();
Mockito.verify(mockReader, never())
.listFuture(key(NAMESPACE, "combining"), STATE_FAMILY, accumCoder);
.listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder);
assertThat(result.read(), Matchers.is(true));

value.add(87);
Expand All @@ -351,6 +353,8 @@ public void testCombiningIsEmptyAfterClear() throws Exception {

@Test
public void testCombiningAddPersist() throws Exception {
disableCompactOnWrite();

CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

value.add(5);
Expand All @@ -363,7 +367,7 @@ public void testCombiningAddPersist() throws Exception {
assertEquals(1, commitBuilder.getListUpdatesCount());

TagList listUpdates = commitBuilder.getListUpdates(0);
assertEquals(key(NAMESPACE, "combining"), listUpdates.getTag());
assertEquals(COMBINING_KEY, listUpdates.getTag());
assertEquals(1, listUpdates.getValuesCount());
assertEquals(
11,
Expand All @@ -375,8 +379,45 @@ public void testCombiningAddPersist() throws Exception {
Mockito.verifyNoMoreInteractions(mockReader);
}

@Test
public void testCombiningAddPersistWithCompact() throws Exception {
forceCompactOnWrite();

Mockito.stub(
mockReader.listFuture(
org.mockito.Matchers.<ByteString>any(),
org.mockito.Matchers.<String>any(),
org.mockito.Matchers.<Coder<int[]>>any()))
.toReturn(
Futures.<Iterable<int[]>>immediateFuture(
ImmutableList.of(new int[] {40}, new int[] {60})));

CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

value.add(5);
value.add(6);

Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);

assertEquals(2, commitBuilder.getListUpdatesCount());
assertEquals(0, commitBuilder.getListUpdates(0).getValuesCount());

TagList listUpdates = commitBuilder.getListUpdates(1);
assertEquals(COMBINING_KEY, listUpdates.getTag());
assertEquals(1, listUpdates.getValuesCount());
assertEquals(
111,
CoderUtils.decodeFromByteArray(
accumCoder, listUpdates.getValues(0).getData().substring(1).toByteArray())[
0]);
}

@Test
public void testCombiningClearPersist() throws Exception {
disableCompactOnWrite();

CombiningValueState<Integer, Integer> value = underTest.state(NAMESPACE, COMBINING_ADDR);

value.clear();
Expand All @@ -390,20 +431,20 @@ public void testCombiningClearPersist() throws Exception {
assertEquals(2, commitBuilder.getListUpdatesCount());

TagList listClear = commitBuilder.getListUpdates(0);
assertEquals(key(NAMESPACE, "combining"), listClear.getTag());
assertEquals(COMBINING_KEY, listClear.getTag());
assertEquals(Long.MAX_VALUE, listClear.getEndTimestamp());
assertEquals(0, listClear.getValuesCount());

TagList listUpdates = commitBuilder.getListUpdates(1);
assertEquals(key(NAMESPACE, "combining"), listUpdates.getTag());
assertEquals(COMBINING_KEY, listUpdates.getTag());
assertEquals(1, listUpdates.getValuesCount());
assertEquals(
11,
CoderUtils.decodeFromByteArray(
accumCoder, listUpdates.getValues(0).getData().substring(1).toByteArray())[0]);

// Blind adds should not need to read the future.
Mockito.verify(mockReader).listFuture(key(NAMESPACE, "combining"), STATE_FAMILY, accumCoder);
Mockito.verify(mockReader).listFuture(COMBINING_KEY, STATE_FAMILY, accumCoder);
Mockito.verify(mockReader).startBatchAndBlock();
Mockito.verifyNoMoreInteractions(mockReader);
}
Expand Down Expand Up @@ -866,4 +907,22 @@ public void testValueNoStateFamilies() throws Exception {

assertEquals("World", value.get().read());
}

private void disableCompactOnWrite() {
WindmillStateInternals.COMPACT_NOW.set(
new Supplier<Boolean>() {
public Boolean get() {
return false;
}
});
}

private void forceCompactOnWrite() {
WindmillStateInternals.COMPACT_NOW.set(
new Supplier<Boolean>() {
public Boolean get() {
return true;
}
});
}
}

0 comments on commit 0c637d3

Please sign in to comment.