
Provide option for KafkaReceiver's graceful shutdown #378

Open
cjlee38 opened this issue Jan 3, 2024 · 4 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@cjlee38 commented Jan 3, 2024

Motivation

Over the past few days, I've been looking for a way to shut down a KafkaReceiver gracefully, but couldn't find a proper way to handle this. I read the related issues (#247, #51, #196) and SO questions, but they don't work as I expected (this might be due to my limited understanding of Reactor or Kafka, so please excuse me).

Example code snippets :

val disposable = kafkaReceiver.receive()
    // process each record, preserving the original order
    .flatMapSequential { record -> process(record).thenReturn(record) }
    .concatMap { record ->
        record.receiverOffset().acknowledge()
        record.receiverOffset().commit() // commit() returns Mono<Void>, the lambda's result
    }
    .subscribe()

This is the typical case:

  1. receive record
  2. process record
  3. ack and commit (can be omitted when using auto-commit)

Desired solution

So, I think this is a very common case: when I restart my application (which is based on the Spring framework), consumers should stop fetching new records, while the in-flight flux (i.e., records that were already fetched) keeps processing and committing, and then the flux completes.

However, simply disposing the disposable does not work as expected, because a record that has already been processed may never be committed.
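For context, the closest workaround I've found is to complete the flux myself from an external signal instead of disposing it. A minimal sketch (the shutdownSignal name and the hook placement are just illustrative):

import reactor.core.publisher.Sinks

// Complete the flux on an external shutdown signal instead of cancelling it,
// so records already emitted downstream still get processed and committed.
val shutdownSignal = Sinks.one<Unit>()

val disposable = kafkaReceiver.receive()
    .takeUntilOther(shutdownSignal.asMono()) // propagates onComplete, not a cancel
    .flatMapSequential { record -> process(record).thenReturn(record) }
    .concatMap { record ->
        record.receiverOffset().acknowledge()
        record.receiverOffset().commit()
    }
    .subscribe()

// later, e.g. from a Spring @PreDestroy hook:
shutdownSignal.tryEmitValue(Unit)

But even this only drains records that were already emitted downstream; anything still buffered inside the receiver is dropped without being committed, which is why a built-in option would help.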

Considered alternatives

I don't have a concrete implementation idea, but here are some things to consider:

  1. The Scheduler interface of Reactor provides a disposeGracefully method (illustrated in the sketch after this list). These methods (1 2) could be replaced with it (or selected via an option).
  2. Add sink.emitComplete() in ConsumerEventLoop#stop.
  3. It looks like ConsumerEventLoop keeps polling the broker without hesitation and emits the records into the sink. If that's right, producing numerous records in an instant could cause problems. For example, say 10,000 records are produced and the consumer fetches them all within a few seconds. Besides the potential OOM issue, the flux would need to wait until all of those records are drained before the desired graceful shutdown can complete. I think emitting records should be subject to some delay (backpressure).
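To make alternative 1 concrete, this is how that contract already behaves on a plain Reactor Scheduler; I'd hope KafkaReceiver could expose something equivalent (the scheduler name and the 30-second timeout are just examples):

import java.time.Duration
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

val scheduler = Schedulers.newSingle("receiver-loop")

// disposeGracefully() waits for in-flight tasks to finish and completes the
// returned Mono; fall back to a hard dispose() if draining takes too long.
scheduler.disposeGracefully()
    .timeout(Duration.ofSeconds(30))
    .onErrorResume { scheduler.dispose(); Mono.empty() }
    .block()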

Additional context

If I've misunderstood something, please let me know. Any other opinions would be appreciated. Thanks!

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jan 3, 2024
@KafkaProServerless

upvote

@ajax-semenov-y

upvote

@minzhang-haus

upvote

@mariusgiger

upvote
