[Bug]: KafkaIO SplittableDoFn not resuming from last committed offset #21730
Comments
@johnjcasey I believe you are looking into a similar issue?
I am, you can assign this to me.
Done. Feel free to mark as a duplicate if needed.
Doesn't this mean that some stated functionality just isn't working? Is there a chance of data corruption if the user doesn't notice? That would make this P1 and release-blocking, really.
I think this was fixed by #22450.
It was.
There is still a reference to this issue in our code. Update: the reason is here: beam/sdks/python/apache_beam/io/kafka.py, line 191 at eb23b0a.
Since it is fixed by #22450, is it safe to remove it?
* Fix kafka run out of resource upon increase number of records for streaming test
* Remove 'use_unbounded_sdf_wrapper' since apache#21730 is resolved
* Fix assert expected and actual misplace
* Adjust string pipeline timeout 1800s -> 1500s suffice
* Fix Java test fail whenever a write bundle fail
* Use streaming pipeline to read in Python
What happened?
Using KafkaIO with ReadFromKafkaDoFn.java and commitOffsetsInFinalize() should commit the offsets of processed messages, and if the pipeline is restarted, it should resume from the last committed offset. While committing the offsets works correctly, resuming from the latest committed offset does not, because the group IDs used are not the same (using Beam 2.39.0).
Committing the offsets happens in KafkaCommitOffset.java, with a consumer defined as:

consumerFactoryFn.apply(updatedConsumerConfig)
Reading the start offset is done in initialRestriction() in ReadFromKafkaDoFn.java, with a consumer defined as:

consumerFactoryFn.apply(
    KafkaIOUtils.getOffsetConsumerConfig(
        "initialOffset", offsetConsumerConfig, updatedConsumerConfig))
Since the group id of the consumer committing offsets is not the same as the group id of the consumer fetching the latest committed offset, the pipeline always starts from the beginning of the (topic, partition) again.
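The mismatch can be illustrated with a minimal, self-contained sketch. This is not Beam's actual code: the method name `offsetConsumerConfig`, the class name, and the exact format of the derived group id are assumptions made for illustration, modeled on the description above (the offset consumer gets a derived, randomized `group.id`, while offsets were committed under the user's `group.id`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of why the read path never sees the committed offsets.
public class GroupIdMismatchSketch {

    // Models the behavior described for KafkaIOUtils.getOffsetConsumerConfig:
    // the offset consumer is given a derived, randomized group.id so it does
    // not join the user's consumer group. The exact format here is assumed.
    static Map<String, Object> offsetConsumerConfig(
            String name, Map<String, Object> consumerConfig) {
        Map<String, Object> config = new HashMap<>(consumerConfig);
        Object userGroupId = consumerConfig.get("group.id");
        String derivedGroupId = String.format("%s_offset_consumer_%d_%s",
                name, new Random().nextInt(Integer.MAX_VALUE), userGroupId);
        config.put("group.id", derivedGroupId);
        config.put("enable.auto.commit", false);
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> userConfig = new HashMap<>();
        userConfig.put("group.id", "my-pipeline-group");

        // Commit path: offsets are committed under the user's group id.
        String commitGroupId = (String) userConfig.get("group.id");

        // Read path: initialRestriction() queries the latest committed
        // offset with the derived group id instead.
        Map<String, Object> readConfig =
                offsetConsumerConfig("initialOffset", userConfig);
        String readGroupId = (String) readConfig.get("group.id");

        // The two ids differ, so Kafka has no committed offset for the
        // derived group and the restriction starts from the beginning.
        assert !commitGroupId.equals(readGroupId);
        System.out.println(commitGroupId + " vs " + readGroupId);
    }
}
```

Under this reading, the fix in #22450 amounts to making the read path look up the committed offset under the user's group id rather than the derived one.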
Issue Priority
Priority: 2
Issue Component
Component: io-java-kafka