-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 1027][producer] fix: split sendRequest and make reconnectToBroker and other operate in the same coroutine #1029
Conversation
BTW, I think @bschofield about some guess said in #697 is right. |
@zengguan Thanks for your contribute. I have a question, As #687. Can we let the In this way, Let for {
select {
case sendRequest <-p.sendRequestChan:
p.internalSend(sendRequest)
case i := <-p.eventsChan:
switch v := i.(type) {
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
return
}
case <-p.connectClosedCh:
p.reconnectToBroker()
case <-p.batchFlushTicker.C:
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}
}
} I think we can discuss clearly here, using channels should avoid using locks. |
…n the same coroutine
@shibd great idea ! Now I split sendRequest and changed |
@BewareMyPower @gaoran10 Can you help take a look? |
Could you update your PR description since you don't use RWLock now? |
It seems the CI is broken. |
Done. |
Please rebase to master to have tests fixed. |
…to flush all messages (#1058) Fixes #1057 ### Motivation `dataChan` is introduced by #1029 to fix the problem of reconnectToBroker. But it missed that if a flush operation excuted, there may still be some messages in `dataChan`. And these messages can't be flushed. ### Modifications - Fix the producer flush opertion is not guarantee to flush all messages
### Motivation After #1029 , `eventChan` is split into `dataChan` and `cmdChan`. But the description of `SendAsync()` is not modified. https://github.com/apache/pulsar-client-go/blob/9867c29ca329302e97ddd9c6a99f66853c7f447f/pulsar/producer.go#L226-L231 ### Modifications - Correct the description of SendAsync() description
### Motivation After apache#1029 , `eventChan` is split into `dataChan` and `cmdChan`. But the description of `SendAsync()` is not modified. https://github.com/apache/pulsar-client-go/blob/9867c29ca329302e97ddd9c6a99f66853c7f447f/pulsar/producer.go#L226-L231 ### Modifications - Correct the description of SendAsync() description
…to flush all messages (#1058) Fixes #1057 ### Motivation `dataChan` is introduced by #1029 to fix the problem of reconnectToBroker. But it missed that if a flush operation excuted, there may still be some messages in `dataChan`. And these messages can't be flushed. ### Modifications - Fix the producer flush opertion is not guarantee to flush all messages (cherry picked from commit 9867c29)
### Motivation After #1029 , `eventChan` is split into `dataChan` and `cmdChan`. But the description of `SendAsync()` is not modified. https://github.com/apache/pulsar-client-go/blob/9867c29ca329302e97ddd9c6a99f66853c7f447f/pulsar/producer.go#L226-L231 ### Modifications - Correct the description of SendAsync() description (cherry picked from commit 50015d3)
Fixes #1027
Motivation
fix send timeout error cause by reconnection failures
Modifications
After #691 , client avoid blocking the reconnection logic, but it also make reconnection and internalSend/internalFlush have concurrency issues。
when the client connection is block, it run into
p.grabCnx()
to get new connection and resend pendingQueue messages. At the same time, the internalSend/internalFlush keep running by use pendingQueue and send mssage byp._getConn().WriteData
. Therefore, it received an ack larger than expected, and trigger the ConnectionClosed logic, then the client enterp.reconnectToBroker()
and repeat the concurrency issues with internalSend/internalFlush, fall into a reconnect-resend-connectionClosed loopVerifying this change
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation