[Flink Runner] Add UseDataStreamForBatch option to Flink runner to enable batch execution on DataStream API (apache#28614)
Co-authored-by: Jiangjie Qin <[email protected]>
Co-authored-by: tvalentyn <[email protected]>
3 people authored and Kanishk Karanawat committed Feb 7, 2024
1 parent b4dc328 commit d46fb00
Showing 21 changed files with 327 additions and 148 deletions.
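For context, the new flag can be enabled like any other pipeline option. Below is a minimal sketch, assuming only FlinkRunner and FlinkPipelineOptions from this repository; the Create.of pipeline is illustrative:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class DataStreamBatchExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Stay in batch mode, but execute on the DataStream API instead of
    // the legacy DataSet API.
    options.setStreaming(false);
    options.setUseDataStreamForBatch(true);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(1, 2, 3));
    p.run().waitUntilFinish();
  }
}

With the flag left at its default of false, bounded pipelines keep going through the DataSet-based translator, so existing jobs are unaffected.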
1 change: 1 addition & 0 deletions CHANGES.md
@@ -50,6 +50,7 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

<<<<<<< HEAD
# [2.46.0] - Unreleased

## Highlights

@@ -20,6 +20,7 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;

/** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
public abstract class AbstractStreamOperatorCompat<OutputT>
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
return getTimeServiceManager()
.map(
manager -> {
final InternalTimeServiceManagerImpl<?> cast =
(InternalTimeServiceManagerImpl<?>) getTimeServiceManagerCompat();
return cast.numProcessingTimeTimers();
InternalTimeServiceManager<?> tsm = getTimeServiceManagerCompat();
if (tsm instanceof InternalTimeServiceManagerImpl) {
final InternalTimeServiceManagerImpl<?> cast =
(InternalTimeServiceManagerImpl<?>) getTimeServiceManagerCompat();
return cast.numProcessingTimeTimers();
} else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
return 0;
} else {
throw new IllegalStateException(
String.format(
"Unknown implementation of InternalTimerServiceManager. %s", tsm));
}
})
.orElse(0);
}
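The 0 returned for BatchExecutionInternalTimeServiceManager mirrors Flink's batch-mode timer semantics: under RuntimeExecutionMode.BATCH, processing-time timers are not tracked against a wall clock but fire once the input for a key is exhausted, and the manager exposes no timer count. A hedged illustration in plain Flink (all calls are standard Flink APIs; the job itself is illustrative):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BatchTimerDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.fromElements(1, 2, 3)
        .keyBy(x -> x)
        .process(
            new KeyedProcessFunction<Integer, Integer, String>() {
              @Override
              public void processElement(Integer value, Context ctx, Collector<String> out) {
                // In BATCH mode this registration is handled by
                // BatchExecutionInternalTimeServiceManager: the timer fires
                // when input for the current key is exhausted, and no
                // per-timer count is exposed, hence the 0 above.
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 10);
              }

              @Override
              public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) {
                out.collect("fired for key " + ctx.getCurrentKey());
              }
            })
        .print();
    env.execute("batch-timer-demo");
  }
}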
8 changes: 8 additions & 0 deletions runners/flink/flink_runner.gradle
@@ -222,6 +222,7 @@ class ValidatesRunnerConfig {
String name
boolean streaming
boolean checkpointing
boolean useDataStreamForBatch
ArrayList<String> sickbayTests
}

@@ -240,6 +241,7 @@ def createValidatesRunnerTask(Map m) {
description = "Validates the ${runnerType} runner"
def pipelineOptionsArray = ["--runner=TestFlinkRunner",
"--streaming=${config.streaming}",
"--useDataStreamForBatch=${config.useDataStreamForBatch}",
"--parallelism=2",
]
if (config.checkpointing) {
@@ -298,12 +300,17 @@ def createValidatesRunnerTask(Map m) {
excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
// https://github.com/apache/beam/issues/20844
excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
if (!config.streaming) {
// FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing.
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew'
}
}
}
}
}

createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests)
createValidatesRunnerTask(name: "validatesRunnerBatchWithDataStream", streaming: false, useDataStreamForBatch: true, sickbayTests: sickbayTests)
createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests)
// We specifically have a variant which runs with checkpointing enabled for the
// tests that require it since running a checkpoint variant is significantly
@@ -316,6 +323,7 @@ tasks.register('validatesRunner') {
group = 'Verification'
description "Validates Flink runner"
dependsOn validatesRunnerBatch
dependsOn validatesRunnerBatchWithDataStream
dependsOn validatesRunnerStreaming
dependsOn validatesRunnerStreamingCheckpointing
}
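The aggregate validatesRunner task above now depends on the new suite, so it runs as part of normal verification. To run only the DataStream-batch variant, an invocation along these lines should work; note that the :runners:flink:1.15 project path is an assumption, so substitute whichever supported Flink version directory you build against:

./gradlew :runners:flink:1.15:validatesRunnerBatchWithDataStream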

@@ -23,6 +23,7 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -96,13 +97,17 @@ public void translate(Pipeline pipeline) {
prepareFilesToStageForRemoteClusterExecution(options);

FlinkPipelineTranslator translator;
if (options.isStreaming()) {
if (options.isStreaming() || options.getUseDataStreamForBatch()) {
this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
LOG.warn(
"UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
}
translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
translator =
new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming());
if (!options.isStreaming()) {
flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
}
} else {
this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
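The key Flink primitive used above is RuntimeExecutionMode.BATCH, available since Flink 1.12, which gives a DataStream program batch scheduling, blocking shuffles, and batch state/timer backends. A minimal standalone sketch of the same switch the runner performs; the fromElements job is illustrative:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchOnDataStream {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Same call the runner now makes when useDataStreamForBatch is set
    // and the pipeline is not streaming.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.fromElements(1, 2, 3)
        .keyBy(x -> x % 2)
        .reduce(Integer::sum)
        .print();
    env.execute("batch-on-datastream");
  }
}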

@@ -32,7 +32,11 @@
* requiring flink on the classpath (e.g. to use with the direct runner).
*/
public interface FlinkPipelineOptions
extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions {
extends PipelineOptions,
ApplicationNameOptions,
StreamingOptions,
FileStagingOptions,
VersionDependentFlinkPipelineOptions {

String AUTO = "[auto]";
String PIPELINED = "PIPELINED";

@@ -81,8 +81,9 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {

private int depth = 0;

public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
this.streamingContext = new FlinkStreamingTranslationContext(env, options);
public FlinkStreamingPipelineTranslator(
StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming);
}

@Override

@@ -38,9 +38,7 @@
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
@@ -54,6 +52,9 @@
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -93,9 +94,10 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -220,16 +222,19 @@ public void translateNode(
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();
UnboundedSourceWrapper<T, ?> sourceWrapper =
new UnboundedSourceWrapper<>(
fullName, context.getPipelineOptions(), rawSource, parallelism);

FlinkUnboundedSource<T> unboundedSource =
FlinkSource.unbounded(
transform.getName(),
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);
nonDedupSource =
context
.getExecutionEnvironment()
.addSource(sourceWrapper)
.name(fullName)
.uid(fullName)
.returns(withIdTypeInfo);
.fromSource(
unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
.uid(fullName);

if (rawSource.requiresDeduping()) {
source =
@@ -303,15 +308,24 @@ void translateNode(Impulse transform, FlinkStreamingTranslationContext context)
WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE),
context.getPipelineOptions());

long shutdownAfterIdleSourcesMs =
context
.getPipelineOptions()
.as(FlinkPipelineOptions.class)
.getShutdownSourcesAfterIdleMs();
FlinkBoundedSource<byte[]> impulseSource;
WatermarkStrategy<WindowedValue<byte[]>> watermarkStrategy;
if (context.isStreaming()) {
long shutdownAfterIdleSourcesMs =
context
.getPipelineOptions()
.as(FlinkPipelineOptions.class)
.getShutdownSourcesAfterIdleMs();
impulseSource = FlinkSource.unboundedImpulse(shutdownAfterIdleSourcesMs);
watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps();
} else {
impulseSource = FlinkSource.boundedImpulse();
watermarkStrategy = WatermarkStrategy.noWatermarks();
}
SingleOutputStreamOperator<WindowedValue<byte[]>> source =
context
.getExecutionEnvironment()
.addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse")
.fromSource(impulseSource, watermarkStrategy, "Impulse")
.returns(typeInfo);

context.setOutputDataStream(context.getOutput(transform), source);
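One detail worth calling out in this hunk: watermarking now depends on the execution mode. A hedged sketch of the two strategies involved, using Flink's NumberSequenceSource purely as a stand-in bounded source:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkChoice {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Streaming impulse: ascending watermarks so event-time timers can
    // fire while the job is running.
    WatermarkStrategy<Long> streaming = WatermarkStrategy.forMonotonousTimestamps();
    // Batch impulse: no watermarks during execution; Flink emits one final
    // watermark at end of input, at which point event-time timers fire.
    WatermarkStrategy<Long> batch = WatermarkStrategy.noWatermarks();
    env.fromSource(new NumberSequenceSource(0, 9), batch, "impulse-stand-in").print();
    env.execute("watermark-choice");
  }
}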
Expand All @@ -330,7 +344,8 @@ private static class ReadSourceTranslator<T>
@Override
void translateNode(
PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) {
if (ReadTranslation.sourceIsBounded(context.getCurrentTransform())
== PCollection.IsBounded.BOUNDED) {
boundedTranslator.translateNode(transform, context);
} else {
unboundedTranslator.translateNode(transform, context);
@@ -361,24 +376,26 @@ public void translateNode(
}

String fullName = getCurrentTransformName(context);
UnboundedSource<T, ?> adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource);
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();

FlinkBoundedSource<T> flinkBoundedSource =
FlinkSource.bounded(
transform.getName(),
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);

DataStream<WindowedValue<T>> source;
try {
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();
UnboundedSourceWrapperNoValueWithRecordId<T, ?> sourceWrapper =
new UnboundedSourceWrapperNoValueWithRecordId<>(
new UnboundedSourceWrapper<>(
fullName, context.getPipelineOptions(), adaptedRawSource, parallelism));
source =
context
.getExecutionEnvironment()
.addSource(sourceWrapper)
.name(fullName)
.uid(fullName)
.returns(outputTypeInfo);
.fromSource(
flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
.uid(fullName);
} catch (Exception e) {
throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
}
@@ -545,7 +562,9 @@ static <InputT, OutputT> void translateParDo(
KeySelector<WindowedValue<InputT>, ?> keySelector = null;
boolean stateful = false;
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) {
if (!signature.stateDeclarations().isEmpty()
|| !signature.timerDeclarations().isEmpty()
|| !signature.timerFamilyDeclarations().isEmpty()) {
// Based on the fact that the signature is stateful, DoFnSignatures ensures
// that it is also keyed
keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
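The broadened stateful check also closes a subtle gap: a DoFn whose only declaration is a timer family was previously not recognized as stateful, and therefore not keyed. A sketch of such a DoFn using Beam's standard timer-family API; the names are illustrative:

import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

class TimerFamilyOnlyFn extends DoFn<KV<String, Integer>, String> {
  // The timer family is this DoFn's only stateful declaration; with the
  // widened check it now routes through the keyed, stateful execution path.
  @TimerFamily("retries")
  private final TimerSpec retriesSpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c, @TimerFamily("retries") TimerMap timers) {
    timers.set("first", new Instant(0));
  }

  @OnTimerFamily("retries")
  public void onRetry(OnTimerContext c) {
    c.output("timer fired");
  }
}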

@@ -51,6 +51,7 @@ class FlinkStreamingTranslationContext {

private final StreamExecutionEnvironment env;
private final PipelineOptions options;
private final boolean isStreaming;

/**
* Keeps a mapping between the output value of the PTransform and the Flink Operator that produced
@@ -62,9 +63,11 @@

private AppliedPTransform<?, ?, ?> currentTransform;

public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
public FlinkStreamingTranslationContext(
StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
this.env = checkNotNull(env);
this.options = checkNotNull(options);
this.isStreaming = isStreaming;
}

public StreamExecutionEnvironment getExecutionEnvironment() {
@@ -75,6 +78,10 @@ public PipelineOptions getPipelineOptions() {
return options;
}

public boolean isStreaming() {
return isStreaming;
}

@SuppressWarnings("unchecked")
public <T> DataStream<T> getInputDataStream(PValue value) {
return (DataStream<T>) dataStreams.get(value);

@@ -36,7 +36,7 @@
class FlinkTransformOverrides {
static List<PTransformOverride> getDefaultOverrides(FlinkPipelineOptions options) {
ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
if (options.isStreaming()) {
if (options.isStreaming() || options.getUseDataStreamForBatch()) {
builder
.add(
PTransformOverride.of(

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface VersionDependentFlinkPipelineOptions extends PipelineOptions {

@Description(
"When set to true, the batch job execution will use DataStream API. "
+ "Otherwise, the batch job execution will use the legacy DataSet API.")
@Default.Boolean(false)
Boolean getUseDataStreamForBatch();

void setUseDataStreamForBatch(Boolean useDataStreamForBatch);
}
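Since this is an ordinary PipelineOptions property, the flag also works from the command line with no code changes. A small sketch of the round trip through PipelineOptionsFactory; the hard-coded args stand in for a real command line:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class OptionParsingExample {
  public static void main(String[] args) {
    String[] cli = {"--runner=FlinkRunner", "--useDataStreamForBatch=true"};
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(cli).withValidation().as(FlinkPipelineOptions.class);
    // Absent the flag, @Default.Boolean(false) keeps the legacy DataSet path.
    System.out.println(options.getUseDataStreamForBatch()); // true
  }
}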

@@ -33,7 +33,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;