Fix write to closed channel panic() in internal/connection during connection close #539
…uring close

The race is as follows:
T1 - calls SendRequestNoWait(), checks the connection state, and prepares to enter the select statement
T2 - calls TriggerClose(), which closes cnx and the closeCh
T3 - the run() go-routine that processes incomingRequestsCh drops into case <-closeCh: and calls failLeftRequestsWhenClose(), which drains and closes incomingRequestsCh
T1 - resumes and drops into the select where both closeCh and incomingRequestsCh are closed.

When two cases of a `select` are valid, the case executed is chosen at random; see https://tour.golang.org/concurrency/5

This commit introduces a connectionClosing state and a wait group to track writes by the SendRequest() methods.
* TriggerClose() moves the connection into the connectionClosing state before the closeCh is closed.
* The failLeftRequestsWhenClose() method waits on the waitgroup for outstanding SendRequest() methods to complete before it closes the incomingRequestsCh.
* The SendRequest() methods first add to the waitgroup before checking the connection state; if the state is either closing or closed, SendRequest() returns an error.

With the above it is not possible for a thread to attempt to add a request to the incomingRequestsCh without being tracked by the waitgroup, and the incomingRequestsCh will not be closed until operations tracked by the waitgroup have completed.

Signed-off-by: Daniel Ferstay <[email protected]>
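The select behavior behind this race can be reproduced in isolation. The following is a minimal sketch, not code from the pulsar client: `trySend` and `countPanics` are hypothetical helpers, and the two channels only stand in for closeCh and incomingRequestsCh. Once both channels are closed, both cases of the select are ready, so some attempts take the receive and return cleanly while others take the send and panic.

```go
package main

import "fmt"

// trySend mimics the select in a SendRequestNoWait-style method: it either
// observes closeCh or writes to reqCh. It reports whether the write panicked.
func trySend(closeCh <-chan struct{}, reqCh chan<- int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			// A send on a closed channel panics; record it instead of crashing.
			panicked = true
		}
	}()
	select {
	case <-closeCh:
		// Receiving from a closed channel succeeds immediately.
	case reqCh <- 1:
		// Sending on a closed channel is also "ready", and panics.
	}
	return false
}

// countPanics closes both channels, then runs trySend n times and counts
// how many attempts ended in a panic.
func countPanics(n int) int {
	closeCh := make(chan struct{})
	reqCh := make(chan int)
	close(closeCh)
	close(reqCh)

	count := 0
	for i := 0; i < n; i++ {
		if trySend(closeCh, reqCh) {
			count++
		}
	}
	return count
}

func main() {
	n := 1000
	fmt.Printf("%d of %d sends panicked\n", countPanics(n), n)
}
```

The exact count varies from run to run because the runtime picks among ready cases pseudo-randomly; the point is only that checking closeCh in the same select does not protect the send.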
6b3f8b0 to 366a7b4
@@ -331,10 +335,15 @@ func (c *connection) waitUntilReady() error {
}

func (c *connection) failLeftRequestsWhenClose() {
	// wait for outstanding incoming requests to complete before draining
	// and closing the channel
	c.incomingRequestsWG.Wait()
I'm a little confused: why are we waiting here?
@cckellogg,
At this point we are sure that the closeCh is closed and that the state of the connection is either connectionClosing or connectionClosed. We wait to be sure that there will be no further writes to the incomingRequestsCh before attempting to drain and close it. It is possible for the sending go-routine to be executing either of the following lines at the time this function is executed:
- https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L552
- https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L570
As mentioned in the motivation for this MR:
When two cases of a select are valid, the case executed is chosen at random; see https://tour.golang.org/concurrency/5
Whenever I see a channel being closed by a reader go-routine I am suspicious of writers attempting to write to the channel after it is closed.
@@ -546,8 +555,13 @@ func (c *connection) Write(data Buffer) {

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
	callback func(command *pb.BaseCommand, err error)) {
	if c.getState() == connectionClosed {
	c.incomingRequestsWG.Add(1)
Why is this waitgroup needed for both send requests?
As mentioned in #539 (comment), we need to track all writes to the incomingRequestsCh.
CI failed due to the data race that is addressed in #535.
LGTM
/pulsarbot run-failure-checks
LGTM +1
Thanks @dferstay, please merge the master branch and fix the conflict.
Signed-off-by: xiaolongran <[email protected]>
Motivation
While working on #535 I noticed that it is possible for a panic() to occur while an internal/connection instance is in the process of closing.
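The race is as follows:
T1 - calls SendRequestNoWait(), checks the connection state, and prepares to enter the select statement
T2 - calls TriggerClose(), which closes cnx and the closeCh
T3 - the run() go-routine that processes incomingRequestsCh drops into case <-closeCh: and calls failLeftRequestsWhenClose(), which drains and closes incomingRequestsCh
T1 - resumes and drops into the select where both closeCh and incomingRequestsCh are closed.
When two cases of a `select` are valid, the case executed is chosen at random; see https://tour.golang.org/concurrency/5
Modifications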
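This commit introduces a connectionClosing state and a wait group to track incomingRequestsCh writes in the SendRequest() and SendRequestNoWait() methods.
* TriggerClose() moves the connection into the connectionClosing state before the closeCh is closed.
* The failLeftRequestsWhenClose() method waits on the waitgroup for outstanding SendRequest() methods to complete before it closes the incomingRequestsCh.
* The SendRequest() methods first add to the waitgroup before checking the connection state; if the state is either closing or closed, SendRequest() returns an error.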
With the above it is not possible for a thread to attempt to write to the incomingRequestsCh without being tracked by the waitgroup, and the incomingRequestsCh will not be closed until writes to the incomingRequestsCh have completed.
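The close ordering described above can be sketched in a few dozen lines. This is an illustration under stated assumptions, not the code in this PR: the `conn` type and its `send`, `triggerClose`, and `run` methods are hypothetical stand-ins for internal/connection, and a boolean replaces the connection state enum. The sketch also differs in one detail: it guards the state check and the `Add(1)` with a mutex, so that every `Add` happens before `Wait` as the sync.WaitGroup documentation asks, whereas the PR calls `Add` before checking the state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("connection closed")

// conn is a hypothetical stand-in for internal/connection.
type conn struct {
	mu      sync.Mutex
	closing bool           // plays the role of connectionClosing/connectionClosed
	closeCh chan struct{}  // closed once by triggerClose
	reqCh   chan int       // plays the role of incomingRequestsCh
	writers sync.WaitGroup // tracks in-flight writes to reqCh
	done    chan int       // receives the number of drained requests
}

func newConn() *conn {
	c := &conn{
		closeCh: make(chan struct{}),
		reqCh:   make(chan int, 4),
		done:    make(chan int, 1),
	}
	go c.run()
	return c
}

// send registers itself as a writer before it touches reqCh, and refuses
// to register once closing has begun.
func (c *conn) send(v int) error {
	c.mu.Lock()
	if c.closing {
		c.mu.Unlock()
		return errClosed
	}
	c.writers.Add(1)
	c.mu.Unlock()
	defer c.writers.Done()

	select {
	case <-c.closeCh:
		return errClosed
	case c.reqCh <- v:
		return nil
	}
}

// triggerClose flips the state first, then signals the run loop.
func (c *conn) triggerClose() {
	c.mu.Lock()
	already := c.closing
	c.closing = true
	c.mu.Unlock()
	if !already {
		close(c.closeCh)
	}
}

// run waits for the close signal, then for in-flight writers, and only
// then closes and drains reqCh.
func (c *conn) run() {
	<-c.closeCh
	c.writers.Wait() // no writer can be inside the select past this point
	close(c.reqCh)
	drained := 0
	for range c.reqCh {
		drained++ // a real client would fail each pending request here
	}
	c.done <- drained
}

func main() {
	c := newConn()
	fmt.Println(c.send(1)) // accepted while the connection is open
	c.triggerClose()
	fmt.Println(c.send(2), <-c.done) // rejected; one request was drained
}
```

A writer that is already blocked in the select when `triggerClose` runs sees `closeCh` close, returns `errClosed`, and releases the wait group, which is what lets `run` close `reqCh` safely.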
Verifying this change
This change is covered by existing tests that exercise the internal/connection lifecycle.