diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index febd96d313a5..27abb0ed0ff1 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -264,7 +264,13 @@
org.hamcrest
hamcrest-all
- test
+ ${hamcrest.version}
+
+
+
+ org.hamcrest
+ hamcrest-library
+ ${hamcrest.version}
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
index a23f82b0654e..f265e0d96e9c 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.integration.nexmark;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItems;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -28,16 +33,23 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.hamcrest.core.IsCollectionContaining;
+import org.hamcrest.core.IsEqual;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
/**
- * Base class for models of the eight NEXMark queries. Provides an assertion
- * function which can be applied against the actual query results to check their consistency
- * with the model.
+ * Base class for models of the eight NEXMark queries. Provides an assertion function which can be
+ * applied against the actual query results to check their consistency with the model.
*/
public abstract class NexmarkQueryModel implements Serializable {
+ protected final NexmarkConfiguration configuration;
+
+ public NexmarkQueryModel(NexmarkConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
/**
* Return the start of the most recent window of {@code size} and {@code period} which ends
* strictly before {@code timestamp}.
@@ -50,15 +62,7 @@ public static Instant windowStart(Duration size, Duration period, Instant timest
return new Instant(lim - s);
}
- protected final NexmarkConfiguration configuration;
-
- public NexmarkQueryModel(NexmarkConfiguration configuration) {
- this.configuration = configuration;
- }
-
- /**
- * Convert {@code itr} to strings capturing values, timestamps and order.
- */
+ /** Convert {@code itr} to strings capturing values, timestamps and order. */
protected static List toValueTimestampOrder(Iterator> itr) {
List strings = new ArrayList<>();
while (itr.hasNext()) {
@@ -67,9 +71,7 @@ protected static List toValueTimestampOrder(Iterator List toValueOrder(Iterator> itr) {
List strings = new ArrayList<>();
while (itr.hasNext()) {
@@ -78,9 +80,7 @@ protected static List toValueOrder(Iterator> itr
return strings;
}
- /**
- * Convert {@code itr} to strings capturing values only.
- */
+ /** Convert {@code itr} to strings capturing values only. */
protected static Set toValue(Iterator> itr) {
Set strings = new HashSet<>();
while (itr.hasNext()) {
@@ -99,22 +99,23 @@ protected Iterable> relevantResults(
}
/**
- * Convert iterator of elements to collection of strings to use when testing coherence
- * of model against actual query results.
+ * Convert iterator of elements to collection of strings to use when testing coherence of model
+ * against actual query results.
*/
protected abstract Collection toCollection(Iterator> itr);
- /**
- * Return assertion to use on results of pipeline for this query.
- */
+ /** Return assertion to use on results of pipeline for this query. */
public SerializableFunction>, Void> assertionFor() {
final Collection expectedStrings = toCollection(simulator().results());
+ final String[] expectedStringsArray = expectedStrings.toArray(new String[expectedStrings.size()]);
return new SerializableFunction>, Void>() {
@Override
public Void apply(Iterable> actual) {
Collection actualStrings = toCollection(relevantResults(actual).iterator());
- Assert.assertEquals(expectedStrings, actualStrings);
+ Assert.assertThat("wrong pipeline output", actualStrings, IsEqual.equalTo(expectedStrings));
+//compare without order
+// Assert.assertThat("wrong pipeline output", actualStrings, IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
return null;
}
};
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index 8f4cb22845aa..f7417d3c8513 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -382,8 +382,7 @@ public static Iterator> standardEventIterator(
*/
public static PTransform> batchEventsSource(
NexmarkConfiguration configuration) {
- return Read.from(new BoundedEventSource(
- NexmarkUtils.standardGeneratorConfig(configuration), configuration.numEventGenerators));
+ return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), configuration.numEventGenerators));
}
/**
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
index b7cdf1cf861a..37e3f936e318 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
@@ -42,6 +42,7 @@ protected void run() {
return;
}
addResult(timestampedEvent);
+ //TODO test fails because offset of some hundreds of ms beween expect and actual
}
}
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
index ace6f7ead686..16287e68fb02 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
@@ -53,6 +53,7 @@ protected void run() {
TimestampedValue result =
TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
addResult(result);
+ //TODO test fails because offset of some hundreds of ms beween expect and actual
}
}
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
index 73e96e24672d..0033c68ce6dd 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
@@ -107,6 +107,7 @@ protected void run() {
}
// Keep only the highest bids.
captureBid(event.bid);
+ //TODO test fails because offset of some hundreds of ms between expect and actual
}
}
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
index fdd2a3522f54..261e383db671 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
@@ -115,7 +115,7 @@ public void run() {
// Remember auction for future new people.
newAuctions.put(event.newAuction.seller, event.newAuction);
}
- } else {
+ } else { // event is not an auction, nor a bid, so it is a person
// Join new person with existing auctions.
for (Auction auction : newAuctions.get(event.newPerson.id)) {
addResult(auction, event.newPerson, timestamp);
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
index 59705562e55b..dc8094b3c037 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
@@ -175,6 +175,7 @@ protected void run() {
return;
}
addResult(result);
+ //TODO test fails because offset of some hundreds of ms beween expect and actual
return;
}
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java
index d4d51f17c71f..e481eac67868 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/QueryTest.java
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.values.TimestampedValue;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -33,23 +34,23 @@
* Test the various NEXMark queries yield results coherent with their models.
*/
@RunWith(JUnit4.class)
-@Ignore
-//TODO Ismael
public class QueryTest {
private static final NexmarkConfiguration CONFIG = NexmarkConfiguration.DEFAULT.clone();
+ @Rule
+ public TestPipeline p = TestPipeline.create();
static {
- CONFIG.numEvents = 2000;
+ //careful, results of tests are linked to numEvents value
+ CONFIG.numEvents = 100;
}
/** Test {@code query} matches {@code model}. */
- private static void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) {
- Pipeline p = TestPipeline.create();
+ private void queryMatchesModel(String name, NexmarkQuery query, NexmarkQueryModel model) {
NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
PCollection> results =
p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
//TODO Ismael this should not be called explicitly
-// results.setIsBoundedInternal(IsBounded.BOUNDED);
+ results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
PAssert.that(results).satisfies(model.assertionFor());
p.run().waitUntilFinish();
}