Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink Runner] Add new Source classes that are based on FLIP-27 Source API. #25525

Merged
merged 2 commits into from
Mar 6, 2023

Conversation

becketqin
Copy link
Contributor

@becketqin becketqin commented Feb 17, 2023

This is the first patch of migrating Flink runner batch job execution from DataSet to DataStream API.

The patch does the following:

  1. Add two FLIP-27 source implementations: FlinkBoundedSource, FlinkUnboundedSource. ImpulseSource was implemented as a built-in type of FlinkBoundedSource.
  2. Introduce necessary compatibility bridging classes to make the code work for Flink version [1.12, 1.15].

CHANGES.md was not modified with this patch because this patch does not change the existing execution paths. The followup patches will update CHANGES.md.

One notable difference between FLIP-27 Source implementations and the existing UnboundedSourceWrapper class is that the new FLIP-27 source does not support accumulators yet. So the beam metrics from the MetricsContainer will not be reported. However, the FLIP-27 based sources will emit their own metrics.

fixes #25486


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@becketqin
Copy link
Contributor Author

@xinyuiscool would you have time to take a look? Thanks!

@becketqin becketqin force-pushed the datastream-migration-1 branch from d572623 to ef85675 Compare February 17, 2023 08:21
@becketqin
Copy link
Contributor Author

The failed PreCommit test is irrelevant to this patch.

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@xinyuiscool
Copy link
Contributor

Run Java PreCommit

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed half part. Will continue later.

newFuture.complete(null);
} else {
LOG.debug("There is no data available, scheduling the idle reader checker.");
scheduleTask(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is a bit unclear to me. Seems this checker thread will complete the dataAvailableFuture without start() being called. Does it mean that the reader will start to poll? Or isAvailable() will be invoked again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start() is called only once right after the instantiation of the reader object. After that, the reader main thread will block on the future returned from isAvailable() if there is no data available for read. Before the main thread goes to block on that future, it sets the check thread to wake itself up after SLEEP_ON_IDLE_MS by completing that future. So the reader main thread will start to poll() again to check if there is more data available.

Logically speaking this is as if the reader main thread calls Thread.sleep(SLEEP_ON_IDLE_MS), except that we don't want to hijack the reader main thread in this case, because the reader main thread is also the task main thread and may need to do other things (e.g. checkpointing) even if there is no data available for reading.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. So here is the intention is to put a wait for the next poll. I was thinking somewhere dataAvailableFuture will be set once the data is available. Looks like it's not used that way.

checkExceptionAndMaybeThrow();
LOG.info("Adding splits {}", splits);
sourceSplits.addAll(splits);
waitingForSplitChangeFuture.complete(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If notifyNoMoreSplits() is bound to be called after all splits are added, we only need to complete this future in the notifyNoMoreSplit(), right? it's a bit weird that we complete this future in two places and then in line 174 we recreate the future.

If we only need to complete this future once, then we can mark the var final so it's easier to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, there is no guarantee that notifyNoMoreSplits is always going to be invoked, or when it will be invoked.

I think we need to complete the future in both addSplits() and notifyNoMoreSplits(). Basically if a reader goes to sleep because it has exhausted all the splits, it needs to be waken up either when a new split is assigned or NoMoreSplits notification is received.

Our current SplitEnumerator implementation follows a static splits assignment approach, so it sends all the splits to a subtask at once and then sends the NoMoreSplits notification immediately. So timing wise, it seems that the reader can wait for the NoMoreSplits notification and then act. However, that won't work for the dynamic assignment case. For example, if a reader only gets one split at a time and will request another split from the SplitEnumerator after finishing reading from the current split, it has to wake up and poll once a split is assigned. So the splitChangeFuture has to be completed in addSplits(). Also, if the reader has exhausted all the splits and gone to sleep, it has to be waken up upon receiving NoMoreSplits notification so it can exit normally.

Orthogonally, it seems the code does have a bug here. If there are live readers, the future returned by isAvailableForAliveReaders() should also be completed when there is a split change, either from addSplits() or notifyNoMoreSplits(). I'll update the patch to fix that.

// ------------------------------ private methods ------------------------------

@SuppressWarnings("unchecked")
private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method uses the OutputT from the enclosing class. So it is not static. We can make it static, but the benefit might be limited given we don't expect tons of reader instances. And it also makes the code somewhat less readable.

}
}

private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to above, this method uses the type T from the enclosing class.

}
}

private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

while (splitIter.hasNext()) {
Map.Entry<Integer, List<FlinkSourceSplit<T>>> entry = splitIter.next();
int readerIndex = entry.getKey();
int targetSubtask = readerIndex % context.currentParallelism();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By reading the above code, seems the key is already the targetSubtask, so maybe we don't need to do this again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I follow the comment. The key will be in the pendingSplits map because that is an "intended assignments" regardless of whether the target reader has actually registered to the SplitEnumerator or not. But we can only send the "intended assignments" to a reader if that reader has registered to the SplitEnumerator. That is what the check does.

Copy link
Contributor Author

@becketqin becketqin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xinyuiscool Thanks for the comments. I'll update the patch.

checkExceptionAndMaybeThrow();
LOG.info("Adding splits {}", splits);
sourceSplits.addAll(splits);
waitingForSplitChangeFuture.complete(null);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, there is no guarantee that notifyNoMoreSplits is always going to be invoked, or when it will be invoked.

I think we need to complete the future in both addSplits() and notifyNoMoreSplits(). Basically if a reader goes to sleep because it has exhausted all the splits, it needs to be waken up either when a new split is assigned or NoMoreSplits notification is received.

Our current SplitEnumerator implementation follows a static splits assignment approach, so it sends all the splits to a subtask at once and then sends the NoMoreSplits notification immediately. So timing wise, it seems that the reader can wait for the NoMoreSplits notification and then act. However, that won't work for the dynamic assignment case. For example, if a reader only gets one split at a time and will request another split from the SplitEnumerator after finishing reading from the current split, it has to wake up and poll once a split is assigned. So the splitChangeFuture has to be completed in addSplits(). Also, if the reader has exhausted all the splits and gone to sleep, it has to be waken up upon receiving NoMoreSplits notification so it can exit normally.

Orthogonally, it seems the code does have a bug here. If there are live readers, the future returned by isAvailableForAliveReaders() should also be completed when there is a split change, either from addSplits() or notifyNoMoreSplits(). I'll update the patch to fix that.

// ------------------------------ private methods ------------------------------

@SuppressWarnings("unchecked")
private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method uses the OutputT from the enclosing class. So it is not static. We can make it static, but the benefit might be limited given we don't expect tons of reader instances. And it also makes the code somewhat less readable.

}
}

private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to above, this method uses the type T from the enclosing class.

}
}

private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

while (splitIter.hasNext()) {
Map.Entry<Integer, List<FlinkSourceSplit<T>>> entry = splitIter.next();
int readerIndex = entry.getKey();
int targetSubtask = readerIndex % context.currentParallelism();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I follow the comment. The key will be in the pendingSplits map because that is an "intended assignments" regardless of whether the target reader has actually registered to the SplitEnumerator or not. But we can only send the "intended assignments" to a reader if that reader has registered to the SplitEnumerator. That is what the check does.

newFuture.complete(null);
} else {
LOG.debug("There is no data available, scheduling the idle reader checker.");
scheduleTask(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start() is called only once right after the instantiation of the reader object. After that, the reader main thread will block on the future returned from isAvailable() if there is no data available for read. Before the main thread goes to block on that future, it sets the check thread to wake itself up after SLEEP_ON_IDLE_MS by completing that future. So the reader main thread will start to poll() again to check if there is more data available.

Logically speaking this is as if the reader main thread calls Thread.sleep(SLEEP_ON_IDLE_MS), except that we don't want to hijack the reader main thread in this case, because the reader main thread is also the task main thread and may need to do other things (e.g. checkpointing) even if there is no data available for reading.

Copy link
Contributor Author

@becketqin becketqin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xinyuiscool Thanks for the comments. I have updated the patch to address them.

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

newFuture.complete(null);
} else {
LOG.debug("There is no data available, scheduling the idle reader checker.");
scheduleTask(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. So here is the intention is to put a wait for the next poll. I was thinking somewhere dataAvailableFuture will be set once the data is available. Looks like it's not used that way.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Task]: Introduce new Flink source implementations based on FLIP-27 Source interfaces.
2 participants