Skip to content

Commit

Permalink
Merge pull request apache#7 from seznam/vasek/more-operator-tests
Browse files Browse the repository at this point in the history
[BEAM-4529]  Enable tests in `OperatorsTestSuite` where possible
  • Loading branch information
VaclavPlajt authored Jun 15, 2018
2 parents 75edc06 + 24710b1 commit 4e4f988
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package org.apache.beam.sdk.extensions.euphoria.beam;

import java.time.Duration;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.CountByKeyTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.DistinctTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.FilterTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.FlatMapTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.JoinTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.MapElementsTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.ReduceByKeyTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.SumByKeyTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.UnionTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.ExecutorEnvironment;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.ExecutorProvider;
Expand All @@ -36,22 +41,19 @@
*/
@RunWith(ExecutorProviderRunner.class)
@Suite.SuiteClasses({
// BroadcastHashJoinTest.class,
// CountByKeyTest.class,
// DistinctTest.class,
// FilterTest.class,
// BroadcastHashJoinTest.class,
CountByKeyTest.class,
DistinctTest.class,
FilterTest.class,
FlatMapTest.class,
JoinTest.class,
// JoinWindowEnforcementTest.class,
// MapElementsTest.class,
MapElementsTest.class,
ReduceByKeyTest.class,
// ReduceStateByKeyTest.class,
// SumByKeyTest.class,
// TopPerKeyTest.class,
// SortTest.class,
SumByKeyTest.class,
// TopPerKeyTest.class, - uncomment when ReduceStateByKey is supported
UnionTest.class,
// WindowingTest.class,
// WatermarkTest.class,
// WindowingTest.class,
})
public class OperatorsTestSuite implements ExecutorProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
public class DistinctTest extends AbstractOperatorTest {

/** Test simple duplicates. */
// Distinct operator with unbounded dataset without windowing do not work,
// since it is translated into GroupByKey."
@Processing(Processing.Type.BOUNDED)
@Test
public void testSimpleDuplicatesWithNoWindowing() {
execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.SumByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
Expand All @@ -38,10 +39,13 @@ public void testSumByKey() {
new AbstractTestCase<Integer, Pair<Integer, Long>>() {
@Override
protected Dataset<Pair<Integer, Long>> getOutput(Dataset<Integer> input) {
return SumByKey.of(input)

Dataset<Integer> inputWithTime = AssignEventTime.of(input).using(i -> 0).output();

return SumByKey.of(inputWithTime)
.keyBy(e -> e % 2)
.valueBy(e -> (long) e)
.windowBy(FixedWindows.of(org.joda.time.Duration.standardSeconds(1)))
.windowBy(FixedWindows.of(org.joda.time.Duration.standardHours(1)))
.triggeredBy(DefaultTrigger.of())
.discardingFiredPanes()
.output();
Expand Down

This file was deleted.

0 comments on commit 4e4f988

Please sign in to comment.