diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 6697fcec2439..12ed3603264a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -105,9 +105,11 @@ public void translate(Pipeline pipeline) { if (options.isStreaming() || options.getUseDataStreamForBatch()) { this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) { - LOG.warn("UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); + LOG.warn( + "UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); } - translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming()); + translator = + new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming()); if (!options.isStreaming()) { flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 0d2142ce4397..f0514c69891b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -32,7 +32,11 @@ * requiring flink on the classpath (e.g. to use with the direct runner). */ public interface FlinkPipelineOptions - extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions, VersionDependentFlinkPipelineOptions { + extends PipelineOptions, + ApplicationNameOptions, + StreamingOptions, + FileStagingOptions, + VersionDependentFlinkPipelineOptions { String AUTO = "[auto]"; String PIPELINED = "PIPELINED"; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 62a47572c870..ffc7da97cd02 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -82,9 +82,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { private int depth = 0; public FlinkStreamingPipelineTranslator( - StreamExecutionEnvironment env, - PipelineOptions options, - boolean isStreaming) { + StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) { this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 4e94d6957a63..f3901fde03ba 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -223,12 +223,17 @@ public void translateNode( ? context.getExecutionEnvironment().getMaxParallelism() : context.getExecutionEnvironment().getParallelism(); - FlinkUnboundedSource unboundedSource = FlinkSource.unbounded(transform.getName(), - rawSource, new SerializablePipelineOptions(context.getPipelineOptions()), parallelism); + FlinkUnboundedSource unboundedSource = + FlinkSource.unbounded( + transform.getName(), + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); nonDedupSource = context .getExecutionEnvironment() - .fromSource(unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) + .fromSource( + unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) .uid(fullName); if (rawSource.requiresDeduping()) { @@ -339,7 +344,8 @@ private static class ReadSourceTranslator @Override void translateNode( PTransform> transform, FlinkStreamingTranslationContext context) { - if (ReadTranslation.sourceIsBounded(context.getCurrentTransform()) == PCollection.IsBounded.BOUNDED) { + if (ReadTranslation.sourceIsBounded(context.getCurrentTransform()) + == PCollection.IsBounded.BOUNDED) { boundedTranslator.translateNode(transform, context); } else { unboundedTranslator.translateNode(transform, context); @@ -375,18 +381,20 @@ public void translateNode( ? context.getExecutionEnvironment().getMaxParallelism() : context.getExecutionEnvironment().getParallelism(); - FlinkBoundedSource flinkBoundedSource = FlinkSource.bounded( - transform.getName(), - rawSource, - new SerializablePipelineOptions(context.getPipelineOptions()), - parallelism); + FlinkBoundedSource flinkBoundedSource = + FlinkSource.bounded( + transform.getName(), + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); DataStream> source; try { source = context .getExecutionEnvironment() - .fromSource(flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) + .fromSource( + flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) .uid(fullName); } catch (Exception e) { throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e); @@ -554,9 +562,9 @@ static void translateParDo( KeySelector, ?> keySelector = null; boolean stateful = false; DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (!signature.stateDeclarations().isEmpty() || - !signature.timerDeclarations().isEmpty() || - !signature.timerFamilyDeclarations().isEmpty()) { + if (!signature.stateDeclarations().isEmpty() + || !signature.timerDeclarations().isEmpty() + || !signature.timerFamilyDeclarations().isEmpty()) { // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed keyCoder = ((KvCoder) input.getCoder()).getKeyCoder(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index a50c5d61d420..0a89bd18172b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -64,9 +64,7 @@ class FlinkStreamingTranslationContext { private AppliedPTransform currentTransform; public FlinkStreamingTranslationContext( - StreamExecutionEnvironment env, - PipelineOptions options, - boolean isStreaming) { + StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) { this.env = checkNotNull(env); this.options = checkNotNull(options); this.isStreaming = isStreaming; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java index 05b5ef41645c..48ee15501156 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java @@ -14,20 +14,18 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ - package org.apache.beam.runners.flink; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; - public interface VersionDependentFlinkPipelineOptions extends PipelineOptions { - @Description("When set to true, the batch job execution will use DataStream API. " - + "Otherwise, the batch job execution will use the legacy DataSet API.") + @Description( + "When set to true, the batch job execution will use DataStream API. " + + "Otherwise, the batch job execution will use the legacy DataSet API.") @Default.Boolean(false) Boolean getUseDataStreamForBatch(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 28bbd4481031..63f5ede00242 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -1639,12 +1639,14 @@ public Instant currentInputWatermarkTime() { // or long MAX_VALUE. So we should just use the Flink time service watermark in batch mode. // // In Flink the watermark ranges from - // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the beam + // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the + // beam // watermark range is [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775), // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To ensure the timestamp visible to // the users follow the Beam convention, we just use the Beam range instead. - return timerService.currentWatermark() == Long.MAX_VALUE ? - new Instant(Long.MAX_VALUE) : BoundedWindow.TIMESTAMP_MIN_VALUE; + return timerService.currentWatermark() == Long.MAX_VALUE + ? new Instant(Long.MAX_VALUE) + : BoundedWindow.TIMESTAMP_MIN_VALUE; } else { return new Instant(getEffectiveInputWatermark()); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 6654d9f59cf8..ec44d279586d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -62,14 +62,11 @@ public class FlinkExecutionEnvironmentsTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); - @Parameterized.Parameter - public boolean useDataStreamForBatch; + @Parameterized.Parameter public boolean useDataStreamForBatch; @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") public static Collection useDataStreamForBatchJobValues() { - return Arrays.asList(new Object[][] { - {false}, {true} - }); + return Arrays.asList(new Object[][] {{false}, {true}}); } private FlinkPipelineOptions getDefaultPipelineOptions() { diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java index 6733a5976e1c..676e35d4bc0f 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java @@ -85,14 +85,11 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable { @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); - @Parameterized.Parameter - public boolean useDataStreamForBatch; + @Parameterized.Parameter public boolean useDataStreamForBatch; @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") public static Collection useDataStreamForBatchJobValues() { - return Arrays.asList(new Object[][] { - {false}, {true} - }); + return Arrays.asList(new Object[][] {{false}, {true}}); } private FlinkPipelineOptions getDefaultPipelineOptions() { @@ -193,7 +190,8 @@ public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception { shouldUsePreparedFilesOnRemoteStreamEnvironment(false); } - public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode) throws Exception { + public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode) + throws Exception { FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("clusterAddress"); @@ -206,7 +204,8 @@ public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMod List jarFiles; if (streamingMode || options.getUseDataStreamForBatch()) { - StreamExecutionEnvironment streamExecutionEnvironment = flinkEnv.getStreamExecutionEnvironment(); + StreamExecutionEnvironment streamExecutionEnvironment = + flinkEnv.getStreamExecutionEnvironment(); assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class)); jarFiles = getJars(streamExecutionEnvironment); } else { diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 10c570ceddf5..d5d34b59214b 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -76,7 +76,8 @@ public void readSourceTranslatorBoundedWithMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); + FlinkBoundedSource source = + (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); assertEquals(maxParallelism, source.getNumSplits()); } @@ -94,7 +95,8 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); + FlinkBoundedSource source = + (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); assertEquals(parallelism, source.getNumSplits()); } @@ -117,7 +119,8 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() { FlinkSource source = (FlinkSource) - ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource(); + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) + .getSource(); assertEquals(maxParallelism, source.getNumSplits()); } @@ -138,7 +141,8 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() { FlinkSource source = (FlinkSource) - ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource(); + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) + .getSource(); assertEquals(parallelism, source.getNumSplits()); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java index 21fea7366da9..b8dc52f6cd4b 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java @@ -67,7 +67,8 @@ public void testBatch() { private static void runProgram(String resultPath, boolean streaming) { - Pipeline p = streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch(); + Pipeline p = + streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch(); p.apply(GenerateSequence.from(0).to(10)) .apply(