Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiangjie Qin committed Feb 25, 2023
1 parent 3516f4c commit 1ba0471
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public static <T> FlinkUnboundedSource<T> unbounded(
public static FlinkBoundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults();
flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
// Here we wrap the BeamImpulseSource in a FlinkBoundedSource, but override its
// boundedness to CONTINUOUS_UNBOUNDED. By doing so, the Flink engine will treat this
// source as an unbounded source and execute the job in streaming mode. This also
// works well with checkpointing, because the FlinkSourceSplit containing the
// BeamImpulseSource will be discarded after the impulse emission, so the streaming
// job won't see another impulse after failover.
return new FlinkBoundedSource<>(
new BeamImpulseSource(),
new SerializablePipelineOptions(flinkPipelineOptions),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,15 @@ public CompletableFuture<Void> isAvailable() {
checkExceptionAndMaybeThrow();
if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
// There are still live readers.
return isAvailableForAliveReaders();
CompletableFuture<Void> aliveReaderAvailableFuture = isAvailableForAliveReaders();
// Regardless of whether there is data available from the alive readers, the
// main thread needs to be woken up if there is a split change. Hence, we
// need to combine the data available future with the split change future.
if (waitingForSplitChangeFuture.isDone()) {
waitingForSplitChangeFuture = new CompletableFuture<>();
}
return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture)
.thenAccept(ignored -> {});
} else if (noMoreSplits) {
// All the splits have been read, wait for idle timeout.
checkIdleTimeoutAndMaybeStartCountdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import static org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader.PENDING_BYTES_METRIC_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -100,8 +102,8 @@ public void testIsAvailableAlwaysWakenUp() throws Exception {
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
FlinkSourceSplit<KV<Integer, Integer>>>
reader = createReader(executor, Long.MAX_VALUE)) {
reader.addSplits(splits);
reader.start();
reader.addSplits(splits);

Thread mainThread =
new Thread(
Expand Down Expand Up @@ -138,6 +140,42 @@ public void testIsAvailableAlwaysWakenUp() throws Exception {
}
}

/**
 * Checks that a future obtained from {@code isAvailable()} after the reader reported {@code
 * NOTHING_AVAILABLE} is completed by a subsequent split change: first by {@code addSplits},
 * then by {@code notifyNoMoreSplits}.
 */
@Test
public void testIsAvailableOnSplitChangeWhenNoDataAvailableForAliveReaders() throws Exception {
  // Two independent single-split batches; the second one is handed over later on.
  List<FlinkSourceSplit<KV<Integer, Integer>>> initialSplits = new ArrayList<>();
  initialSplits.add(new FlinkSourceSplit<>(0, new DummySource(0)));
  List<FlinkSourceSplit<KV<Integer, Integer>>> lateSplits = new ArrayList<>();
  lateSplits.add(new FlinkSourceSplit<>(1, new DummySource(0)));

  RecordsValidatingOutput output = new RecordsValidatingOutput(initialSplits);
  ManuallyTriggeredScheduledExecutorService scheduler =
      new ManuallyTriggeredScheduledExecutorService();

  try (SourceReader<
          WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
          FlinkSourceSplit<KV<Integer, Integer>>>
      reader = createReader(scheduler, Long.MAX_VALUE)) {
    reader.start();
    reader.addSplits(initialSplits);

    // The first poll is expected to yield no record.
    InputStatus firstPollStatus = reader.pollNext(output);
    assertEquals(
        "The reader should have nothing available",
        InputStatus.NOTHING_AVAILABLE,
        firstPollStatus);

    // Phase 1: a pending availability future must be completed by newly added splits.
    CompletableFuture<Void> pendingBeforeNewSplits = reader.isAvailable();
    assertFalse(
        "Future1 should be uncompleted without live split.", pendingBeforeNewSplits.isDone());
    reader.addSplits(lateSplits);
    assertTrue(
        "Future1 should be completed upon addition of new splits.",
        pendingBeforeNewSplits.isDone());

    // Phase 2: a fresh availability future must be completed by the no-more-splits signal.
    CompletableFuture<Void> pendingBeforeNoMoreSplits = reader.isAvailable();
    assertFalse(
        "Future2 should be uncompleted without live split.",
        pendingBeforeNoMoreSplits.isDone());
    reader.notifyNoMoreSplits();
    assertTrue(
        "Future2 should be completed upon NoMoreSplitsNotification.",
        pendingBeforeNoMoreSplits.isDone());
  }
}

@Test
public void testWatermark() throws Exception {
ManuallyTriggeredScheduledExecutorService executor =
Expand Down

0 comments on commit 1ba0471

Please sign in to comment.