Fix stuck when reconnect broker #703
Signed-off-by: xiaolongran <[email protected]>
pulsar/internal/connection_pool.go
Outdated
		key, conn.logicalAddr, conn.physicalAddr)

	// remove stale/failed connection
	if conn.closed() {
		// When the current connection is in a closed state or the broker actively notifies that the
How does the broker notify that the connection is closed?
The broker signals the close with one of two commands, CommandCloseProducer or CommandCloseConsumer, and the client processes each of them. This generally happens when automatic load balancing of the topic, or a topic unload operation, makes the broker actively close it.
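As a rough illustration of how a client could route these two commands, here is a minimal sketch. It is not the pulsar-client-go code: `commandType`, `closeListener`, `sketchClient`, and `handle` are hypothetical names, and only the idea of notifying the affected producer or consumer through a ConnectionClosed callback comes from this thread.

```go
package main

import "fmt"

// commandType is a hypothetical stand-in for the protocol's command enum.
type commandType int

const (
	closeProducer commandType = iota
	closeConsumer
)

// closeListener is a hypothetical callback implemented by producers and consumers.
type closeListener interface {
	ConnectionClosed()
}

// recorder counts notifications so the sketch can be observed.
type recorder struct {
	closedCalls int
}

func (r *recorder) ConnectionClosed() { r.closedCalls++ }

// sketchClient keeps the listeners registered on one connection.
type sketchClient struct {
	producers map[uint64]closeListener
	consumers map[uint64]closeListener
}

// handle routes a broker-initiated close to the matching listener.
// It reports whether a listener was found and notified.
func (c *sketchClient) handle(cmd commandType, id uint64) bool {
	var (
		l  closeListener
		ok bool
	)
	switch cmd {
	case closeProducer:
		l, ok = c.producers[id]
	case closeConsumer:
		l, ok = c.consumers[id]
	}
	if !ok {
		return false
	}
	l.ConnectionClosed() // the listener would start its reconnection logic here
	return true
}

func main() {
	p := &recorder{}
	c := &sketchClient{
		producers: map[uint64]closeListener{1: p},
		consumers: map[uint64]closeListener{},
	}
	handled := c.handle(closeProducer, 1)
	missing := c.handle(closeConsumer, 7)
	fmt.Println(handled, missing, p.closedCalls)
}
```

In this sketch an unknown ID is simply ignored; whether the real client logs or rejects such a command is not stated here.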
pulsar/internal/connection.go
Outdated
@@ -163,6 +163,9 @@ type connection struct {
	consumerHandlersLock sync.RWMutex
	consumerHandlers     map[uint64]ConsumerHandler

	reconnectFlagLock sync.Mutex
	reconnectFlag     bool
This variable is a little confusing to me. What is this flag supposed to indicate? And if it's set, is the connection in an unusable state?
The calling path of the entire code is:
- The broker notifies the client to close the connection and sends CommandCloseProducer or CommandCloseConsumer cmd to the client
- The client receives this command and starts to enter the logic of handleCloseConsumer or handleCloseProducer
- The client starts to trigger the ConnectionClosed action, and then enters the reconnection logic
- In the reconnection logic, the GetConnection method will be called to obtain a connection
Previously, GetConnection() returned the old connection. This PR changes that behavior: when the ConnectionClosed action is triggered, the old connection object is removed from the connections map, so that the following GetConnection() call returns a new connection.
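The intended pool behavior can be sketched as follows. This is a simplified model under stated assumptions, not the actual connection_pool.go: `poolConn`, `sketchPool`, and `get` are hypothetical names, and creating a connection is reduced to allocating a struct.

```go
package main

import (
	"fmt"
	"sync"
)

// poolConn is a hypothetical stand-in for the client's connection type.
type poolConn struct {
	id     int
	mu     sync.Mutex
	closed bool
}

func (c *poolConn) isClosed() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closed
}

// close marks the connection as closed, as a broker-initiated close would.
func (c *poolConn) close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
}

// sketchPool caches one connection per key.
type sketchPool struct {
	mu     sync.Mutex
	conns  map[string]*poolConn
	nextID int
}

func newSketchPool() *sketchPool {
	return &sketchPool{conns: make(map[string]*poolConn)}
}

// get returns the cached connection for key. A closed entry is evicted
// and replaced, so callers never get the stale connection back.
func (p *sketchPool) get(key string) *poolConn {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.conns[key]; ok {
		if !c.isClosed() {
			return c
		}
		delete(p.conns, key) // remove stale/failed connection
	}
	p.nextID++
	c := &poolConn{id: p.nextID}
	p.conns[key] = c
	return c
}

func main() {
	p := newSketchPool()
	first := p.get("broker-1:6650")
	same := p.get("broker-1:6650")
	first.close()
	fresh := p.get("broker-1:6650")
	fmt.Println(first == same, first == fresh) // prints: true false
}
```

The eviction happens lazily inside `get` in this sketch; removing the entry eagerly when the close is observed would work the same way from the caller's point of view.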
Based on the current wrapper implementation of Connection and ConnectionPool, there are two clean ways to handle this:
- As in my current implementation, introduce a flag into the connection struct to mark the reconnection state
- When handleCloseProducer is triggered, use changeState() to set the state of the current connection to closed
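To make the two options concrete, here is a minimal sketch that shows both side by side. The type `flagConn`, its methods, and the `stale` helper are hypothetical; only the field names reconnectFlagLock and reconnectFlag come from the diff above, and the state handling is a simplified assumption rather than the real changeState() implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Hypothetical connection states for the sketch.
const (
	stateReady int32 = iota
	stateClosed
)

// flagConn models only the fields relevant to the two options.
type flagConn struct {
	reconnectFlagLock sync.Mutex
	reconnectFlag     bool

	state int32 // accessed atomically
}

// Option 1: a dedicated flag guarded by its own mutex.
func (c *flagConn) markReconnect() {
	c.reconnectFlagLock.Lock()
	defer c.reconnectFlagLock.Unlock()
	c.reconnectFlag = true
}

func (c *flagConn) needsReconnect() bool {
	c.reconnectFlagLock.Lock()
	defer c.reconnectFlagLock.Unlock()
	return c.reconnectFlag
}

// Option 2: reuse the existing connection state and move it to closed.
func (c *flagConn) setClosed() {
	atomic.StoreInt32(&c.state, stateClosed)
}

func (c *flagConn) closed() bool {
	return atomic.LoadInt32(&c.state) == stateClosed
}

// stale reports whether a pool should drop this connection instead of reusing it.
func stale(c *flagConn) bool {
	return c.closed() || c.needsReconnect()
}

func main() {
	healthy := &flagConn{}

	flagged := &flagConn{}
	flagged.markReconnect()

	closedConn := &flagConn{}
	closedConn.setClosed()

	fmt.Println(stale(healthy), stale(flagged), stale(closedConn)) // prints: false true true
}
```

Option 2 needs no extra field or lock, which is one reason reusing the existing state can be the simpler choice.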
Signed-off-by: xiaolongran <[email protected]>
@cckellogg I chose to reuse the current code logic for processing. When receiving the CommandCloseProducer or CommandCloseConsumer cmd, set the current connection state to
@wolfstudy I think that might be an incorrect observation. In the Java SDK, the concepts (classes) are fairly misleading, and something that might seem to be a connection close isn't one. The ClientHandler is really a reference to a connection in the Java SDK. The ClientCnx isn't closed in the Java SDK in ClientHandler.connectionClosed
This reverts commit 1a8432c.
Revert "Revert "Fix stuck when reconnect broker (apache#703)" (apache#767)"
Signed-off-by: xiaolongran [email protected]
Fixes #697
Motivation
As #697 describes, when the reconnection logic of the Go SDK is triggered under certain conditions, the reconnection does not succeed because the request times out.
Comparing with the implementation of the Java SDK, we can see that each time the reconnection logic is triggered, the original connection is closed and a new connection is created.
So in this PR, we introduce a new reconnectFlag field in the connection struct to mark the reconnection state. When the broker actively informs the client to close the connection and thereby triggers the reconnection logic, the old connection object is deleted from the connections cache of the connectionPool, and a new connection is created to complete the reconnection.
Modifications
- Add reconnectFlag in connection struct