Commit a05ea70: spotless

jto committed Oct 26, 2023
1 parent c73aa05 commit a05ea70
Showing 11 changed files with 58 additions and 47 deletions.
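The commit message refers to Spotless, the formatter Beam runs over its Java sources with the google-java-format style (100-column wrapping, operators at the start of continuation lines — exactly the reshaping visible in the hunks below). As a rough sketch only — the plugin version and minimal DSL here are assumptions, not Beam's actual build configuration:

    // build.gradle -- hypothetical minimal Spotless setup, not Beam's real build file
    plugins {
        id 'com.diffplug.spotless' version '6.22.0' // version is an assumption
    }

    spotless {
        java {
            googleJavaFormat() // wraps at 100 columns, normalizes line breaks
        }
    }

Running ./gradlew spotlessApply rewrites sources in place; ./gradlew spotlessCheck fails the build instead, which is how formatting drift is typically caught in CI.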
@@ -105,9 +105,11 @@ public void translate(Pipeline pipeline) {
     if (options.isStreaming() || options.getUseDataStreamForBatch()) {
       this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
       if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
-        LOG.warn("UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
+        LOG.warn(
+            "UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
       }
-      translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming());
+      translator =
+          new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming());
       if (!options.isStreaming()) {
         flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
       }
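The warning in this hunk fires when an unbounded source runs without Flink checkpointing enabled. A hedged sketch of how a pipeline author would avoid it — the setter name is assumed from the Beam Flink runner's checkpointing-interval option, not shown in this diff:

    // Hypothetical usage sketch; setCheckpointingInterval is assumed from
    // the Beam Flink runner's pipeline options.
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setCheckpointingInterval(60_000L); // checkpoint every 60s, enabling checkpointing
    Pipeline pipeline = Pipeline.create(options);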
@@ -32,7 +32,11 @@
  * requiring flink on the classpath (e.g. to use with the direct runner).
  */
 public interface FlinkPipelineOptions
-    extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions, VersionDependentFlinkPipelineOptions {
+    extends PipelineOptions,
+        ApplicationNameOptions,
+        StreamingOptions,
+        FileStagingOptions,
+        VersionDependentFlinkPipelineOptions {
 
   String AUTO = "[auto]";
   String PIPELINED = "PIPELINED";
@@ -82,9 +82,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
   private int depth = 0;
 
   public FlinkStreamingPipelineTranslator(
-      StreamExecutionEnvironment env,
-      PipelineOptions options,
-      boolean isStreaming) {
+      StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
     this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming);
   }
 
@@ -223,12 +223,17 @@ public void translateNode(
               ? context.getExecutionEnvironment().getMaxParallelism()
               : context.getExecutionEnvironment().getParallelism();
 
-      FlinkUnboundedSource<T> unboundedSource = FlinkSource.unbounded(transform.getName(),
-          rawSource, new SerializablePipelineOptions(context.getPipelineOptions()), parallelism);
+      FlinkUnboundedSource<T> unboundedSource =
+          FlinkSource.unbounded(
+              transform.getName(),
+              rawSource,
+              new SerializablePipelineOptions(context.getPipelineOptions()),
+              parallelism);
       nonDedupSource =
           context
               .getExecutionEnvironment()
-              .fromSource(unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
+              .fromSource(
+                  unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
               .uid(fullName);
 
       if (rawSource.requiresDeduping()) {
@@ -339,7 +344,8 @@ private static class ReadSourceTranslator<T>
     @Override
     void translateNode(
         PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
-      if (ReadTranslation.sourceIsBounded(context.getCurrentTransform()) == PCollection.IsBounded.BOUNDED) {
+      if (ReadTranslation.sourceIsBounded(context.getCurrentTransform())
+          == PCollection.IsBounded.BOUNDED) {
         boundedTranslator.translateNode(transform, context);
       } else {
         unboundedTranslator.translateNode(transform, context);
@@ -375,18 +381,20 @@ public void translateNode(
               ? context.getExecutionEnvironment().getMaxParallelism()
               : context.getExecutionEnvironment().getParallelism();
 
-      FlinkBoundedSource<T> flinkBoundedSource = FlinkSource.bounded(
-          transform.getName(),
-          rawSource,
-          new SerializablePipelineOptions(context.getPipelineOptions()),
-          parallelism);
+      FlinkBoundedSource<T> flinkBoundedSource =
+          FlinkSource.bounded(
+              transform.getName(),
+              rawSource,
+              new SerializablePipelineOptions(context.getPipelineOptions()),
+              parallelism);
 
       DataStream<WindowedValue<T>> source;
       try {
         source =
             context
                 .getExecutionEnvironment()
-                .fromSource(flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
+                .fromSource(
+                    flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
                 .uid(fullName);
       } catch (Exception e) {
         throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
@@ -554,9 +562,9 @@ static <InputT, OutputT> void translateParDo(
     KeySelector<WindowedValue<InputT>, ?> keySelector = null;
     boolean stateful = false;
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    if (!signature.stateDeclarations().isEmpty() ||
-        !signature.timerDeclarations().isEmpty() ||
-        !signature.timerFamilyDeclarations().isEmpty()) {
+    if (!signature.stateDeclarations().isEmpty()
+        || !signature.timerDeclarations().isEmpty()
+        || !signature.timerFamilyDeclarations().isEmpty()) {
       // Based on the fact that the signature is stateful, DoFnSignatures ensures
       // that it is also keyed
       keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
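The last hunk's condition picks the keyed translation path by inspecting the DoFn's signature. For reference, a minimal stateful DoFn of the kind a non-empty stateDeclarations() detects — a generic sketch, not code from this commit:

    // A DoFn with a @StateId declaration; DoFnSignatures reports a non-empty
    // stateDeclarations(), so translateParDo treats it as stateful (and keyed).
    class CountPerKey extends DoFn<KV<String, Long>, KV<String, Long>> {
      @StateId("count")
      private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void processElement(
          @Element KV<String, Long> element,
          @StateId("count") ValueState<Long> count,
          OutputReceiver<KV<String, Long>> out) {
        Long current = count.read(); // null on the first element for this key
        long next = (current == null ? 0L : current) + 1;
        count.write(next);
        out.output(KV.of(element.getKey(), next));
      }
    }

Because a stateful DoFn's input must be keyed, its coder is a KvCoder, which is what the cast to ((KvCoder) input.getCoder()) in the hunk relies on.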
@@ -64,9 +64,7 @@ class FlinkStreamingTranslationContext {
   private AppliedPTransform<?, ?, ?> currentTransform;
 
   public FlinkStreamingTranslationContext(
-      StreamExecutionEnvironment env,
-      PipelineOptions options,
-      boolean isStreaming) {
+      StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
     this.env = checkNotNull(env);
     this.options = checkNotNull(options);
     this.isStreaming = isStreaming;
@@ -14,20 +14,18 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-
 public interface VersionDependentFlinkPipelineOptions extends PipelineOptions {
 
-  @Description("When set to true, the batch job execution will use DataStream API. "
-      + "Otherwise, the batch job execution will use the legacy DataSet API.")
+  @Description(
+      "When set to true, the batch job execution will use DataStream API. "
+          + "Otherwise, the batch job execution will use the legacy DataSet API.")
   @Default.Boolean(false)
   Boolean getUseDataStreamForBatch();
 
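Since getUseDataStreamForBatch is a standard PipelineOptions getter, the option can also be passed on the command line under the derived flag name. A small usage sketch — the flag spelling follows Beam's getter-to-flag convention, assumed here:

    // Hypothetical invocation sketch: build FlinkPipelineOptions from args so that
    // --useDataStreamForBatch=true routes batch execution through the DataStream API.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs("--runner=FlinkRunner", "--useDataStreamForBatch=true")
            .withValidation()
            .as(FlinkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

This is the flag consulted in FlinkPipelineExecutionEnvironment.translate above: when set, batch pipelines get a StreamExecutionEnvironment with RuntimeExecutionMode.BATCH instead of the legacy DataSet translation path.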
@@ -1639,12 +1639,14 @@ public Instant currentInputWatermarkTime() {
         // or long MAX_VALUE. So we should just use the Flink time service watermark in batch mode.
         //
         // In Flink the watermark ranges from
-        // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the beam
+        // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the
+        // beam
         // watermark range is [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775),
         // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To ensure the timestamp visible to
         // the users follow the Beam convention, we just use the Beam range instead.
-        return timerService.currentWatermark() == Long.MAX_VALUE ?
-            new Instant(Long.MAX_VALUE) : BoundedWindow.TIMESTAMP_MIN_VALUE;
+        return timerService.currentWatermark() == Long.MAX_VALUE
+            ? new Instant(Long.MAX_VALUE)
+            : BoundedWindow.TIMESTAMP_MIN_VALUE;
       } else {
         return new Instant(getEffectiveInputWatermark());
       }
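The constants in that comment are related by a factor of 1000: Beam caps timestamps to the range representable in microseconds, so its millisecond bounds are the Long bounds divided by 1000. A worked check of the arithmetic:

    // Long.MIN_VALUE / 1000 == -9223372036854775 == BoundedWindow.TIMESTAMP_MIN_VALUE (ms)
    // Long.MAX_VALUE / 1000 ==  9223372036854775 == BoundedWindow.TIMESTAMP_MAX_VALUE (ms)
    assert BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis() == Long.MIN_VALUE / 1000;
    assert BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis() == Long.MAX_VALUE / 1000;

So a Flink watermark of Long.MAX_VALUE (the "input drained" sentinel) passes through as-is, and any other value is clamped down to the Beam minimum in batch mode.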
@@ -62,14 +62,11 @@ public class FlinkExecutionEnvironmentsTest {
   @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
   @Rule public ExpectedException expectedException = ExpectedException.none();
 
-  @Parameterized.Parameter
-  public boolean useDataStreamForBatch;
+  @Parameterized.Parameter public boolean useDataStreamForBatch;
 
   @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
   public static Collection<Object[]> useDataStreamForBatchJobValues() {
-    return Arrays.asList(new Object[][] {
-      {false}, {true}
-    });
+    return Arrays.asList(new Object[][] {{false}, {true}});
   }
 
   private FlinkPipelineOptions getDefaultPipelineOptions() {
@@ -85,14 +85,11 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
 
   @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  @Parameterized.Parameter
-  public boolean useDataStreamForBatch;
+  @Parameterized.Parameter public boolean useDataStreamForBatch;
 
   @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
   public static Collection<Object[]> useDataStreamForBatchJobValues() {
-    return Arrays.asList(new Object[][] {
-      {false}, {true}
-    });
+    return Arrays.asList(new Object[][] {{false}, {true}});
   }
 
   private FlinkPipelineOptions getDefaultPipelineOptions() {
@@ -193,7 +190,8 @@ public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception {
     shouldUsePreparedFilesOnRemoteStreamEnvironment(false);
   }
 
-  public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode) throws Exception {
+  public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode)
+      throws Exception {
     FlinkPipelineOptions options = getDefaultPipelineOptions();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("clusterAddress");
@@ -206,7 +204,8 @@ public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMod
 
     List<URL> jarFiles;
     if (streamingMode || options.getUseDataStreamForBatch()) {
-      StreamExecutionEnvironment streamExecutionEnvironment = flinkEnv.getStreamExecutionEnvironment();
+      StreamExecutionEnvironment streamExecutionEnvironment =
+          flinkEnv.getStreamExecutionEnvironment();
       assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class));
       jarFiles = getJars(streamExecutionEnvironment);
     } else {
@@ -76,7 +76,8 @@ public void readSourceTranslatorBoundedWithMaxParallelism() {
     Object sourceTransform =
         applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);
 
-    FlinkBoundedSource<?> source = (FlinkBoundedSource<?>) ((SourceTransformation<?, ?, ?>) sourceTransform).getSource();
+    FlinkBoundedSource<?> source =
+        (FlinkBoundedSource<?>) ((SourceTransformation<?, ?, ?>) sourceTransform).getSource();
 
     assertEquals(maxParallelism, source.getNumSplits());
   }
@@ -94,7 +95,8 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() {
     Object sourceTransform =
         applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);
 
-    FlinkBoundedSource<?> source = (FlinkBoundedSource<?>) ((SourceTransformation<?, ?, ?>) sourceTransform).getSource();
+    FlinkBoundedSource<?> source =
+        (FlinkBoundedSource<?>) ((SourceTransformation<?, ?, ?>) sourceTransform).getSource();
 
     assertEquals(parallelism, source.getNumSplits());
   }
@@ -117,7 +119,8 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() {
 
     FlinkSource<?, ?> source =
         (FlinkSource<?, ?>)
-            ((SourceTransformation<?, ?, ?>) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource();
+            ((SourceTransformation<?, ?, ?>) Iterables.getOnlyElement(sourceTransform.getInputs()))
+                .getSource();
 
     assertEquals(maxParallelism, source.getNumSplits());
   }
@@ -138,7 +141,8 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() {
 
     FlinkSource<?, ?> source =
         (FlinkSource<?, ?>)
-            ((SourceTransformation<?, ?, ?>) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource();
+            ((SourceTransformation<?, ?, ?>) Iterables.getOnlyElement(sourceTransform.getInputs()))
+                .getSource();
 
     assertEquals(parallelism, source.getNumSplits());
   }
@@ -67,7 +67,8 @@ public void testBatch() {
 
   private static void runProgram(String resultPath, boolean streaming) {
 
-    Pipeline p = streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch();
+    Pipeline p =
+        streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch();
 
     p.apply(GenerateSequence.from(0).to(10))
         .apply(
