diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index bb794e04398d..5072e6b2459f 100644
--- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,6 +20,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;

 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              final InternalTimeServiceManagerImpl cast =
-                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-              return cast.numProcessingTimeTimers();
+              InternalTimeServiceManager tsm = getTimeServiceManagerCompat();
+              if (tsm instanceof InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl cast =
+                    (InternalTimeServiceManagerImpl) tsm;
+                return cast.numProcessingTimeTimers();
+              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of InternalTimeServiceManager. %s", tsm));
+              }
             })
         .orElse(0);
   }
diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index 3b64612d6d19..d8740964fda9 100644
--- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,6 +20,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;

 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes.
 */
 public abstract class AbstractStreamOperatorCompat
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              final InternalTimeServiceManagerImpl cast =
-                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-              return cast.numProcessingTimeTimers();
+              InternalTimeServiceManager tsm = getTimeServiceManagerCompat();
+              if (tsm instanceof InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl cast =
+                    (InternalTimeServiceManagerImpl) tsm;
+                return cast.numProcessingTimeTimers();
+              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of InternalTimeServiceManager. %s", tsm));
+              }
             })
         .orElse(0);
   }
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index cdfe921c1b2e..0ac380badda1 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -222,6 +222,7 @@ class ValidatesRunnerConfig {
   String name
   boolean streaming
   boolean checkpointing
+  boolean useDataStreamForBatch
   ArrayList sickbayTests
 }
@@ -240,6 +241,7 @@ def createValidatesRunnerTask(Map m) {
     description = "Validates the ${runnerType} runner"
     def pipelineOptionsArray = ["--runner=TestFlinkRunner",
         "--streaming=${config.streaming}",
+        "--useDataStreamForBatch=${config.useDataStreamForBatch}",
         "--parallelism=2",
     ]
     if (config.checkpointing) {
@@ -298,12 +300,17 @@
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
         // https://github.com/apache/beam/issues/20844
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
+        if (!config.streaming) {
+          // FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing.
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew' + } } } } } createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests) +createValidatesRunnerTask(name: "validatesRunnerBatchWithDataStream", streaming: false, useDataStreamForBatch: true, sickbayTests: sickbayTests) createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests) // We specifically have a variant which runs with checkpointing enabled for the // tests that require it since running a checkpoint variant is significantly @@ -316,6 +323,7 @@ tasks.register('validatesRunner') { group = 'Verification' description "Validates Flink runner" dependsOn validatesRunnerBatch + dependsOn validatesRunnerBatchWithDataStream dependsOn validatesRunnerStreaming dependsOn validatesRunnerStreamingCheckpointing } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 07bb3e9d6619..281abc0b3805 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -96,13 +97,17 @@ public void translate(Pipeline pipeline) { prepareFilesToStageForRemoteClusterExecution(options); FlinkPipelineTranslator translator; - if (options.isStreaming()) { + if (options.isStreaming() || options.getUseDataStreamForBatch()) { this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) { LOG.warn( "UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); } - translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options); + translator = + new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming()); + if (!options.isStreaming()) { + flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); + } } else { this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 29fe0543adf6..bb9f98232de9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -32,7 +32,11 @@ * requiring flink on the classpath (e.g. to use with the direct runner). 
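As a usage sketch of the new code path wired up in `FlinkPipelineExecutionEnvironment.translate` above: a bounded pipeline opts into DataStream-based translation roughly as follows. This is a hypothetical example class; the pipeline body is illustrative, while `setUseDataStreamForBatch` is the option this patch introduces.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DataStreamForBatchExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // A bounded pipeline: isStreaming() stays false...
    options.setStreaming(false);
    // ...but translation goes through FlinkStreamingPipelineTranslator, and the
    // StreamExecutionEnvironment is switched to RuntimeExecutionMode.BATCH.
    options.setUseDataStreamForBatch(true);

    Pipeline p = Pipeline.create(options);
    p.apply(GenerateSequence.from(0).to(10));
    p.run().waitUntilFinish();
  }
}
```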
*/ public interface FlinkPipelineOptions - extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions { + extends PipelineOptions, + ApplicationNameOptions, + StreamingOptions, + FileStagingOptions, + VersionDependentFlinkPipelineOptions { String AUTO = "[auto]"; String PIPELINED = "PIPELINED"; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 8a74735cd83e..f56f10f1151f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -81,8 +81,9 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { private int depth = 0; - public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) { - this.streamingContext = new FlinkStreamingTranslationContext(env, options); + public FlinkStreamingPipelineTranslator( + StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) { + this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index ec97bb733776..be54e09c3973 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -38,9 +38,7 @@ import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; -import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; @@ -54,6 +52,9 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -93,9 +94,10 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -220,16 +222,19 @@ public void translateNode( context.getExecutionEnvironment().getMaxParallelism() > 0 ? context.getExecutionEnvironment().getMaxParallelism() : context.getExecutionEnvironment().getParallelism(); - UnboundedSourceWrapper sourceWrapper = - new UnboundedSourceWrapper<>( - fullName, context.getPipelineOptions(), rawSource, parallelism); + + FlinkUnboundedSource unboundedSource = + FlinkSource.unbounded( + transform.getName(), + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); nonDedupSource = context .getExecutionEnvironment() - .addSource(sourceWrapper) - .name(fullName) - .uid(fullName) - .returns(withIdTypeInfo); + .fromSource( + unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) + .uid(fullName); if (rawSource.requiresDeduping()) { source = @@ -303,15 +308,24 @@ void translateNode(Impulse transform, FlinkStreamingTranslationContext context) WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE), context.getPipelineOptions()); - long shutdownAfterIdleSourcesMs = - context - .getPipelineOptions() - .as(FlinkPipelineOptions.class) - .getShutdownSourcesAfterIdleMs(); + FlinkBoundedSource impulseSource; + WatermarkStrategy> watermarkStrategy; + if (context.isStreaming()) { + long shutdownAfterIdleSourcesMs = + context + .getPipelineOptions() + .as(FlinkPipelineOptions.class) + .getShutdownSourcesAfterIdleMs(); + impulseSource = FlinkSource.unboundedImpulse(shutdownAfterIdleSourcesMs); + watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps(); + } else { + impulseSource = FlinkSource.boundedImpulse(); + watermarkStrategy = WatermarkStrategy.noWatermarks(); + } SingleOutputStreamOperator> source = context .getExecutionEnvironment() - .addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse") + .fromSource(impulseSource, watermarkStrategy, "Impulse") .returns(typeInfo); context.setOutputDataStream(context.getOutput(transform), source); @@ -330,7 +344,8 @@ private static class ReadSourceTranslator @Override void translateNode( PTransform> transform, FlinkStreamingTranslationContext context) { - if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) { + if (ReadTranslation.sourceIsBounded(context.getCurrentTransform()) + == PCollection.IsBounded.BOUNDED) { boundedTranslator.translateNode(transform, context); } else { unboundedTranslator.translateNode(transform, context); @@ -361,24 +376,26 @@ public void translateNode( } String fullName = getCurrentTransformName(context); - UnboundedSource adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource); + int parallelism = + context.getExecutionEnvironment().getMaxParallelism() > 0 + ? 
context.getExecutionEnvironment().getMaxParallelism() + : context.getExecutionEnvironment().getParallelism(); + + FlinkBoundedSource flinkBoundedSource = + FlinkSource.bounded( + transform.getName(), + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); + DataStream> source; try { - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); - UnboundedSourceWrapperNoValueWithRecordId sourceWrapper = - new UnboundedSourceWrapperNoValueWithRecordId<>( - new UnboundedSourceWrapper<>( - fullName, context.getPipelineOptions(), adaptedRawSource, parallelism)); source = context .getExecutionEnvironment() - .addSource(sourceWrapper) - .name(fullName) - .uid(fullName) - .returns(outputTypeInfo); + .fromSource( + flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) + .uid(fullName); } catch (Exception e) { throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e); } @@ -545,7 +562,9 @@ static void translateParDo( KeySelector, ?> keySelector = null; boolean stateful = false; DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { + if (!signature.stateDeclarations().isEmpty() + || !signature.timerDeclarations().isEmpty() + || !signature.timerFamilyDeclarations().isEmpty()) { // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed keyCoder = ((KvCoder) input.getCoder()).getKeyCoder(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index cf235da1a025..f015d0c22dad 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -51,6 +51,7 @@ class FlinkStreamingTranslationContext { private final StreamExecutionEnvironment env; private final PipelineOptions options; + private final boolean isStreaming; /** * Keeps a mapping between the output value of the PTransform and the Flink Operator that produced @@ -62,9 +63,11 @@ class FlinkStreamingTranslationContext { private AppliedPTransform currentTransform; - public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { + public FlinkStreamingTranslationContext( + StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) { this.env = checkNotNull(env); this.options = checkNotNull(options); + this.isStreaming = isStreaming; } public StreamExecutionEnvironment getExecutionEnvironment() { @@ -75,6 +78,10 @@ public PipelineOptions getPipelineOptions() { return options; } + public boolean isStreaming() { + return isStreaming; + } + @SuppressWarnings("unchecked") public DataStream getInputDataStream(PValue value) { return (DataStream) dataStreams.get(value); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java index 3766e1ede1aa..b5145b030e62 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java @@ -36,7 +36,7 @@ class FlinkTransformOverrides { static List getDefaultOverrides(FlinkPipelineOptions options) { ImmutableList.Builder builder = ImmutableList.builder(); - if (options.isStreaming()) { + if (options.isStreaming() || options.getUseDataStreamForBatch()) { builder .add( PTransformOverride.of( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java new file mode 100644 index 000000000000..48ee15501156 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +public interface VersionDependentFlinkPipelineOptions extends PipelineOptions { + + @Description( + "When set to true, the batch job execution will use DataStream API. 
" + + "Otherwise, the batch job execution will use the legacy DataSet API.") + @Default.Boolean(false) + Boolean getUseDataStreamForBatch(); + + void setUseDataStreamForBatch(Boolean useDataStreamForBatch); +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java index a4a363689329..a9a6db47c814 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerBase.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 13c4e4e0a99f..7a4a5c0b926e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -110,10 +110,12 @@ import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -672,6 +674,7 @@ protected final void setBundleFinishedCallback(Runnable callback) { @Override public final void processElement(StreamRecord> streamRecord) { checkInvokeStartBundle(); + LOG.trace("Processing element {} in {}", streamRecord.getValue().getValue(), doFn.getClass()); long oldHold = keyCoder != null ? 
keyedStateInternals.minWatermarkHoldMs() : -1L;
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
@@ -754,6 +757,7 @@ public final void processElement2(StreamRecord streamRecord) thro
   @Override
   public final void processWatermark(Watermark mark) throws Exception {
+    LOG.trace("Processing watermark {} in {}", mark.getTimestamp(), doFn.getClass());
     processWatermark1(mark);
   }
@@ -1442,8 +1446,10 @@ private void populateOutputTimestampQueue(InternalTimerService timerS
     BiConsumerWithException consumer =
         (timerData, stamp) ->
             keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp());
-    timerService.forEachEventTimeTimer(consumer);
-    timerService.forEachProcessingTimeTimer(consumer);
+    if (timerService instanceof InternalTimerServiceImpl) {
+      timerService.forEachEventTimeTimer(consumer);
+      timerService.forEachProcessingTimeTimer(consumer);
+    }
   }
   private String constructTimerId(String timerFamilyId, String timerId) {
@@ -1494,6 +1500,7 @@ public void setTimer(TimerData timer) {
   }
   private void registerTimer(TimerData timer, String contextTimerId) throws Exception {
+    LOG.debug("Registering timer {}", timer);
     pendingTimersById.put(contextTimerId, timer);
     long time = timer.getTimestamp().getMillis();
     switch (timer.getDomain()) {
@@ -1604,7 +1611,31 @@ public Instant currentProcessingTime() {
   @Override
   public Instant currentInputWatermarkTime() {
-    return new Instant(getEffectiveInputWatermark());
+    if (timerService instanceof BatchExecutionInternalTimeService) {
+      // In batch mode, this method only ever returns BoundedWindow.TIMESTAMP_MIN_VALUE
+      // or BoundedWindow.TIMESTAMP_MAX_VALUE.
+      //
+      // In batch execution mode, the currentInputWatermark variable is never updated
+      // until all the records have been processed. However, whenever a record with a new
+      // key arrives, the Flink timer service watermark is set to
+      // MAX_WATERMARK (Long.MAX_VALUE) so that all the timers associated with the current
+      // key can fire. After that, the Flink timer service watermark is reset to
+      // Long.MIN_VALUE, so the next key starts fresh, as if the records of other keys
+      // had never existed. The watermark is therefore always either Long.MIN_VALUE or
+      // Long.MAX_VALUE, so we can simply use the Flink timer service watermark in batch mode.
+      //
+      // In Flink the watermark ranges over
+      // [Long.MIN_VALUE (-9223372036854775808), Long.MAX_VALUE (9223372036854775807)], while
+      // the Beam watermark range is [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775),
+      // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To ensure that the timestamps
+      // visible to users follow the Beam convention, we use the Beam range instead.
+      return timerService.currentWatermark() == Long.MAX_VALUE
+          ?
new Instant(Long.MAX_VALUE) + : BoundedWindow.TIMESTAMP_MIN_VALUE; + } else { + return new Instant(getEffectiveInputWatermark()); + } } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java index c4d82cb5c8ad..6f2f473feddc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java @@ -51,4 +51,9 @@ public Iterable timersIterable() { public Iterable> elementsIterable() { return Collections.singletonList(value); } + + @Override + public String toString() { + return String.format("{%s, [%s]}", key, value); + } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java index 49d317d46ced..ec44d279586d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java @@ -29,6 +29,8 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -49,17 +51,33 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.powermock.reflect.Whitebox; /** Tests for {@link FlinkExecutionEnvironments}. 
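The batch-mode watermark mapping added to `DoFnOperator.currentInputWatermarkTime()` above can be summarized in a standalone sketch. This is a hypothetical helper, not part of the patch; it mirrors the ternary in the diff.

```java
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

final class BatchWatermarkMapping {
  // Flink's batch time service only ever reports Long.MIN_VALUE (a fresh key) or
  // Long.MAX_VALUE (all timers for the current key are firing); anything below the
  // max is mapped onto Beam's minimum timestamp.
  static Instant toBeamWatermark(long flinkBatchWatermark) {
    return flinkBatchWatermark == Long.MAX_VALUE
        ? new Instant(Long.MAX_VALUE)
        : BoundedWindow.TIMESTAMP_MIN_VALUE;
  }
}
```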
*/ +@RunWith(Parameterized.class) public class FlinkExecutionEnvironmentsTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); + @Parameterized.Parameter public boolean useDataStreamForBatch; + + @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") + public static Collection useDataStreamForBatchJobValues() { + return Arrays.asList(new Object[][] {{false}, {true}}); + } + + private FlinkPipelineOptions getDefaultPipelineOptions() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); + return options; + } + @Test public void shouldSetParallelismBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setParallelism(42); @@ -71,7 +89,7 @@ public void shouldSetParallelismBatch() { @Test public void shouldSetParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setParallelism(42); @@ -84,7 +102,7 @@ public void shouldSetParallelismStreaming() { @Test public void shouldSetMaxParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setMaxParallelism(42); @@ -99,7 +117,7 @@ public void shouldSetMaxParallelismStreaming() { public void shouldInferParallelismFromEnvironmentBatch() throws IOException { String flinkConfDir = extractFlinkConfig(); - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -115,7 +133,7 @@ public void shouldInferParallelismFromEnvironmentBatch() throws IOException { public void shouldInferParallelismFromEnvironmentStreaming() throws IOException { String confDir = extractFlinkConfig(); - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -129,7 +147,7 @@ public void shouldInferParallelismFromEnvironmentStreaming() throws IOException @Test public void shouldFallbackToDefaultParallelismBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -141,7 +159,7 @@ public void shouldFallbackToDefaultParallelismBatch() { @Test public void shouldFallbackToDefaultParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -154,7 +172,7 @@ public void shouldFallbackToDefaultParallelismStreaming() { @Test public void useDefaultParallelismFromContextBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); @@ -166,7 +184,7 @@ public void 
useDefaultParallelismFromContextBatch() { @Test public void useDefaultParallelismFromContextStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); StreamExecutionEnvironment sev = @@ -179,7 +197,7 @@ public void useDefaultParallelismFromContextStreaming() { @Test public void shouldParsePortForRemoteEnvironmentBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:1234"); @@ -191,7 +209,7 @@ public void shouldParsePortForRemoteEnvironmentBatch() { @Test public void shouldParsePortForRemoteEnvironmentStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:1234"); @@ -204,7 +222,7 @@ public void shouldParsePortForRemoteEnvironmentStreaming() { @Test public void shouldAllowPortOmissionForRemoteEnvironmentBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host"); @@ -216,7 +234,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentBatch() { @Test public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host"); @@ -229,7 +247,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() { @Test public void shouldTreatAutoAndEmptyHostTheSameBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); ExecutionEnvironment sev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); @@ -243,7 +261,7 @@ public void shouldTreatAutoAndEmptyHostTheSameBatch() { @Test public void shouldTreatAutoAndEmptyHostTheSameStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); StreamExecutionEnvironment sev = @@ -259,7 +277,7 @@ public void shouldTreatAutoAndEmptyHostTheSameStreaming() { @Test public void shouldDetectMalformedPortBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:p0rt"); @@ -271,7 +289,7 @@ public void shouldDetectMalformedPortBatch() { @Test public void shouldDetectMalformedPortStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:p0rt"); @@ -283,7 +301,7 @@ public void shouldDetectMalformedPortStreaming() { @Test public void shouldSupportIPv4Batch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("192.168.1.1:1234"); @@ -297,7 +315,7 @@ public void shouldSupportIPv4Batch() { @Test public void 
shouldSupportIPv4Streaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("192.168.1.1:1234"); @@ -311,7 +329,7 @@ public void shouldSupportIPv4Streaming() { @Test public void shouldSupportIPv6Batch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234"); @@ -326,7 +344,7 @@ public void shouldSupportIPv6Batch() { @Test public void shouldSupportIPv6Streaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234"); @@ -342,7 +360,7 @@ public void shouldSupportIPv6Streaming() { @Test public void shouldRemoveHttpProtocolFromHostBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); for (String flinkMaster : @@ -358,7 +376,7 @@ public void shouldRemoveHttpProtocolFromHostBatch() { @Test public void shouldRemoveHttpProtocolFromHostStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); for (String flinkMaster : @@ -382,7 +400,7 @@ private String extractFlinkConfig() throws IOException { @Test public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() { // Checkpointing disabled, shut down sources immediately - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(0L)); } @@ -390,7 +408,7 @@ public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() { @Test public void shouldAutoSetIdleSourcesFlagWithCheckpointing() { // Checkpointing is enabled, never shut down sources - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setCheckpointingInterval(1000L); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(Long.MAX_VALUE)); @@ -399,7 +417,7 @@ public void shouldAutoSetIdleSourcesFlagWithCheckpointing() { @Test public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() { // Checkpointing disabled, accept flag - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setShutdownSourcesAfterIdleMs(42L); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(42L)); @@ -408,7 +426,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() { @Test public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() { // Checkpointing enable, still accept flag - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setCheckpointingInterval(1000L); options.setShutdownSourcesAfterIdleMs(42L); 
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); @@ -418,7 +436,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() { @Test public void shouldSetSavepointRestoreForRemoteStreaming() { String path = "fakePath"; - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); options.setSavepointPath(path); @@ -432,7 +450,7 @@ public void shouldSetSavepointRestoreForRemoteStreaming() { @Test public void shouldFailOnUnknownStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("unknown"); options.setStateBackendStoragePath("/path"); @@ -445,7 +463,7 @@ public void shouldFailOnUnknownStateBackend() { @Test public void shouldFailOnNoStoragePathProvided() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("unknown"); @@ -457,7 +475,7 @@ public void shouldFailOnNoStoragePathProvided() { @Test public void shouldCreateFileSystemStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("fileSystem"); options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString()); @@ -470,7 +488,7 @@ public void shouldCreateFileSystemStateBackend() { @Test public void shouldCreateRocksDbStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("rocksDB"); options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString()); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java index 8af13c4c7fe3..e656ab0e4439 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.core.Every.everyItem; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import java.io.ByteArrayOutputStream; import java.io.File; @@ -38,6 +39,8 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -68,13 +71,13 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.powermock.reflect.Whitebox; /** Tests for {@link FlinkPipelineExecutionEnvironment}. 
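The test classes in this patch are switched to the same JUnit 4 parameterization pattern so that every test runs against both batch translation paths. A minimal self-contained version of the pattern looks roughly like this (class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ExecutionModeParameterizedTest {
  @Parameterized.Parameter public boolean useDataStreamForBatch;

  @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][] {{false}, {true}});
  }

  @Test
  public void runsOncePerMode() {
    // JUnit instantiates the class twice, injecting false and then true,
    // so every @Test executes once per mode.
  }
}
```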
*/ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) @SuppressWarnings({ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) @@ -82,9 +85,22 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable { @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Parameterized.Parameter public boolean useDataStreamForBatch; + + @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") + public static Collection useDataStreamForBatchJobValues() { + return Arrays.asList(new Object[][] {{false}, {true}}); + } + + private FlinkPipelineOptions getDefaultPipelineOptions() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); + return options; + } + @Test public void shouldRecognizeAndTranslateStreamingPipeline() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("[auto]"); @@ -136,6 +152,8 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws IOEx @Test public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() throws IOException { + // StreamingExecutionEnv does not support "collection" mode. + assumeFalse(useDataStreamForBatch); FlinkPipelineOptions options = testPreparingResourcesToStage("[collection]"); assertThat(options.getFilesToStage().size(), is(2)); @@ -152,7 +170,7 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOE @Test public void shouldUseDefaultTempLocationIfNoneSet() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("clusterAddress"); @@ -168,42 +186,33 @@ public void shouldUseDefaultTempLocationIfNoneSet() { @Test public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); - options.setRunner(TestFlinkRunner.class); - options.setFlinkMaster("clusterAddress"); - - FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); - - Pipeline pipeline = Pipeline.create(options); - flinkEnv.translate(pipeline); - - ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment(); - assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class)); - - List jarFiles = getJars(executionEnvironment); - - List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage()); - - assertThat(jarFiles, is(urlConvertedStagedFiles)); + shouldUsePreparedFilesOnRemoteStreamEnvironment(true); + shouldUsePreparedFilesOnRemoteStreamEnvironment(false); } - @Test - public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode) + throws Exception { + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("clusterAddress"); - options.setStreaming(true); + options.setStreaming(streamingMode); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); Pipeline pipeline = Pipeline.create(options); flinkEnv.translate(pipeline); - StreamExecutionEnvironment streamExecutionEnvironment = - 
flinkEnv.getStreamExecutionEnvironment(); - assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class)); - - List jarFiles = getJars(streamExecutionEnvironment); + List jarFiles; + if (streamingMode || options.getUseDataStreamForBatch()) { + StreamExecutionEnvironment streamExecutionEnvironment = + flinkEnv.getStreamExecutionEnvironment(); + assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class)); + jarFiles = getJars(streamExecutionEnvironment); + } else { + ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment(); + assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class)); + jarFiles = getJars(executionEnvironment); + } List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage()); @@ -214,7 +223,7 @@ public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception { public void shouldUseTransformOverrides() { boolean[] testParameters = {true, false}; for (boolean streaming : testParameters) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(streaming); options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -234,7 +243,7 @@ public void shouldUseTransformOverrides() { @Test public void shouldProvideParallelismToTransformOverrides() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -278,7 +287,7 @@ public boolean matches(Object actual) { @Test public void shouldUseStreamingTransformOverridesWithUnboundedSources() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); // no explicit streaming mode set options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -303,7 +312,7 @@ public void shouldUseStreamingTransformOverridesWithUnboundedSources() { @Test public void testTranslationModeOverrideWithUnboundedSources() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setStreaming(false); @@ -319,7 +328,7 @@ public void testTranslationModeOverrideWithUnboundedSources() { public void testTranslationModeNoOverrideWithoutUnboundedSources() { boolean[] testArgs = new boolean[] {true, false}; for (boolean streaming : testArgs) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setStreaming(streaming); @@ -408,7 +417,7 @@ private FlinkPipelineOptions testPreparingResourcesToStage( private FlinkPipelineOptions setPipelineOptions( String flinkMaster, String tempLocation, List filesToStage) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster(flinkMaster); options.setTempLocation(tempLocation); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java index c2d9163aacc9..da8c560690a6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java @@ -94,6 +94,7 @@ public void testDefaults() { assertThat(options.getMaxBundleSize(), is(1000L)); assertThat(options.getMaxBundleTimeMills(), is(1000L)); assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED.name())); + assertThat(options.getUseDataStreamForBatch(), is(false)); assertThat(options.getSavepointPath(), is(nullValue())); assertThat(options.getAllowNonRestoredState(), is(false)); assertThat(options.getDisableMetrics(), is(false)); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java index 849f8be952cb..f37773db82f5 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java @@ -156,7 +156,7 @@ public void testStatefulParDoAfterCombineChaining() { private JobGraph getStatefulParDoAfterCombineChainingJobGraph(boolean stablePartitioning) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final FlinkStreamingPipelineTranslator translator = - new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create()); + new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true); final PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); pipelineOptions.setRunner(FlinkRunner.class); final Pipeline pipeline = Pipeline.create(pipelineOptions); @@ -188,7 +188,7 @@ public void testStatefulParDoAfterGroupByKeyChaining() { private JobGraph getStatefulParDoAfterGroupByKeyChainingJobGraph(boolean stablePartitioning) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final FlinkStreamingPipelineTranslator translator = - new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create()); + new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true); final PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); pipelineOptions.setRunner(FlinkRunner.class); final Pipeline pipeline = Pipeline.create(pipelineOptions); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 45cfb30f9118..228d889c88ed 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -29,8 +29,8 @@ import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; -import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -49,8 +49,8 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; @@ -76,11 +76,10 @@ public void readSourceTranslatorBoundedWithMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - UnboundedSourceWrapperNoValueWithRecordId source = - (UnboundedSourceWrapperNoValueWithRecordId) - ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); + FlinkBoundedSource source = + (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); - assertEquals(maxParallelism, source.getUnderlyingSource().getSplitSources().size()); + assertEquals(maxParallelism, source.getNumSplits()); } @Test @@ -96,11 +95,10 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - UnboundedSourceWrapperNoValueWithRecordId source = - (UnboundedSourceWrapperNoValueWithRecordId) - ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); + FlinkBoundedSource source = + (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); - assertEquals(parallelism, source.getUnderlyingSource().getSplitSources().size()); + assertEquals(parallelism, source.getNumSplits()); } @Test @@ -119,13 +117,12 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() { (OneInputTransformation) applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env); - UnboundedSourceWrapper source = - (UnboundedSourceWrapper) - ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) - .getOperator() - .getUserFunction(); + FlinkSource source = + (FlinkSource) + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) + .getSource(); - assertEquals(maxParallelism, source.getSplitSources().size()); + assertEquals(maxParallelism, source.getNumSplits()); } @Test @@ -142,13 +139,12 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() { (OneInputTransformation) applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env); - UnboundedSourceWrapper source = - (UnboundedSourceWrapper) - ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) - .getOperator() - .getUserFunction(); + FlinkSource source = + (FlinkSource) + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) + .getSource(); - assertEquals(parallelism, source.getSplitSources().size()); + assertEquals(parallelism, source.getNumSplits()); } private Object applyReadSourceTransform( @@ -157,7 +153,7 @@ private Object applyReadSourceTransform( FlinkStreamingPipelineTranslator.StreamTransformTranslator> translator = getReadSourceTranslator(); FlinkStreamingTranslationContext ctx = - new 
FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create()); + new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create(), true); Pipeline pipeline = Pipeline.create(); PCollection pc = diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java index a2c04d88f713..7317a343c3ee 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java @@ -72,6 +72,8 @@ public class FlinkSubmissionTest { /** Counter which keeps track of the number of jobs submitted. */ private static int expectedNumberOfJobs; + public static boolean useDataStreamForBatch; + @BeforeClass public static void beforeClass() throws Exception { Configuration config = new Configuration(); @@ -104,6 +106,12 @@ public void testSubmissionBatch() throws Exception { runSubmission(false, false); } + @Test + public void testSubmissionBatchUseDataStream() throws Exception { + FlinkSubmissionTest.useDataStreamForBatch = true; + runSubmission(false, false); + } + @Test public void testSubmissionStreaming() throws Exception { runSubmission(false, true); @@ -114,6 +122,12 @@ public void testDetachedSubmissionBatch() throws Exception { runSubmission(true, false); } + @Test + public void testDetachedSubmissionBatchUseDataStream() throws Exception { + FlinkSubmissionTest.useDataStreamForBatch = true; + runSubmission(true, false); + } + @Test public void testDetachedSubmissionStreaming() throws Exception { runSubmission(true, true); @@ -164,6 +178,7 @@ private void waitUntilJobIsCompleted() throws Exception { /** The Flink program which is executed by the CliFrontend. */ public static void main(String[] args) { FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); options.setRunner(FlinkRunner.class); options.setStreaming(streaming); options.setParallelism(1); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java index 2921065c1547..5a4122db8f0f 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java @@ -56,13 +56,19 @@ public void postSubmit() throws Exception { } @Test - public void testProgram() throws Exception { - runProgram(resultPath); + public void testStreaming() { + runProgram(resultPath, true); } - private static void runProgram(String resultPath) { + @Test + public void testBatch() { + runProgram(resultPath, false); + } + + private static void runProgram(String resultPath, boolean streaming) { - Pipeline p = FlinkTestPipeline.createForStreaming(); + Pipeline p = + streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch(); p.apply(GenerateSequence.from(0).to(10)) .apply(