
Cross-language consistency (RequiresStableInputs) is quietly broken (at least on portable flink runner) #20812

Closed
damccorm opened this issue Jun 4, 2022 · 1 comment · Fixed by #22889


@damccorm (Contributor) commented Jun 4, 2022

Since the Python SDK does not seem to provide anything similar to https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html, I wrote a small cross-language transform in Java, to be called from a Python SDK pipeline executed using the Flink runner. The expectation was that it would perform the necessary buffering to correctly implement exactly-once semantics in my use case.

However, this did not result in the creation of any Flink checkpoints. The reason seems to be that the code in

is never executed, because the UDF is called using the FnApiDoFnRunner instead.

This behavior is particularly problematic because the @RequiresStableInput annotation is silently ignored. Users might therefore believe that they get exactly-once semantics (EOS), whereas they only get some kind of "at-least-once if the upstream pipeline happens to be deterministic" (which is not the case in general).
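To make the "at-least-once if the upstream pipeline happens to be deterministic" point concrete, here is a toy model in plain Python. It uses no Beam APIs, and every function name in it is hypothetical. A failure after the sink has written, but before the checkpoint commits, forces a replay. A sink that drops exact duplicates hides that replay only if the upstream step reproduces identical records.

```python
def run_with_one_replay(transform, elements, write):
    # Attempt 1 writes every record, then "crashes" before its checkpoint
    # commits; attempt 2 replays the same inputs from the previous checkpoint.
    # Without stable input, the replay re-executes `transform`.
    for attempt in (1, 2):
        for x in elements:
            write(transform(x, attempt))


def deterministic(x, attempt):
    return x  # identical output on every execution


def nondeterministic(x, attempt):
    # Stands in for a random ID, a processing-time timestamp, etc.
    return f"{x}@{attempt}"


det_sink = set()  # a set models a sink that drops exact duplicate records
run_with_one_replay(deterministic, ["a", "b"], det_sink.add)

nondet_sink = set()
run_with_one_replay(nondeterministic, ["a", "b"], nondet_sink.add)

print(sorted(det_sink))     # ['a', 'b'] -> the replay is invisible
print(sorted(nondet_sink))  # ['a@1', 'a@2', 'b@1', 'b@2'] -> both attempts leak out
```

The model only captures the deterministic vs. non-deterministic distinction. A real sink may also observe a partially written first attempt.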

Thus, if a user were to use, e.g., the Kafka EOS sink (https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-), which relies on the @RequiresStableInput mechanism, in their cross-language Java UDF, the output might not be correct in general (potentially not even at-least-once) if the upstream pipeline is not deterministic and has to be replayed from a checkpoint.

I feel this issue should be prioritized because it essentially makes it impossible to achieve source-to-sink exactly-once guarantees when using Beam on Flink with the Python SDK.

From a user perspective, either (or a combination) of the following would resolve the issue:

  • Implement something like @RequiresStableInput for the Python SDK's DoFn (and ensure that using @RequiresStableInput in a Java-based DoFn results in an error if that DoFn is called from a pipeline defined with the Python SDK).
  • Extend the FnApiDoFnRunner to provide stable inputs to DoFns that require them.
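The second option amounts to buffering inputs against checkpoints. The following toy model in plain Python illustrates the idea. It is not Beam or Flink code, and StableInputBuffer and its methods are hypothetical names. Elements are held until the checkpoint that captured them is durable. After a failure, the buffer is rebuilt from the checkpoint instead of being recomputed upstream, so the wrapped function always sees the checkpointed values.

```python
class StableInputBuffer:
    """Hypothetical sketch of buffer-until-checkpoint-complete (toy model)."""

    def __init__(self, process):
        self.process = process  # callable that requires stable input
        self.pending = []       # elements received since the last snapshot
        self.snapshots = {}     # checkpoint id -> elements captured in it

    def receive(self, element):
        self.pending.append(element)  # buffer only; do not process yet

    def snapshot(self, checkpoint_id):
        # Move pending elements into the checkpoint and return what to persist.
        self.snapshots[checkpoint_id] = list(self.pending)
        self.pending.clear()
        return {checkpoint_id: list(self.snapshots[checkpoint_id])}

    def notify_checkpoint_complete(self, checkpoint_id):
        # Inputs captured by checkpoints up to checkpoint_id are now stable.
        for cid in sorted(c for c in self.snapshots if c <= checkpoint_id):
            for element in self.snapshots.pop(cid):
                self.process(element)

    @classmethod
    def restore(cls, process, persisted):
        # Recovery: buffered elements come from the checkpoint, not from upstream.
        buf = cls(process)
        buf.snapshots = {cid: list(els) for cid, els in persisted.items()}
        return buf


out = []
buf = StableInputBuffer(out.append)
for x in ["a", "b"]:
    buf.receive(f"{x}@1")  # output of a non-deterministic upstream, attempt 1
persisted = buf.snapshot(1)
assert out == []  # nothing reaches the function before the checkpoint is durable

# Simulate a crash before notify_checkpoint_complete(1) arrives. In this toy
# model the recovered buffer flushes once the completion notice is replayed.
recovered = StableInputBuffer.restore(out.append, persisted)
recovered.notify_checkpoint_complete(1)
print(out)  # ['a@1', 'b@1'] -> the checkpointed values, not recomputed ones
```

Processing can still be retried after a later failure, but every retry sees identical input. That is the property a sink such as the Kafka EOS writer depends on.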

Unfortunately, I do not feel familiar enough with the code base to address the issue myself (at least not without further guidance), so any feedback is welcome.

Imported from Jira BEAM-11755. Original Jira may contain additional context.
Reported by: pikulmar.

@kennknowles (Member) commented:

Noting that there is a PR under review for this. Adding this comment so it does not show up in our high priority daily report.

je-ik added a commit to je-ik/beam that referenced this issue Jan 2, 2023
je-ik added a commit to je-ik/beam that referenced this issue Jan 3, 2023
 Fix FlinkRequiresStableInputTest flakiness (apache#21333)
je-ik added a commit to je-ik/beam that referenced this issue Jan 3, 2023
lukecwik added a commit that referenced this issue Jan 23, 2023
* Handle @RequiresStableInput in portable flink (#20812)

* [runners-flink] Remove unnnecessary dependency on flink-annotations

* Fix @RequiresStableInput for portable Flink (#20812)

 Fix FlinkRequiresStableInputTest flakiness (#21333)

* Flink: Tests for stateful stable dofns (#20812)

* Enable commit for kafka flink portable test

* Apply suggestions from code review

Co-authored-by: Lukasz Cwik <[email protected]>

* Add callback to BufferingDoFnRunner for flushing SDK harness results

* revert changes in website

Co-authored-by: Lukasz Cwik <[email protected]>
github-actions bot added this to the 2.46.0 Release milestone Jan 23, 2023
minxhe pushed a commit to minxhe/beam that referenced this issue Nov 25, 2024