From 1ba0471d5eba08f1cbcd9a05fa8e4abde3b45eb1 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Sat, 25 Feb 2023 10:01:21 +0800 Subject: [PATCH] Address comments --- .../streaming/io/source/FlinkSource.java | 6 +++ .../io/source/FlinkSourceReaderBase.java | 10 ++++- .../FlinkUnboundedSourceReaderTest.java | 40 ++++++++++++++++++- 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java index e06ebb413e07..c001b263340c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java @@ -70,6 +70,12 @@ public static FlinkUnboundedSource unbounded( public static FlinkBoundedSource unboundedImpulse(long shutdownSourceAfterIdleMs) { FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults(); flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs); + // Here we wrap the BeamImpulseSource with a FlinkBoundedSource, but overriding its + // boundedness to CONTINUOUS_UNBOUNDED. By doing so, the Flink engine will treat this + // source as an unbounded source and execute the job in streaming mode. This also + // works well with checkpoint, because the FlinkSourceSplit containing the + // BeamImpulseSource will be discarded after the impulse emission. So the streaming + // job won't see another impulse after failover. return new FlinkBoundedSource<>( new BeamImpulseSource(), new SerializablePipelineOptions(flinkPipelineOptions), diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index d48f7a7ea930..27b84910ac27 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -163,7 +163,15 @@ public CompletableFuture isAvailable() { checkExceptionAndMaybeThrow(); if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) { // There are still live readers. - return isAvailableForAliveReaders(); + CompletableFuture aliveReaderAvailableFuture = isAvailableForAliveReaders(); + // Regardless of whether there is data available from the alive readers, the + // main thread needs to be woken up if there is a split change. Hence, we + // need to combine the data available future with the split change future. + if (waitingForSplitChangeFuture.isDone()) { + waitingForSplitChangeFuture = new CompletableFuture<>(); + } + return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture) + .thenAccept(ignored -> {}); } else if (noMoreSplits) { // All the splits have been read, wait for idle timeout. checkIdleTimeoutAndMaybeStartCountdown(); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index 95a9f6d7f270..f420bd8900ff 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -19,8 +19,10 @@ import static org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader.PENDING_BYTES_METRIC_NAME; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; @@ -100,8 +102,8 @@ public void testIsAvailableAlwaysWakenUp() throws Exception { WindowedValue>>, FlinkSourceSplit>> reader = createReader(executor, Long.MAX_VALUE)) { - reader.addSplits(splits); reader.start(); + reader.addSplits(splits); Thread mainThread = new Thread( @@ -138,6 +140,42 @@ public void testIsAvailableAlwaysWakenUp() throws Exception { } } + @Test + public void testIsAvailableOnSplitChangeWhenNoDataAvailableForAliveReaders() throws Exception { + List>> splits1 = new ArrayList<>(); + List>> splits2 = new ArrayList<>(); + splits1.add(new FlinkSourceSplit<>(0, new DummySource(0))); + splits2.add(new FlinkSourceSplit<>(1, new DummySource(0))); + RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits1); + ManuallyTriggeredScheduledExecutorService executor = + new ManuallyTriggeredScheduledExecutorService(); + + try (SourceReader< + WindowedValue>>, + FlinkSourceSplit>> + reader = createReader(executor, Long.MAX_VALUE)) { + reader.start(); + reader.addSplits(splits1); + + assertEquals( + "The reader should have nothing available", + InputStatus.NOTHING_AVAILABLE, + reader.pollNext(validatingOutput)); + + CompletableFuture future1 = reader.isAvailable(); + assertFalse("Future1 should be uncompleted without live split.", future1.isDone()); + + reader.addSplits(splits2); + assertTrue("Future1 should be completed upon addition of new splits.", future1.isDone()); + + CompletableFuture future2 = reader.isAvailable(); + assertFalse("Future2 should be uncompleted without live split.", future2.isDone()); + + reader.notifyNoMoreSplits(); + assertTrue("Future2 should be completed upon NoMoreSplitsNotification.", future2.isDone()); + } + } + @Test public void testWatermark() throws Exception { ManuallyTriggeredScheduledExecutorService executor =