From bf6fed38ecab7d9d603d2e06113221bd31f4a355 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 9 Sep 2016 12:38:37 -0700 Subject: [PATCH] Revert PR-427 to re-enable streaming bounded read --- .../sdk/runners/DataflowPipelineRunner.java | 10 +------ .../runners/DataflowPipelineRunnerTest.java | 29 ------------------- 2 files changed, 1 insertion(+), 38 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index d77fc863dc..d54927dddd 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -361,15 +361,7 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) { builder.put(View.AsList.class, StreamingViewAsList.class); builder.put(View.AsIterable.class, StreamingViewAsIterable.class); builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); - if (options.getExperiments() == null - || !options.getExperiments().contains("enable_streaming_bounded_read")) { - builder.put(Read.Bounded.class, UnsupportedIO.class); - builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class); - builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class); - builder.put(TextIO.Read.Bound.class, UnsupportedIO.class); - } else { - builder.put(Read.Bounded.class, StreamingBoundedRead.class); - } + builder.put(Read.Bounded.class, StreamingBoundedRead.class); builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); // In streaming mode must use either the custom Pubsub unbounded source/sink or diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java index 7ad99dbef7..1dac6c4cf2 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java @@ -46,8 +46,6 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.VarLongCoder; import com.google.cloud.dataflow.sdk.io.AvroIO; -import com.google.cloud.dataflow.sdk.io.AvroSource; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions; @@ -930,33 +928,6 @@ private void testUnsupportedSource(PTransform source, String name, bo p.run(); } - @Test - public void testBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true); - } - - @Test - public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true); - } - - @Test - public void testAvroIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(AvroIO.Read.from("foo"), "AvroIO.Read", true); - } - - @Test - public void testTextIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true); - } - - @Test - public void testReadBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true); - } - @Test public void testReadUnboundedUnsupportedInBatch() throws Exception { testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);