Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Move GroupByKey validation into validate method
Browse files Browse the repository at this point in the history
----Release Notes----
Re-enabled verification of GroupByKey usage. Specififically, the key
must have a deterministic coder and using GroupByKey with an Unbounded
PCollection requires windowing or triggers.

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=99194010
  • Loading branch information
bchambers authored and davorbonaci committed Jul 27, 2015
1 parent ccd4e0a commit 39ba987
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -188,7 +189,7 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
return new DataflowPipelineRunner(dataflowOptions);
}

private DataflowPipelineRunner(DataflowPipelineOptions options) {
@VisibleForTesting protected DataflowPipelineRunner(DataflowPipelineOptions options) {
this.options = options;
this.dataflowClient = options.getDataflowClient();
this.translator = DataflowPipelineTranslator.fromOptions(options);
Expand Down Expand Up @@ -224,7 +225,9 @@ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
@SuppressWarnings("unchecked")
OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal(
pc.getPipeline(),
pc.getWindowingStrategy(),
transform instanceof GroupByKey
? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy())
: pc.getWindowingStrategy(),
pc.isBounded());
return outputT;

Expand Down Expand Up @@ -662,7 +665,7 @@ public PCollection<T> apply(PInput input) {
.setWindowingStrategyInternal(WindowingStrategy.globalDefault())
.apply(Window.<KV<Void, Iterable<Void>>>into(new GlobalWindows()))
.apply(ParDo.of(new OutputElements<>(transform.getElements(), coder)))
.setCoder(coder);
.setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED);
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
+ "Please set a coder by invoking Create.withCoder() explicitly.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,60 @@ public boolean fewKeys() {
/////////////////////////////////////////////////////////////////////////////

@Override
public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
// This operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
public void validate(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
// Verify that the input PCollection is bounded, or that there is windowing/triggering being
// used. Without this, the watermark (at end of global window) will never be reached.
if (windowingStrategy.getWindowFn() instanceof GlobalWindows
&& windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
&& input.isBounded() != IsBounded.BOUNDED) {
throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in "
+ "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
+ "prior to GroupByKey.");
}

// Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
// the key coder is deterministic.
Coder<K> keyCoder = getKeyCoder(input.getCoder());
try {
keyCoder.verifyDeterministic();
} catch (NonDeterministicException e) {
throw new IllegalStateException(
"the keyCoder of a GroupByKey must be deterministic", e);
}

// Validate the window merge function.
if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
String cause = ((InvalidWindows<?>) windowingStrategy.getWindowFn()).getCause();
throw new IllegalStateException(
"GroupByKey must have a valid Window merge function. "
+ "Invalid because: " + cause);
}
}

public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
if (!inputWindowFn.isNonMerging()) {
// Prevent merging windows again, without explicit user
// involvement, e.g., by Window.into() or Window.remerge().
inputWindowFn = new InvalidWindows<>(
"WindowFn has already been consumed by previous GroupByKey", inputWindowFn);
}

// We also switch to the continuation trigger associated with the current trigger.
return inputStrategy
.withWindowFn(inputWindowFn)
.withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
}

@Override
public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
// This operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();

// By default, implement GroupByKey[AndWindow] via a series of lower-level
// operations.
return input
Expand All @@ -206,7 +248,10 @@ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
.apply(new SortValuesByTimestamp<K, V>())

// Group each key's values by window, merging windows as needed.
.apply(new GroupAlsoByWindow<K, V>(windowingStrategy));
.apply(new GroupAlsoByWindow<K, V>(windowingStrategy))

// And update the windowing strategy as appropriate.
.setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
}

@Override
Expand Down Expand Up @@ -383,46 +428,12 @@ public PCollection<KV<K, Iterable<V>>> apply(
public static class GroupByKeyOnly<K, V>
extends PTransform<PCollection<KV<K, V>>,
PCollection<KV<K, Iterable<V>>>> {
@Override
public void validate(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
if (windowingStrategy.getWindowFn() instanceof GlobalWindows
&& windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
&& input.isBounded() != IsBounded.BOUNDED) {
throw new IllegalStateException("Non-bounded PCollection cannot be "
+ "processed with GlobalWindow and DefaultTrigger for GroupByKey."
+ "Use Window.into transform prior to GroupByKey.");
}
// Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
// the key coder is deterministic.
Coder<K> keyCoder = getKeyCoder(input.getCoder());
try {
keyCoder.verifyDeterministic();
} catch (NonDeterministicException e) {
throw new IllegalStateException(
"the keyCoder of a GroupByKey must be deterministic", e);
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> oldWindowingStrategy = input.getWindowingStrategy();
WindowFn<?, ?> newWindowFn = oldWindowingStrategy.getWindowFn();
if (!newWindowFn.isNonMerging()) {
// Prevent merging windows again, without explicit user
// involvement, e.g., by Window.into() or Window.remerge().
newWindowFn = new InvalidWindows(
"WindowFn has already been consumed by previous GroupByKey", newWindowFn);
}

// We also switch to the continuation trigger associated with the current trigger.
WindowingStrategy<?, ?> newWindowingStrategy = oldWindowingStrategy
.withWindowFn(newWindowFn)
.withTrigger(oldWindowingStrategy.getTrigger().getSpec().getContinuationTrigger());

return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
input.getPipeline(), newWindowingStrategy, input.isBounded());
input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ public IsBounded isBounded() {
return isBounded;
}


/////////////////////////////////////////////////////////////////////////////
// Internal details below here.

Expand Down Expand Up @@ -223,8 +222,10 @@ public PCollection<T> setWindowingStrategyInternal(WindowingStrategy<?, ?> windo

/**
* Sets the {@link PCollection.IsBounded} of this {@code PCollection}.
*
* <p> For use by internal transformations only.
*/
private PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
public PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
this.isBounded = isBounded;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,22 @@
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.joda.time.Duration;
Expand All @@ -57,7 +65,7 @@
public class GroupByKeyTest {

@Rule
public ExpectedException expectedEx = ExpectedException.none();
public ExpectedException thrown = ExpectedException.none();

@Test
@Category(RunnableOnService.class)
Expand Down Expand Up @@ -167,8 +175,6 @@ public void testGroupByKeyEmpty() {

@Test
public void testGroupByKeyNonDeterministic() throws Exception {
expectedEx.expect(IllegalStateException.class);
expectedEx.expectMessage("must be deterministic");

List<KV<Map<String, String>, Integer>> ungroupedPairs = Arrays.asList();

Expand All @@ -180,9 +186,9 @@ public void testGroupByKeyNonDeterministic() throws Exception {
KvCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
BigEndianIntegerCoder.of())));

thrown.expect(IllegalStateException.class);
thrown.expectMessage("must be deterministic");
input.apply(GroupByKey.<Map<String, String>, Integer>create());

p.run();
}

@Test
Expand Down Expand Up @@ -230,9 +236,30 @@ public void testWindowFnInvalidation() {
Duration.standardMinutes(1)))));
}

/**
* Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
* is not expanded. This is used for verifying that even without expansion the proper errors show
* up.
*/
private Pipeline createTestServiceRunner() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject("someproject");
options.setStagingLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
options.setDataflowClient(null);
return Pipeline.create(options);
}

private Pipeline createTestDirectRunner() {
DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
options.setRunner(DirectPipelineRunner.class);
return Pipeline.create(options);
}

@Test
public void testInvalidWindows() {
Pipeline p = TestPipeline.create();
public void testInvalidWindowsDirect() {
Pipeline p = createTestDirectRunner();

List<KV<String, Integer>> ungroupedPairs = Arrays.asList();

Expand All @@ -242,15 +269,30 @@ public void testInvalidWindows() {
.apply(Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(1))));

try {
input
.apply("GroupByKey", GroupByKey.<String, Integer>create())
.apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
Assert.fail("Exception should have been thrown");
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().startsWith(
"GroupByKey must have a valid Window merge function."));
}
thrown.expect(IllegalStateException.class);
thrown.expectMessage("GroupByKey must have a valid Window merge function");
input
.apply("GroupByKey", GroupByKey.<String, Integer>create())
.apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
}

@Test
public void testInvalidWindowsService() {
Pipeline p = createTestServiceRunner();

List<KV<String, Integer>> ungroupedPairs = Arrays.asList();

PCollection<KV<String, Integer>> input =
p.apply(Create.of(ungroupedPairs)
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
.apply(Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(1))));

thrown.expect(IllegalStateException.class);
thrown.expectMessage("GroupByKey must have a valid Window merge function");
input
.apply("GroupByKey", GroupByKey.<String, Integer>create())
.apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
}

@Test
Expand Down Expand Up @@ -278,6 +320,48 @@ public void testRemerge() {
Sessions.withGapDuration(Duration.standardMinutes(1))));
}

@Test
public void testGroupByKeyDirectUnbounded() {
Pipeline p = createTestDirectRunner();

PCollection<KV<String, Integer>> input = p
.apply(new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> apply(PBegin input) {
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
}
});

thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+ "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");

input.apply("GroupByKey", GroupByKey.<String, Integer>create());
}

@Test
public void testGroupByKeyServiceUnbounded() {
Pipeline p = createTestServiceRunner();

PCollection<KV<String, Integer>> input = p
.apply(new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> apply(PBegin input) {
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
}
});

thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+ "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");

input.apply("GroupByKey", GroupByKey.<String, Integer>create());
}

@Test
public void testGroupByKeyGetName() {
Assert.assertEquals("GroupByKey", GroupByKey.<String, Integer>create().getName());
Expand Down

0 comments on commit 39ba987

Please sign in to comment.