Producer respects Context passed to Send() and SendAsync() when appying backpressure #534
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Previously, the Producer ignored the context passed to Send() and
SendAsync().
Now, the Producer respects the context in the case where the
ProducerOptions.MaxPendingMessages limit is breached. In this case, the
producer will block until a permit for sending the message is available or
until the context expires, whichever comes first.
Failures to send messages due to context expiration are communicated to
callers via the existing TimeoutError error code.
Signed-off-by: Daniel Ferstay [email protected]
Motivation
When the Producer is applying backpressure to the caller it will block attempting to acquire a send permit.
Before this change change, the producer would block indefinitely as a send permit must be acquired before the send timeout can be checked by the internal
runEventsLoop
go-routine.With this change, callers can control the duration they are willing to wait for a send permit by using the context passed to the Send() and SendAsync() methods.
Modifications
Semaphore.Acquire()
to accept a context and block until acquisition is complete or the passed context is expired, whichever comes firstpartitionProducer.internalSendAsync()
to propagate the calling context when attempting to acquire the publish semaphore, and fail the request if the acquire is unsuccessful.Verifying this change