From 9dd922dca3a9e2e582087a47ef6a8cb12dafc966 Mon Sep 17 00:00:00 2001
From: Jiangjie Qin
Date: Mon, 13 Feb 2023 19:02:26 +0800
Subject: [PATCH 1/7] Add Flink PipelineOption of UseDataStreamForBatch.

Modify the FlinkRunner to use the DataStream API for batch job execution if
UseDataStreamForBatch is set to true.
---
 .../AbstractStreamOperatorCompat.java         | 16 ++++-
 .../AbstractStreamOperatorCompat.java         | 16 ++++-
 runners/flink/flink_runner.gradle             |  4 ++
 .../FlinkPipelineExecutionEnvironment.java    | 11 +--
 .../runners/flink/FlinkPipelineOptions.java   |  2 +-
 .../FlinkStreamingPipelineTranslator.java     |  7 +-
 .../FlinkStreamingTransformTranslators.java   | 70 +++++++++++--------
 .../FlinkStreamingTranslationContext.java     | 11 ++-
 .../flink/FlinkTransformOverrides.java        |  2 +-
 .../VersionDependentFlinkPipelineOptions.java | 35 ++++++++++
 .../wrappers/streaming/DoFnOperator.java      | 35 +++++++++-
 .../streaming/SingletonKeyedWorkItem.java     |  5 ++
 12 files changed, 166 insertions(+), 48 deletions(-)
 create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java

diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index bb794e04398d..5072e6b2459f 100644
--- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,6 +20,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;

 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              final InternalTimeServiceManagerImpl cast =
-                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-              return cast.numProcessingTimeTimers();
+              InternalTimeServiceManager tsm = getTimeServiceManagerCompat();
+              if (tsm instanceof InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl cast =
+                    (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
+                return cast.numProcessingTimeTimers();
+              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of InternalTimeServiceManager. %s", tsm));
+              }
             })
         .orElse(0);
   }

diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index 3b64612d6d19..d8740964fda9 100644
--- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,6 +20,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;

 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              final InternalTimeServiceManagerImpl cast =
-                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-              return cast.numProcessingTimeTimers();
+              InternalTimeServiceManager tsm = getTimeServiceManagerCompat();
+              if (tsm instanceof InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl cast =
+                    (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
+                return cast.numProcessingTimeTimers();
+              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of InternalTimeServiceManager. %s", tsm));
+              }
             })
         .orElse(0);
   }

diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index b1a459337e51..635eff9d7ecf 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -314,6 +314,10 @@ def createValidatesRunnerTask(Map m) {
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
         // https://github.com/apache/beam/issues/20844
        excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
+        if (!config.streaming) {
+          // FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing.
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew' + } } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 7961bea6069d..6697fcec2439 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.metrics.MetricsOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -101,13 +102,15 @@ public void translate(Pipeline pipeline) { prepareFilesToStageForRemoteClusterExecution(options); FlinkPipelineTranslator translator; - if (options.isStreaming()) { + if (options.isStreaming() || options.getUseDataStreamForBatch()) { this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) { - LOG.warn( - "UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); + LOG.warn("UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); + } + translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming()); + if (!options.isStreaming()) { + flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); } - translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options); } else { this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 650768c7b44b..0d2142ce4397 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -32,7 +32,7 @@ * requiring flink on the classpath (e.g. to use with the direct runner). 
*/ public interface FlinkPipelineOptions - extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions { + extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions, VersionDependentFlinkPipelineOptions { String AUTO = "[auto]"; String PIPELINED = "PIPELINED"; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index e9f3f7fe9176..62a47572c870 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -81,8 +81,11 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { private int depth = 0; - public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) { - this.streamingContext = new FlinkStreamingTranslationContext(env, options); + public FlinkStreamingPipelineTranslator( + StreamExecutionEnvironment env, + PipelineOptions options, + boolean isStreaming) { + this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 6d42d0c3b485..15bc0e0f71a1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -38,9 +38,7 @@ import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; -import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; @@ -54,6 +52,9 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -96,6 +97,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -220,16 +222,14 @@ public void translateNode( context.getExecutionEnvironment().getMaxParallelism() > 0 ? context.getExecutionEnvironment().getMaxParallelism() : context.getExecutionEnvironment().getParallelism(); - UnboundedSourceWrapper sourceWrapper = - new UnboundedSourceWrapper<>( - fullName, context.getPipelineOptions(), rawSource, parallelism); + + FlinkUnboundedSource unboundedSource = FlinkSource.unbounded( + rawSource, new SerializablePipelineOptions(context.getPipelineOptions()), parallelism); nonDedupSource = context .getExecutionEnvironment() - .addSource(sourceWrapper) - .name(fullName) - .uid(fullName) - .returns(withIdTypeInfo); + .fromSource(unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) + .uid(fullName); if (rawSource.requiresDeduping()) { source = @@ -303,15 +303,24 @@ void translateNode(Impulse transform, FlinkStreamingTranslationContext context) WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE), context.getPipelineOptions()); - long shutdownAfterIdleSourcesMs = - context - .getPipelineOptions() - .as(FlinkPipelineOptions.class) - .getShutdownSourcesAfterIdleMs(); + FlinkBoundedSource impulseSource; + WatermarkStrategy> watermarkStrategy; + if (context.isStreaming()) { + long shutdownAfterIdleSourcesMs = + context + .getPipelineOptions() + .as(FlinkPipelineOptions.class) + .getShutdownSourcesAfterIdleMs(); + impulseSource = FlinkSource.unboundedImpulse(shutdownAfterIdleSourcesMs); + watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps(); + } else { + impulseSource = FlinkSource.boundedImpulse(); + watermarkStrategy = WatermarkStrategy.noWatermarks(); + } SingleOutputStreamOperator> source = context .getExecutionEnvironment() - .addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse") + .fromSource(impulseSource, watermarkStrategy, "Impulse") .returns(typeInfo); context.setOutputDataStream(context.getOutput(transform), source); @@ -330,7 +339,7 @@ private static class ReadSourceTranslator @Override void translateNode( PTransform> transform, FlinkStreamingTranslationContext context) { - if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) { + if (ReadTranslation.sourceIsBounded(context.getCurrentTransform()) == PCollection.IsBounded.BOUNDED) { boundedTranslator.translateNode(transform, context); } else { unboundedTranslator.translateNode(transform, context); @@ -361,24 +370,23 @@ public void translateNode( } String fullName = getCurrentTransformName(context); - UnboundedSource adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource); + int parallelism = + context.getExecutionEnvironment().getMaxParallelism() > 0 + ? context.getExecutionEnvironment().getMaxParallelism() + : context.getExecutionEnvironment().getParallelism(); + + FlinkBoundedSource flinkBoundedSource = FlinkSource.bounded( + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); + DataStream> source; try { - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? 
context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); - UnboundedSourceWrapperNoValueWithRecordId sourceWrapper = - new UnboundedSourceWrapperNoValueWithRecordId<>( - new UnboundedSourceWrapper<>( - fullName, context.getPipelineOptions(), adaptedRawSource, parallelism)); source = context .getExecutionEnvironment() - .addSource(sourceWrapper) - .name(fullName) - .uid(fullName) - .returns(outputTypeInfo); + .fromSource(flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) + .uid(fullName); } catch (Exception e) { throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e); } @@ -545,7 +553,9 @@ static void translateParDo( KeySelector, ?> keySelector = null; boolean stateful = false; DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { + if (!signature.stateDeclarations().isEmpty() || + !signature.timerDeclarations().isEmpty() || + !signature.timerFamilyDeclarations().isEmpty()) { // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed keyCoder = ((KvCoder) input.getCoder()).getKeyCoder(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index 9791eaeb4ac1..a50c5d61d420 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -51,6 +51,7 @@ class FlinkStreamingTranslationContext { private final StreamExecutionEnvironment env; private final PipelineOptions options; + private final boolean isStreaming; /** * Keeps a mapping between the output value of the PTransform and the Flink Operator that produced @@ -62,9 +63,13 @@ class FlinkStreamingTranslationContext { private AppliedPTransform currentTransform; - public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { + public FlinkStreamingTranslationContext( + StreamExecutionEnvironment env, + PipelineOptions options, + boolean isStreaming) { this.env = checkNotNull(env); this.options = checkNotNull(options); + this.isStreaming = isStreaming; } public StreamExecutionEnvironment getExecutionEnvironment() { @@ -75,6 +80,10 @@ public PipelineOptions getPipelineOptions() { return options; } + public boolean isStreaming() { + return isStreaming; + } + @SuppressWarnings("unchecked") public DataStream getInputDataStream(PValue value) { return (DataStream) dataStreams.get(value); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java index b53864d968c7..69ad58253b8e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java @@ -36,7 +36,7 @@ class FlinkTransformOverrides { static List getDefaultOverrides(FlinkPipelineOptions options) { ImmutableList.Builder builder = ImmutableList.builder(); - if (options.isStreaming()) { + if (options.isStreaming() || options.getUseDataStreamForBatch()) { builder .add( PTransformOverride.of( diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java new file mode 100644 index 000000000000..05b5ef41645c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + + +public interface VersionDependentFlinkPipelineOptions extends PipelineOptions { + + @Description("When set to true, the batch job execution will use DataStream API. " + + "Otherwise, the batch job execution will use the legacy DataSet API.") + @Default.Boolean(false) + Boolean getUseDataStreamForBatch(); + + void setUseDataStreamForBatch(Boolean useDataStreamForBatch); +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 0a9731da8b56..28bbd4481031 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -110,10 +110,12 @@ import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -686,6 +688,7 @@ protected final void setBundleFinishedCallback(Runnable callback) { @Override public final void processElement(StreamRecord> streamRecord) { checkInvokeStartBundle(); + LOG.trace("Processing element {} in {}", streamRecord.getValue().getValue(), doFn.getClass()); long oldHold = keyCoder != null ? 
keyedStateInternals.minWatermarkHoldMs() : -1L;
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
@@ -768,6 +771,7 @@ public final void processElement2(StreamRecord streamRecord) thro

   @Override
   public final void processWatermark(Watermark mark) throws Exception {
+    LOG.trace("Processing watermark {} in {}", mark.getTimestamp(), doFn.getClass());
     processWatermark1(mark);
   }

@@ -1456,8 +1460,10 @@ private void populateOutputTimestampQueue(InternalTimerService timerS
     BiConsumerWithException consumer =
         (timerData, stamp) ->
             keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp());
-    timerService.forEachEventTimeTimer(consumer);
-    timerService.forEachProcessingTimeTimer(consumer);
+    if (timerService instanceof InternalTimerServiceImpl) {
+      timerService.forEachEventTimeTimer(consumer);
+      timerService.forEachProcessingTimeTimer(consumer);
+    }
   }

   private String constructTimerId(String timerFamilyId, String timerId) {
@@ -1508,6 +1514,7 @@ public void setTimer(TimerData timer) {
   }

   private void registerTimer(TimerData timer, String contextTimerId) throws Exception {
+    LOG.debug("Registering timer {}", timer);
     pendingTimersById.put(contextTimerId, timer);
     long time = timer.getTimestamp().getMillis();
     switch (timer.getDomain()) {
@@ -1618,7 +1625,29 @@ public Instant currentProcessingTime() {

     @Override
     public Instant currentInputWatermarkTime() {
-      return new Instant(getEffectiveInputWatermark());
+      if (timerService instanceof BatchExecutionInternalTimeService) {
+        // In batch mode, this method only ever returns either
+        // BoundedWindow.TIMESTAMP_MIN_VALUE or BoundedWindow.TIMESTAMP_MAX_VALUE.
+        //
+        // In batch execution mode, the currentInputWatermark variable is never updated
+        // until all the records have been processed. However, every time a record with
+        // a new key arrives, the Flink timer service watermark is set to
+        // MAX_WATERMARK (Long.MAX_VALUE) so that all the timers associated with the
+        // current key can fire. After that, the Flink timer service watermark is reset
+        // to Long.MIN_VALUE, so the next key starts fresh, as if the records of the
+        // previous key had never existed. The watermark is therefore either
+        // Long.MIN_VALUE or Long.MAX_VALUE, so we should just use the Flink timer
+        // service watermark in batch mode.
+        //
+        // In Flink, the watermark ranges over [Long.MIN_VALUE (-9223372036854775808),
+        // Long.MAX_VALUE (9223372036854775807)], while the Beam watermark range is
+        // [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775),
+        // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To ensure that the
+        // timestamps visible to users follow the Beam convention, we use the Beam
+        // range instead.
+        return timerService.currentWatermark() == Long.MAX_VALUE ?
+ new Instant(Long.MAX_VALUE) : BoundedWindow.TIMESTAMP_MIN_VALUE; + } else { + return new Instant(getEffectiveInputWatermark()); + } } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java index c4d82cb5c8ad..6f2f473feddc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java @@ -51,4 +51,9 @@ public Iterable timersIterable() { public Iterable> elementsIterable() { return Collections.singletonList(value); } + + @Override + public String toString() { + return String.format("{%s, [%s]}", key, value); + } } From 04c24dc8acd091374d48b38aabffd5d1e05e69b6 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Feb 2023 10:28:05 +0800 Subject: [PATCH 2/7] Modify the unit tests and runner validation tests to cover the DataStream execution path of batch jobs. --- runners/flink/flink_runner.gradle | 7 ++ .../flink/FlinkExecutionEnvironmentsTest.java | 85 ++++++++++++------- ...FlinkPipelineExecutionEnvironmentTest.java | 82 ++++++++++-------- .../flink/FlinkPipelineOptionsTest.java | 1 + .../FlinkStreamingPipelineTranslatorTest.java | 4 +- ...linkStreamingTransformTranslatorsTest.java | 40 ++++----- .../runners/flink/FlinkSubmissionTest.java | 15 ++++ .../flink/ReadSourceStreamingTest.java | 13 ++- 8 files changed, 149 insertions(+), 98 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 635eff9d7ecf..ed6c11fbdab3 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -237,6 +237,7 @@ class ValidatesRunnerConfig { String name boolean streaming boolean checkpointing + boolean useDataStreamForBatch ArrayList sickbayTests } @@ -255,6 +256,7 @@ def createValidatesRunnerTask(Map m) { description = "Validates the ${runnerType} runner" def pipelineOptionsArray = ["--runner=TestFlinkRunner", "--streaming=${config.streaming}", + "--useDataStreamForBatch=${config.useDataStreamForBatch}", "--parallelism=2", ] if (config.checkpointing) { @@ -317,6 +319,9 @@ def createValidatesRunnerTask(Map m) { if (!config.streaming) { // FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing. 
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew' + } else { + // https://github.com/apache/beam/issues/25485 + excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState' } } } @@ -324,6 +329,7 @@ def createValidatesRunnerTask(Map m) { } createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests) +createValidatesRunnerTask(name: "validatesRunnerBatchWithDataStream", streaming: false, useDataStreamForBatch: true, sickbayTests: sickbayTests) createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests) // We specifically have a variant which runs with checkpointing enabled for the // tests that require it since running a checkpoint variant is significantly @@ -336,6 +342,7 @@ tasks.register('validatesRunner') { group = 'Verification' description "Validates Flink runner" dependsOn validatesRunnerBatch + dependsOn validatesRunnerBatchWithDataStream dependsOn validatesRunnerStreaming dependsOn validatesRunnerStreamingCheckpointing } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 49d317d46ced..6654d9f59cf8 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -29,6 +29,8 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -49,17 +51,36 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.powermock.reflect.Whitebox; /** Tests for {@link FlinkExecutionEnvironments}. 
*/ +@RunWith(Parameterized.class) public class FlinkExecutionEnvironmentsTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); + @Parameterized.Parameter + public boolean useDataStreamForBatch; + + @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") + public static Collection useDataStreamForBatchJobValues() { + return Arrays.asList(new Object[][] { + {false}, {true} + }); + } + + private FlinkPipelineOptions getDefaultPipelineOptions() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); + return options; + } + @Test public void shouldSetParallelismBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setParallelism(42); @@ -71,7 +92,7 @@ public void shouldSetParallelismBatch() { @Test public void shouldSetParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setParallelism(42); @@ -84,7 +105,7 @@ public void shouldSetParallelismStreaming() { @Test public void shouldSetMaxParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setMaxParallelism(42); @@ -99,7 +120,7 @@ public void shouldSetMaxParallelismStreaming() { public void shouldInferParallelismFromEnvironmentBatch() throws IOException { String flinkConfDir = extractFlinkConfig(); - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -115,7 +136,7 @@ public void shouldInferParallelismFromEnvironmentBatch() throws IOException { public void shouldInferParallelismFromEnvironmentStreaming() throws IOException { String confDir = extractFlinkConfig(); - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -129,7 +150,7 @@ public void shouldInferParallelismFromEnvironmentStreaming() throws IOException @Test public void shouldFallbackToDefaultParallelismBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -141,7 +162,7 @@ public void shouldFallbackToDefaultParallelismBatch() { @Test public void shouldFallbackToDefaultParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -154,7 +175,7 @@ public void shouldFallbackToDefaultParallelismStreaming() { @Test public void useDefaultParallelismFromContextBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); @@ -166,7 +187,7 @@ public void 
useDefaultParallelismFromContextBatch() { @Test public void useDefaultParallelismFromContextStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); StreamExecutionEnvironment sev = @@ -179,7 +200,7 @@ public void useDefaultParallelismFromContextStreaming() { @Test public void shouldParsePortForRemoteEnvironmentBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:1234"); @@ -191,7 +212,7 @@ public void shouldParsePortForRemoteEnvironmentBatch() { @Test public void shouldParsePortForRemoteEnvironmentStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:1234"); @@ -204,7 +225,7 @@ public void shouldParsePortForRemoteEnvironmentStreaming() { @Test public void shouldAllowPortOmissionForRemoteEnvironmentBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host"); @@ -216,7 +237,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentBatch() { @Test public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host"); @@ -229,7 +250,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() { @Test public void shouldTreatAutoAndEmptyHostTheSameBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); ExecutionEnvironment sev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); @@ -243,7 +264,7 @@ public void shouldTreatAutoAndEmptyHostTheSameBatch() { @Test public void shouldTreatAutoAndEmptyHostTheSameStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); StreamExecutionEnvironment sev = @@ -259,7 +280,7 @@ public void shouldTreatAutoAndEmptyHostTheSameStreaming() { @Test public void shouldDetectMalformedPortBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:p0rt"); @@ -271,7 +292,7 @@ public void shouldDetectMalformedPortBatch() { @Test public void shouldDetectMalformedPortStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:p0rt"); @@ -283,7 +304,7 @@ public void shouldDetectMalformedPortStreaming() { @Test public void shouldSupportIPv4Batch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("192.168.1.1:1234"); @@ -297,7 +318,7 @@ public void shouldSupportIPv4Batch() { @Test public void 
shouldSupportIPv4Streaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("192.168.1.1:1234"); @@ -311,7 +332,7 @@ public void shouldSupportIPv4Streaming() { @Test public void shouldSupportIPv6Batch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234"); @@ -326,7 +347,7 @@ public void shouldSupportIPv6Batch() { @Test public void shouldSupportIPv6Streaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234"); @@ -342,7 +363,7 @@ public void shouldSupportIPv6Streaming() { @Test public void shouldRemoveHttpProtocolFromHostBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); for (String flinkMaster : @@ -358,7 +379,7 @@ public void shouldRemoveHttpProtocolFromHostBatch() { @Test public void shouldRemoveHttpProtocolFromHostStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); for (String flinkMaster : @@ -382,7 +403,7 @@ private String extractFlinkConfig() throws IOException { @Test public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() { // Checkpointing disabled, shut down sources immediately - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(0L)); } @@ -390,7 +411,7 @@ public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() { @Test public void shouldAutoSetIdleSourcesFlagWithCheckpointing() { // Checkpointing is enabled, never shut down sources - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setCheckpointingInterval(1000L); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(Long.MAX_VALUE)); @@ -399,7 +420,7 @@ public void shouldAutoSetIdleSourcesFlagWithCheckpointing() { @Test public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() { // Checkpointing disabled, accept flag - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setShutdownSourcesAfterIdleMs(42L); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(42L)); @@ -408,7 +429,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() { @Test public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() { // Checkpointing enable, still accept flag - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setCheckpointingInterval(1000L); options.setShutdownSourcesAfterIdleMs(42L); 
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); @@ -418,7 +439,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() { @Test public void shouldSetSavepointRestoreForRemoteStreaming() { String path = "fakePath"; - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); options.setSavepointPath(path); @@ -432,7 +453,7 @@ public void shouldSetSavepointRestoreForRemoteStreaming() { @Test public void shouldFailOnUnknownStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("unknown"); options.setStateBackendStoragePath("/path"); @@ -445,7 +466,7 @@ public void shouldFailOnUnknownStateBackend() { @Test public void shouldFailOnNoStoragePathProvided() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("unknown"); @@ -457,7 +478,7 @@ public void shouldFailOnNoStoragePathProvided() { @Test public void shouldCreateFileSystemStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("fileSystem"); options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString()); @@ -470,7 +491,7 @@ public void shouldCreateFileSystemStateBackend() { @Test public void shouldCreateRocksDbStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("rocksDB"); options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString()); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java index d8c4c6f6c8ec..6733a5976e1c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.core.Every.everyItem; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import java.io.ByteArrayOutputStream; import java.io.File; @@ -38,6 +39,8 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -68,13 +71,13 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.powermock.reflect.Whitebox; /** Tests for {@link FlinkPipelineExecutionEnvironment}. 
*/ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) @SuppressWarnings({ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) @@ -82,9 +85,25 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable { @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Parameterized.Parameter + public boolean useDataStreamForBatch; + + @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") + public static Collection useDataStreamForBatchJobValues() { + return Arrays.asList(new Object[][] { + {false}, {true} + }); + } + + private FlinkPipelineOptions getDefaultPipelineOptions() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); + return options; + } + @Test public void shouldRecognizeAndTranslateStreamingPipeline() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("[auto]"); @@ -136,6 +155,8 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws IOEx @Test public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() throws IOException { + // StreamingExecutionEnv does not support "collection" mode. + assumeFalse(useDataStreamForBatch); FlinkPipelineOptions options = testPreparingResourcesToStage("[collection]"); assertThat(options.getFilesToStage().size(), is(2)); @@ -152,7 +173,7 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOE @Test public void shouldUseDefaultTempLocationIfNoneSet() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("clusterAddress"); @@ -168,42 +189,31 @@ public void shouldUseDefaultTempLocationIfNoneSet() { @Test public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); - options.setRunner(TestFlinkRunner.class); - options.setFlinkMaster("clusterAddress"); - - FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); - - Pipeline pipeline = Pipeline.create(options); - flinkEnv.translate(pipeline); - - ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment(); - assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class)); - - List jarFiles = getJars(executionEnvironment); - - List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage()); - - assertThat(jarFiles, is(urlConvertedStagedFiles)); + shouldUsePreparedFilesOnRemoteStreamEnvironment(true); + shouldUsePreparedFilesOnRemoteStreamEnvironment(false); } - @Test - public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode) throws Exception { + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("clusterAddress"); - options.setStreaming(true); + options.setStreaming(streamingMode); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); Pipeline pipeline = Pipeline.create(options); flinkEnv.translate(pipeline); - StreamExecutionEnvironment streamExecutionEnvironment = - 
flinkEnv.getStreamExecutionEnvironment(); - assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class)); - - List jarFiles = getJars(streamExecutionEnvironment); + List jarFiles; + if (streamingMode || options.getUseDataStreamForBatch()) { + StreamExecutionEnvironment streamExecutionEnvironment = flinkEnv.getStreamExecutionEnvironment(); + assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class)); + jarFiles = getJars(streamExecutionEnvironment); + } else { + ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment(); + assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class)); + jarFiles = getJars(executionEnvironment); + } List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage()); @@ -214,7 +224,7 @@ public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception { public void shouldUseTransformOverrides() { boolean[] testParameters = {true, false}; for (boolean streaming : testParameters) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(streaming); options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -234,7 +244,7 @@ public void shouldUseTransformOverrides() { @Test public void shouldProvideParallelismToTransformOverrides() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -278,7 +288,7 @@ public boolean matches(Object actual) { @Test public void shouldUseStreamingTransformOverridesWithUnboundedSources() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); // no explicit streaming mode set options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -303,7 +313,7 @@ public void shouldUseStreamingTransformOverridesWithUnboundedSources() { @Test public void testTranslationModeOverrideWithUnboundedSources() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setStreaming(false); @@ -319,7 +329,7 @@ public void testTranslationModeOverrideWithUnboundedSources() { public void testTranslationModeNoOverrideWithoutUnboundedSources() { boolean[] testArgs = new boolean[] {true, false}; for (boolean streaming : testArgs) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setStreaming(streaming); @@ -408,7 +418,7 @@ private FlinkPipelineOptions testPreparingResourcesToStage( private FlinkPipelineOptions setPipelineOptions( String flinkMaster, String tempLocation, List filesToStage) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster(flinkMaster); options.setTempLocation(tempLocation); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java index c2d9163aacc9..da8c560690a6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java @@ -94,6 +94,7 @@ public void testDefaults() { assertThat(options.getMaxBundleSize(), is(1000L)); assertThat(options.getMaxBundleTimeMills(), is(1000L)); assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED.name())); + assertThat(options.getUseDataStreamForBatch(), is(false)); assertThat(options.getSavepointPath(), is(nullValue())); assertThat(options.getAllowNonRestoredState(), is(false)); assertThat(options.getDisableMetrics(), is(false)); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java index 5d56e6ddbf67..84f1dc3c6457 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java @@ -156,7 +156,7 @@ public void testStatefulParDoAfterCombineChaining() { private JobGraph getStatefulParDoAfterCombineChainingJobGraph(boolean stablePartitioning) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final FlinkStreamingPipelineTranslator translator = - new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create()); + new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true); final PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); pipelineOptions.setRunner(FlinkRunner.class); final Pipeline pipeline = Pipeline.create(pipelineOptions); @@ -188,7 +188,7 @@ public void testStatefulParDoAfterGroupByKeyChaining() { private JobGraph getStatefulParDoAfterGroupByKeyChainingJobGraph(boolean stablePartitioning) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final FlinkStreamingPipelineTranslator translator = - new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create()); + new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true); final PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); pipelineOptions.setRunner(FlinkRunner.class); final Pipeline pipeline = Pipeline.create(pipelineOptions); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 451070c1c164..10c570ceddf5 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -29,8 +29,8 @@ import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; -import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -49,8 +49,8 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; @@ -76,11 +76,9 @@ public void readSourceTranslatorBoundedWithMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - UnboundedSourceWrapperNoValueWithRecordId source = - (UnboundedSourceWrapperNoValueWithRecordId) - ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); + FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); - assertEquals(maxParallelism, source.getUnderlyingSource().getSplitSources().size()); + assertEquals(maxParallelism, source.getNumSplits()); } @Test @@ -96,11 +94,9 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - UnboundedSourceWrapperNoValueWithRecordId source = - (UnboundedSourceWrapperNoValueWithRecordId) - ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); + FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); - assertEquals(parallelism, source.getUnderlyingSource().getSplitSources().size()); + assertEquals(parallelism, source.getNumSplits()); } @Test @@ -119,13 +115,11 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() { (OneInputTransformation) applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env); - UnboundedSourceWrapper source = - (UnboundedSourceWrapper) - ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) - .getOperator() - .getUserFunction(); + FlinkSource source = + (FlinkSource) + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource(); - assertEquals(maxParallelism, source.getSplitSources().size()); + assertEquals(maxParallelism, source.getNumSplits()); } @Test @@ -142,13 +136,11 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() { (OneInputTransformation) applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env); - UnboundedSourceWrapper source = - (UnboundedSourceWrapper) - ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) - .getOperator() - .getUserFunction(); + FlinkSource source = + (FlinkSource) + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource(); - assertEquals(parallelism, source.getSplitSources().size()); + assertEquals(parallelism, source.getNumSplits()); } private Object applyReadSourceTransform( @@ -157,7 +149,7 @@ private Object applyReadSourceTransform( FlinkStreamingPipelineTranslator.StreamTransformTranslator> translator = getReadSourceTranslator(); FlinkStreamingTranslationContext ctx = - new FlinkStreamingTranslationContext(env, 
-        new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create());
+        new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create(), true);

     Pipeline pipeline = Pipeline.create();
     PCollection pc =
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
index 601dbc66b1a2..b502e1129ee2 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
@@ -72,6 +72,8 @@ public class FlinkSubmissionTest {
   /** Counter which keeps track of the number of jobs submitted. */
   private static int expectedNumberOfJobs;

+  public static boolean useDataStreamForBatch;
+
   @BeforeClass
   public static void beforeClass() throws Exception {
     Configuration config = new Configuration();
@@ -104,6 +106,12 @@ public void testSubmissionBatch() throws Exception {
     runSubmission(false, false);
   }

+  @Test
+  public void testSubmissionBatchUseDataStream() throws Exception {
+    FlinkSubmissionTest.useDataStreamForBatch = true;
+    runSubmission(false, false);
+  }
+
   @Test
   public void testSubmissionStreaming() throws Exception {
     runSubmission(false, true);
@@ -114,6 +122,12 @@ public void testDetachedSubmissionBatch() throws Exception {
     runSubmission(true, false);
   }

+  @Test
+  public void testDetachedSubmissionBatchUseDataStream() throws Exception {
+    FlinkSubmissionTest.useDataStreamForBatch = true;
+    runSubmission(true, false);
+  }
+
   @Test
   public void testDetachedSubmissionStreaming() throws Exception {
     runSubmission(true, true);
@@ -164,6 +178,7 @@ private void waitUntilJobIsCompleted() throws Exception {
   /** The Flink program which is executed by the CliFrontend. */
   public static void main(String[] args) {
     FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
+    options.setUseDataStreamForBatch(useDataStreamForBatch);
     options.setRunner(FlinkRunner.class);
     options.setStreaming(streaming);
     options.setParallelism(1);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index 8da44d4b3a83..21fea7366da9 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -56,13 +56,18 @@ public void postSubmit() throws Exception {
   }

   @Test
-  public void testProgram() throws Exception {
-    runProgram(resultPath);
+  public void testStreaming() {
+    runProgram(resultPath, true);
   }

-  private static void runProgram(String resultPath) {
+  @Test
+  public void testBatch() {
+    runProgram(resultPath, false);
+  }
+
+  private static void runProgram(String resultPath, boolean streaming) {

-    Pipeline p = FlinkTestPipeline.createForStreaming();
+    Pipeline p = streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch();

     p.apply(GenerateSequence.from(0).to(10))
         .apply(
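A note on the test rework above: with the move to Flink's FLIP-27 source API, Beam sources now surface in the translated job graph as a SourceTransformation rather than a LegacySourceTransformation, which is why the assertions unwrap getSource() instead of getOperator().getUserFunction(). The following standalone sketch illustrates that unwrap pattern against plain Flink; it is an illustration only, with NumberSequenceSource standing in for the Beam-built sources and the cast generics left as wildcards.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.transformations.SourceTransformation;

    public class SourceTransformationProbe {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FLIP-27 sources enter the graph via fromSource(...), the same entry point
        // the translators under test use.
        DataStream<Long> stream =
            env.fromSource(
                new NumberSequenceSource(0, 9), WatermarkStrategy.noWatermarks(), "numbers");
        // The resulting transformation exposes the Source object directly.
        SourceTransformation<?, ?, ?> transformation =
            (SourceTransformation<?, ?, ?>) stream.getTransformation();
        System.out.println(transformation.getSource().getClass().getSimpleName());
      }
    }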
From 741e2cb956ee96a499a632119b186f013daa3f9f Mon Sep 17 00:00:00 2001
From: Jiangjie Qin
Date: Tue, 7 Mar 2023 10:03:02 +0800
Subject: [PATCH 3/7] Update CHANGES.md to record the notable change.

---
 CHANGES.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 49b4fdfe89b0..74bba37d5168 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -339,6 +339,9 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
 * The Go SDK now requires Go 1.19 to build. ([#25545](https://github.com/apache/beam/pull/25545))
 * The Go SDK now has an initial native Go implementation of a portable Beam Runner called Prism. ([#24789](https://github.com/apache/beam/pull/24789))
   * For more details and current state see https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism.
+* Add `UseDataStreamForBatch` pipeline option to the Flink runner. When it is set to true, Flink runner will run batch
+  jobs executed with DataStream API. By default the option is set to false, so the batch jobs are still executed
+  with DataSet API.

 ## Breaking Changes
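To make the new entry concrete: a batch pipeline opts into DataStream execution either with the --useDataStreamForBatch=true flag (the camel-case flag name Beam derives from the option's getter) or programmatically, as in the minimal driver below. The class is an illustrative sketch, not part of the patch; it assumes only the FlinkPipelineOptions accessors shown in these diffs.

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class DataStreamBatchDriver {
      public static void main(String[] args) {
        // Equivalent to passing --useDataStreamForBatch=true on the command line.
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setStreaming(false);            // a bounded, batch-style pipeline...
        options.setUseDataStreamForBatch(true); // ...executed through the DataStream API
        Pipeline pipeline = Pipeline.create(options);
        // ... apply transforms here, then:
        pipeline.run().waitUntilFinish();
      }
    }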
From c73aa05c0abd251e1b25a44af3a219d964fa42bb Mon Sep 17 00:00:00 2001
From: jto
Date: Fri, 22 Sep 2023 15:03:19 +0200
Subject: [PATCH 4/7] Fix compilation errors

---
 .../beam/runners/flink/FlinkStreamingTransformTranslators.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 15bc0e0f71a1..4e94d6957a63 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -223,7 +223,7 @@ public void translateNode(
             ? context.getExecutionEnvironment().getMaxParallelism()
             : context.getExecutionEnvironment().getParallelism();

-      FlinkUnboundedSource unboundedSource = FlinkSource.unbounded(
+      FlinkUnboundedSource unboundedSource = FlinkSource.unbounded(transform.getName(),
           rawSource, new SerializablePipelineOptions(context.getPipelineOptions()), parallelism);
       nonDedupSource =
           context
               .getExecutionEnvironment()
@@ -376,6 +376,7 @@ public void translateNode(
               : context.getExecutionEnvironment().getParallelism();
       FlinkBoundedSource flinkBoundedSource = FlinkSource.bounded(
+          transform.getName(),
           rawSource,
           new SerializablePipelineOptions(context.getPipelineOptions()),
           parallelism);

From a05ea70c9abd59497bce4d74243cea48f92a2c29 Mon Sep 17 00:00:00 2001
From: jto
Date: Wed, 27 Sep 2023 10:38:57 +0200
Subject: [PATCH 5/7] spotless

---
 .../FlinkPipelineExecutionEnvironment.java    |  6 ++--
 .../runners/flink/FlinkPipelineOptions.java   |  6 +++-
 .../FlinkStreamingPipelineTranslator.java     |  4 +--
 .../FlinkStreamingTransformTranslators.java   | 34 ++++++++++++-------
 .../FlinkStreamingTranslationContext.java     |  4 +--
 .../VersionDependentFlinkPipelineOptions.java |  8 ++---
 .../wrappers/streaming/DoFnOperator.java      |  8 +++--
 .../flink/FlinkExecutionEnvironmentsTest.java |  7 ++--
 ...FlinkPipelineExecutionEnvironmentTest.java | 13 ++++---
 ...linkStreamingTransformTranslatorsTest.java | 12 ++++---
 .../flink/ReadSourceStreamingTest.java        |  3 +-
 11 files changed, 58 insertions(+), 47 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 6697fcec2439..12ed3603264a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -105,9 +105,11 @@ public void translate(Pipeline pipeline) {
     if (options.isStreaming() || options.getUseDataStreamForBatch()) {
       this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
       if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
-        LOG.warn("UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
+        LOG.warn(
+            "UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
       }
-      translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming());
+      translator =
+          new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming());
       if (!options.isStreaming()) {
         flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
       }
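The hunk above is a formatting change, but it touches the core of the feature: with useDataStreamForBatch set, the runner builds a StreamExecutionEnvironment even for a bounded pipeline and, when not streaming, flips it into Flink's batch runtime mode. Stripped of the Beam plumbing, the effect is roughly the following sketch (an illustration under that assumption, not runner code):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BatchModeEnvironment {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the bounded DataStream topology with Flink's batch runtime:
        // blocking exchanges and the batch-specific state/timer backends
        // instead of checkpoint-based streaming state.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // ... build the DataStream topology, then env.execute("job");
      }
    }

Using one translation path for both modes is what this buys: the streaming translator produces the topology, and the runtime mode decides how it is scheduled.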
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 0d2142ce4397..f0514c69891b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -32,7 +32,11 @@
  * requiring flink on the classpath (e.g. to use with the direct runner).
  */
 public interface FlinkPipelineOptions
-    extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions, VersionDependentFlinkPipelineOptions {
+    extends PipelineOptions,
+        ApplicationNameOptions,
+        StreamingOptions,
+        FileStagingOptions,
+        VersionDependentFlinkPipelineOptions {

   String AUTO = "[auto]";
   String PIPELINED = "PIPELINED";
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 62a47572c870..ffc7da97cd02 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -82,9 +82,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
   private int depth = 0;

   public FlinkStreamingPipelineTranslator(
-      StreamExecutionEnvironment env,
-      PipelineOptions options,
-      boolean isStreaming) {
+      StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
     this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming);
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 4e94d6957a63..f3901fde03ba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -223,12 +223,17 @@ public void translateNode(
             ? context.getExecutionEnvironment().getMaxParallelism()
             : context.getExecutionEnvironment().getParallelism();

-      FlinkUnboundedSource unboundedSource = FlinkSource.unbounded(transform.getName(),
-          rawSource, new SerializablePipelineOptions(context.getPipelineOptions()), parallelism);
+      FlinkUnboundedSource unboundedSource =
+          FlinkSource.unbounded(
+              transform.getName(),
+              rawSource,
+              new SerializablePipelineOptions(context.getPipelineOptions()),
+              parallelism);
       nonDedupSource =
           context
               .getExecutionEnvironment()
-              .fromSource(unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
+              .fromSource(
+                  unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
               .uid(fullName);

       if (rawSource.requiresDeduping()) {
@@ -339,7 +344,8 @@ private static class ReadSourceTranslator

     @Override
     void translateNode(
         PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
-      if (ReadTranslation.sourceIsBounded(context.getCurrentTransform()) == PCollection.IsBounded.BOUNDED) {
+      if (ReadTranslation.sourceIsBounded(context.getCurrentTransform())
+          == PCollection.IsBounded.BOUNDED) {
         boundedTranslator.translateNode(transform, context);
       } else {
         unboundedTranslator.translateNode(transform, context);
@@ -375,18 +381,20 @@ public void translateNode(
               ? context.getExecutionEnvironment().getMaxParallelism()
               : context.getExecutionEnvironment().getParallelism();
-      FlinkBoundedSource flinkBoundedSource = FlinkSource.bounded(
-          transform.getName(),
-          rawSource,
-          new SerializablePipelineOptions(context.getPipelineOptions()),
-          parallelism);
+      FlinkBoundedSource flinkBoundedSource =
+          FlinkSource.bounded(
+              transform.getName(),
+              rawSource,
+              new SerializablePipelineOptions(context.getPipelineOptions()),
+              parallelism);

       DataStream<WindowedValue<T>> source;
       try {
         source =
             context
                 .getExecutionEnvironment()
-                .fromSource(flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
+                .fromSource(
+                    flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
                 .uid(fullName);
       } catch (Exception e) {
         throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
@@ -554,9 +562,9 @@ static void translateParDo(
     KeySelector<WindowedValue<InputT>, ?> keySelector = null;
     boolean stateful = false;
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    if (!signature.stateDeclarations().isEmpty() ||
-        !signature.timerDeclarations().isEmpty() ||
-        !signature.timerFamilyDeclarations().isEmpty()) {
+    if (!signature.stateDeclarations().isEmpty()
+        || !signature.timerDeclarations().isEmpty()
+        || !signature.timerFamilyDeclarations().isEmpty()) {
       // Based on the fact that the signature is stateful, DoFnSignatures ensures
       // that it is also keyed
       keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index a50c5d61d420..0a89bd18172b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -64,9 +64,7 @@ class FlinkStreamingTranslationContext {
   private AppliedPTransform currentTransform;

   public FlinkStreamingTranslationContext(
-      StreamExecutionEnvironment env,
-      PipelineOptions options,
-      boolean isStreaming) {
+      StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
     this.env = checkNotNull(env);
     this.options = checkNotNull(options);
     this.isStreaming = isStreaming;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java
index 05b5ef41645c..48ee15501156 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java
@@ -14,20 +14,18 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
-
 package org.apache.beam.runners.flink;

 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;

-
 public interface VersionDependentFlinkPipelineOptions extends PipelineOptions {

-  @Description("When set to true, the batch job execution will use DataStream API. "
-      + "Otherwise, the batch job execution will use the legacy DataSet API.")
+  @Description(
+      "When set to true, the batch job execution will use DataStream API. "
+          + "Otherwise, the batch job execution will use the legacy DataSet API.")
   @Default.Boolean(false)
   Boolean getUseDataStreamForBatch();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 28bbd4481031..63f5ede00242 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1639,12 +1639,14 @@ public Instant currentInputWatermarkTime() {
       // or long MAX_VALUE. So we should just use the Flink time service watermark in batch mode.
       //
       // In Flink the watermark ranges from
-      // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the beam
+      // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the
+      // beam
       // watermark range is [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775),
       // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To ensure the timestamp visible to
       // the users follow the Beam convention, we just use the Beam range instead.
-      return timerService.currentWatermark() == Long.MAX_VALUE ?
-          new Instant(Long.MAX_VALUE) : BoundedWindow.TIMESTAMP_MIN_VALUE;
+      return timerService.currentWatermark() == Long.MAX_VALUE
+          ? new Instant(Long.MAX_VALUE)
+          : BoundedWindow.TIMESTAMP_MIN_VALUE;
     } else {
       return new Instant(getEffectiveInputWatermark());
     }
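The reflowed comment above encodes a subtle contract: in batch mode the Flink timer service only ever reports a running value or Long.MAX_VALUE, and the operator maps that onto Beam's narrower watermark domain. A compact, self-contained restatement of the mapping follows; the class and method names are ours, not the operator's.

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    public class BatchWatermarkMapping {
      // Mirrors the batch-mode branch shown above: only the "final" Flink
      // watermark (Long.MAX_VALUE) signals end of input; any other value is
      // reported as Beam's minimum watermark.
      static Instant toBeamWatermark(long flinkWatermark) {
        return flinkWatermark == Long.MAX_VALUE
            ? new Instant(Long.MAX_VALUE)
            : BoundedWindow.TIMESTAMP_MIN_VALUE;
      }

      public static void main(String[] args) {
        System.out.println(toBeamWatermark(Long.MAX_VALUE)); // end of input
        System.out.println(toBeamWatermark(Long.MIN_VALUE)); // still running: TIMESTAMP_MIN_VALUE
      }
    }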
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index 6654d9f59cf8..ec44d279586d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -62,14 +62,11 @@ public class FlinkExecutionEnvironmentsTest {
   @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
   @Rule public ExpectedException expectedException = ExpectedException.none();

-  @Parameterized.Parameter
-  public boolean useDataStreamForBatch;
+  @Parameterized.Parameter public boolean useDataStreamForBatch;

   @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
   public static Collection useDataStreamForBatchJobValues() {
-    return Arrays.asList(new Object[][] {
-      {false}, {true}
-    });
+    return Arrays.asList(new Object[][] {{false}, {true}});
   }

   private FlinkPipelineOptions getDefaultPipelineOptions() {
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 6733a5976e1c..676e35d4bc0f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -85,14 +85,11 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable {

   @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();

-  @Parameterized.Parameter
-  public boolean useDataStreamForBatch;
+  @Parameterized.Parameter public boolean useDataStreamForBatch;

   @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
   public static Collection useDataStreamForBatchJobValues() {
-    return Arrays.asList(new Object[][] {
-      {false}, {true}
-    });
+    return Arrays.asList(new Object[][] {{false}, {true}});
   }

   private FlinkPipelineOptions getDefaultPipelineOptions() {
@@ -193,7 +190,8 @@ public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception {
     shouldUsePreparedFilesOnRemoteStreamEnvironment(false);
   }

-  public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode) throws Exception {
+  public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode)
+      throws Exception {
     FlinkPipelineOptions options = getDefaultPipelineOptions();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("clusterAddress");
@@ -206,7 +204,8 @@ public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMod
     List jarFiles;
     if (streamingMode || options.getUseDataStreamForBatch()) {
-      StreamExecutionEnvironment streamExecutionEnvironment = flinkEnv.getStreamExecutionEnvironment();
+      StreamExecutionEnvironment streamExecutionEnvironment =
+          flinkEnv.getStreamExecutionEnvironment();
       assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class));
       jarFiles = getJars(streamExecutionEnvironment);
     } else {
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
index 10c570ceddf5..d5d34b59214b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
@@ -76,7 +76,8 @@ public void readSourceTranslatorBoundedWithMaxParallelism() {
     Object sourceTransform =
         applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);

-    FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource();
+    FlinkBoundedSource source =
+        (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource();

     assertEquals(maxParallelism, source.getNumSplits());
   }
@@ -94,7 +95,8 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() {
     Object sourceTransform =
         applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);

-    FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource();
+    FlinkBoundedSource source =
+        (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource();

     assertEquals(parallelism, source.getNumSplits());
   }
@@ -117,7 +119,8 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() {

     FlinkSource source =
         (FlinkSource)
-            ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource();
+            ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs()))
+                .getSource();

     assertEquals(maxParallelism, source.getNumSplits());
   }
@@ -138,7 +141,8 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() {

     FlinkSource source =
         (FlinkSource)
-            ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource();
+            ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs()))
+                .getSource();

     assertEquals(parallelism, source.getNumSplits());
   }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index 21fea7366da9..b8dc52f6cd4b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -67,7 +67,8 @@ public void testBatch() {

   private static void runProgram(String resultPath, boolean streaming) {

-    Pipeline p = streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch();
+    Pipeline p =
+        streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch();

     p.apply(GenerateSequence.from(0).to(10))
         .apply(

From b87005db8bdfff33295ab01d2fca0aaaaec82346 Mon Sep 17 00:00:00 2001
From: jto
Date: Fri, 27 Oct 2023 09:47:59 +0200
Subject: [PATCH 6/7] Update change list

---
 CHANGES.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 74bba37d5168..2aa76509692e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -71,6 +71,9 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
 ## New Features / Improvements

 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Add `UseDataStreamForBatch` pipeline option to the Flink runner. When it is set to true, Flink runner will run batch
+  jobs using the DataStream API. By default the option is set to false, so the batch jobs are still executed
+  using the DataSet API.
 ## Breaking Changes

@@ -339,9 +342,6 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a
 * The Go SDK now requires Go 1.19 to build. ([#25545](https://github.com/apache/beam/pull/25545))
 * The Go SDK now has an initial native Go implementation of a portable Beam Runner called Prism. ([#24789](https://github.com/apache/beam/pull/24789))
   * For more details and current state see https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism.
-* Add `UseDataStreamForBatch` pipeline option to the Flink runner. When it is set to true, Flink runner will run batch
-  jobs executed with DataStream API. By default the option is set to false, so the batch jobs are still executed
-  with DataSet API.

 ## Breaking Changes

From e149b3623d82eae490d336ad064ba8d7c843f25d Mon Sep 17 00:00:00 2001
From: jto
Date: Fri, 27 Oct 2023 15:50:42 +0200
Subject: [PATCH 7/7] Do not exclude
 org.apache.beam.sdk.transforms.GroupByKeyTest.testAfterProcessingTimeContinuationTriggerUsingState

---
 runners/flink/flink_runner.gradle | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index ed6c11fbdab3..c510b346d5d0 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -319,9 +319,6 @@ def createValidatesRunnerTask(Map m) {
       if (!config.streaming) {
         // FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing.
         excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew'
-      } else {
-        // https://github.com/apache/beam/issues/25485
-        excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState'
       }
     }
   }
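Taken end to end, the series lets a bounded pipeline such as the ReadSourceStreamingTest program above run on the DataStream stack. The sketch below is an illustrative class, not part of the series; the two option lines decide between the legacy DataSet path and DataStream batch execution.

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class BatchOnDataStreamSmoke {
      public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setStreaming(false);
        options.setUseDataStreamForBatch(true); // without this line, the DataSet path is used
        Pipeline p = Pipeline.create(options);
        p.apply(GenerateSequence.from(0).to(10))
            .apply(MapElements.into(TypeDescriptors.strings()).via((Long i) -> "value: " + i));
        p.run().waitUntilFinish();
      }
    }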