-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: normalize all send request resource release into sr.done #1121
Conversation
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Signed-off-by: tison <[email protected]>
Here is the final failure test -
|
Signed-off-by: tison <[email protected]>
@@ -458,7 +458,6 @@ func TestAckChunkMessage(t *testing.T) { | |||
// Create transaction and register the send operation. | |||
txn, err := client.NewTransaction(time.Hour) | |||
require.Nil(t, err) | |||
txn.(*transaction).registerSendOrAckOp() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do register in internalSendAsync
. This call seems like a workaround we ever added as a HACK 🤣
Should merge #1120 first |
BlockingIfQueueFull is convenient for users, recover the manner in this patch. |
Signed-off-by: tison <[email protected]>
p.metrics.BytesPublished.Add(payloadSize) | ||
p.metrics.BytesPending.Sub(payloadSize) | ||
|
||
if sr.callback != nil || len(p.options.Interceptors) > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indent - sr.callback != nil
is always true.
// Large messages will be split into 11 chunks, exceeding the length of pending queue | ||
ID, err := producer.Send(context.Background(), &ProducerMessage{ | ||
_, err = producer.Send(ctx, &ProducerMessage{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for Fix 2.
I think it is OK, well done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Merging... Thanks for your review! Further improvement can be made as described above. |
This refers to #1071. This follows up to #1120.
This patch contains two fixes with one refactor unsplittably.
sr.done
and replace it everywhere release sendrequests.internalSendAck
. Although it breaks chunked messages can pass the permits limitation one by one underDisableBlockIfQueueFull=false
, but the Golang client logic is different from Java one, where Golang's internalSendAsync and internalSend do not run in the same thread. I'd prefer to regard the original manner a possible improvement but blocking require full permits should work well for most scenarios.Together, this patch fixes #1043.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation