
added a set_offset() method to partition_consumer.rb #69

Open · bkao wants to merge 1 commit into bpot:master from ListenFirstMedia:master

Conversation


@bkao bkao commented Oct 9, 2014

When Kafka has a corrupt message, poseidon gets stuck attempting to read the same offset ad infinitum. We needed a way to skip ahead (presumably past a corrupt message or disk corruption) so we could implement logic like this:

loop do
  docs = @kafka.fetch
  break if docs.size > 0
  break if @kafka.next_offset >= @kafka.highwater_mark

  # We can get stuck due to a corrupt message in Kafka. Kafka's
  # response (or possibly poseidon's handling of it) doesn't
  # distinguish between no data and bad data; either way we simply
  # get back an empty fetch. To get around this, we attempt to
  # advance the offset and re-fetch. If the offset is past the
  # latest offset, Kafka will throw an error and we'll revert it.
  @kafka.set_offset(@kafka.next_offset + 1)
end
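
For reference, the change itself is small. A minimal sketch of the assumed shape of the patch (the actual commit may differ; @offset is assumed to be the internal cursor PartitionConsumer already tracks):

module Poseidon
  class PartitionConsumer
    # Reposition the consumer at an arbitrary offset, e.g. to skip
    # past a corrupt message. (Assumed shape of the patch.)
    def set_offset(offset)
      @offset = offset
    end
  end
end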

@coveralls commented

Coverage decreased (-0.05%) when pulling 60583fb on ListenFirstMedia:master into 1b04b18 on bpot:master.

@bpot bpot (Owner) commented Dec 6, 2014

@bkao it may be the case that the message you are trying to fetch is larger than the max_fetch_size. Do you know of any way to reproduce the issue?

I'm hesitant to add the #set_offset API because it may be used as a work-around for legitimate issues which need to be fixed, but I will think about it some.
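
If it does turn out to be an oversized message, retrying the fetch with a larger limit should unstick the consumer without any offset surgery. A sketch, assuming poseidon's fetch accepts a per-call :max_bytes option:

# Retry with a larger per-fetch byte limit in case the stuck message
# simply exceeds the default fetch size. (The per-call :max_bytes
# option is assumed here.)
docs = consumer.fetch(:max_bytes => 10_000_000)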

@bkao bkao (Author) commented Dec 6, 2014

Nor am I certain that this is the right way to work around the specific issue I had. But my use case involved corrupted sectors on disk, and I needed a way to scan past the corruption. Without it, the consumer just got stuck.


@MichaelBaker commented

I also need to be able to set the offset.

Say I fetch messages with offsets 1, 2, and 3. I successfully process 1 and 2 (and keep track of that fact), but then an error occurs. I need to pick back up at offset 3, but currently that means reestablishing the entire connection, which doesn't seem right.

Or say I'm reading through the messages and want to replay the previous 10; again, that requires reestablishing the connection just to change the offset.
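
With #set_offset, both cases reduce to repositioning the consumer we already have. A usage sketch (everything except set_offset is poseidon's existing public API; set_offset is what this PR adds):

require 'poseidon'

# Standard poseidon consumer setup.
consumer = Poseidon::PartitionConsumer.new("my_consumer", "localhost", 9092,
                                           "my_topic", 0, :earliest_offset)

messages = consumer.fetch
# ...offsets 1 and 2 are processed, then an error occurs...
last_processed = 2

# Resume at offset 3 without reestablishing the connection.
consumer.set_offset(last_processed + 1)

# Or rewind to replay the previous 10 messages.
consumer.set_offset(consumer.next_offset - 10)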
