[Issue 662] Fix race in connection.go waitUntilReady() #663
Conversation
@cckellogg @BewareMyPower I think this relates to your discussion in https://github.com/apache/pulsar-client-go/pull/631/files#r723350028, would appreciate it if you could take a look. You correctly removed the use of […]
And see https://pkg.go.dev/[email protected]#NewCond:

```go
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}
```

So I think […]. Back to your issue, how could you ensure […]? For example:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	flag := false
	defer m.Unlock()
	m.Lock()
	go func() {
		// No locks here
		fmt.Println("Before Broadcast")
		flag = true
		c.Broadcast()
		fmt.Println("After Broadcast")
	}()
	// Sleep for 2 seconds to make sure c.Broadcast() is called before c.Wait()
	time.Sleep(time.Duration(2) * time.Second)
	fmt.Println("Before Wait")
	for !flag {
		c.Wait()
	}
	fmt.Printf("After Wait, flag is %v\n", flag)
}
```

I called […]
Sorry, I gave a wrong example. Change the

```go
for !flag {
	c.Wait()
}
```

to

```go
for {
	c.Wait()
	if flag {
		break
	}
}
```
I agree that we should call c.Broadcast() while holding the lock:

```go
m := &sync.Mutex{}
c := sync.NewCond(m)

m.Lock()
go func() {
	m.Lock() // Wait for c.Wait()
	c.Broadcast()
	m.Unlock()
}()
c.Wait() // Unlocks m, waits, then locks m again
m.Unlock()
```

Because in that example, the inner m.Lock() cannot return until c.Wait() has released the mutex, so the Broadcast() cannot be sent before the Wait() has started.
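For reference, here is a self-contained, runnable version of that handshake (a sketch; the final print is added for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	m := &sync.Mutex{}
	c := sync.NewCond(m)

	m.Lock()
	go func() {
		m.Lock() // Blocks until c.Wait() below has released m.
		c.Broadcast()
		m.Unlock()
	}()
	// Wait registers this goroutine on the cond's notify list, releases m
	// (letting the goroutine above run), and re-acquires m once woken.
	c.Wait()
	m.Unlock()
	fmt.Println("woken up")
}
```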
The current change is:

```diff
 func (c *connection) waitUntilReady() error {
-	c.Lock()
-	defer c.Unlock()
+	c.cond.L.Lock()
+	defer c.cond.L.Unlock()
 	for c.getState() != connectionReady {
 		c.log.Debugf("Wait until connection is ready state=%s", c.getState().String())
 		// 1. `c.getState()` is called again
 		if c.getState() == connectionClosed {
 			return errors.New("connection error")
 		}
 		// 2. Even if the 2nd call of `c.getState()` returns connectionReady, `c.cond.Wait()` will still be called
 		c.cond.Wait()
 	}
 	return nil
 }
```

See my comments for my concern. Therefore, I think a better solution is:

```go
func (c *connection) waitUntilReady() error {
	for {
		switch state := c.getState(); state {
		case connectionReady:
			return nil
		case connectionClosed:
			return errors.New("connection error")
		default:
			c.log.Debugf("Wait until connection is ready state=%s", state.String())
			// Here we use c.cond.L.Lock() because we might not use c.Lock as the cond's internal Locker in future
			c.cond.L.Lock()
			defer c.cond.L.Unlock()
			c.cond.Wait()
		}
	}
}
```
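One caveat with that sketch (an editorial note, not from the PR): `defer c.cond.L.Unlock()` inside the loop body does not run until the function returns, so if the default branch executes twice, the second `c.cond.L.Lock()` would block forever on a mutex this goroutine already holds. A variant that takes the lock once, outside the loop, using the same assumed fields as the snippets above:

```go
func (c *connection) waitUntilReady() error {
	c.cond.L.Lock()
	defer c.cond.L.Unlock()
	for {
		switch state := c.getState(); state {
		case connectionReady:
			return nil
		case connectionClosed:
			return errors.New("connection error")
		default:
			c.log.Debugf("Wait until connection is ready state=%s", state.String())
			// Wait releases c.cond.L while parked and re-acquires it before
			// returning, so the lock is held at the top of every iteration.
			c.cond.Wait()
		}
	}
}
```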
pulsar/internal/connection.go (Outdated)
```diff
@@ -327,8 +327,8 @@ func (c *connection) doHandshake() bool {
 }

 func (c *connection) waitUntilReady() error {
-	c.Lock()
-	defer c.Unlock()
+	c.cond.L.Lock()
```
Curious, what's the difference between doing `c.Lock()` vs `c.cond.L.Lock()`? Same for the `changeState` function? From looking at the code they should share the same lock?
They're two different mutexes. See #663 (comment) for an explanation.
I agree that `waitUntilReady()` and `changeState()` should share the same lock.
Nope, I was wrong! See #663 (comment), which proves that `c.Lock()` and `c.cond.L.Lock()` are the same thing.
Thanks for taking a look at this, @BewareMyPower and @cckellogg!
I'm pretty sure that's not the case. If we take a look at the definition of the connection type:

```go
type connection struct {
	sync.Mutex
	cond *sync.Cond
	[...]
```

...then we can see that `connection` embeds a `sync.Mutex` and separately holds a pointer to a `sync.Cond`.

If we next take a look at the definition of `sync.Cond`:

```go
type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker
	[...]
```

(where `Locker` is the `sync.Locker` interface), we can see that the cond carries its own `Locker`.

Another way we can convince ourselves of this is by noting that it is forbidden to copy a `sync.Cond` after first use (hence the `noCopy` field above).
It doesn't matter whether […]. Here is the implementation of `sync.Cond.Wait()`:

```go
func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}
```

(note that the `c.L.Unlock()` happens only after the goroutine is added to the notify list, and `c.L.Lock()` is re-acquired before `Wait` returns)
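A side note on that implementation (a minimal hypothetical example, not from the thread): the caller must hold `c.L` when calling `Wait()`, because `Wait` immediately unlocks it:

```go
package main

import "sync"

func main() {
	c := sync.NewCond(&sync.Mutex{})
	// Wait() starts by calling c.L.Unlock(), so calling it without holding
	// the lock crashes with: fatal error: sync: unlock of unlocked mutex
	c.Wait()
}
```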
@BewareMyPower: I tried this modification of your code, which I think is exactly the one you suggested:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	flag := false
	defer m.Unlock()
	m.Lock()
	go func() {
		// No locks here
		fmt.Println("Before Broadcast")
		flag = true
		c.Broadcast()
		fmt.Println("After Broadcast")
	}()
	// Sleep for 2 seconds to make sure c.Broadcast() is called before c.Wait()
	time.Sleep(time.Duration(2) * time.Second)
	fmt.Println("Before Wait")
	for {
		c.Wait()
		if flag {
			break
		}
	}
	fmt.Printf("After Wait, flag is %v\n", flag)
}
```

When I run that, this happens:

```
Before Broadcast
After Broadcast
Before Wait
fatal error: all goroutines are asleep - deadlock!
```

So, I think that test case (which you very kindly provided) confirms that we agree that it really is necessary to call […]
With that solution, the bug that I am experiencing in production could still occur:

1. [goroutine A] calls c.getState() and sees that it is not set to connectionReady
2. [goroutine B] changes the state to connectionReady and sends a cond.Broadcast(), which goes nowhere because no goroutine is waiting
3. [goroutine A] calls cond.Wait(), which never completes

To avoid this, it is necessary to hold the lock before calling cond.Broadcast().
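To illustrate that fix with the earlier standalone example (a sketch, not code from the thread): taking the lock around both the flag update and the flag check closes the window between the check and the wait:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	flag := false

	go func() {
		// Change the condition while holding the same lock the waiter uses.
		m.Lock()
		flag = true
		m.Unlock()
		// Safe to broadcast after unlocking: any waiter has either already
		// seen flag == true, or is registered inside c.Wait() and will wake.
		c.Broadcast()
		fmt.Println("After Broadcast")
	}()

	// Even if the broadcast happens first, the check below sees flag == true.
	time.Sleep(2 * time.Second)

	m.Lock()
	for !flag {
		c.Wait()
	}
	m.Unlock()
	fmt.Println("After Wait, no deadlock")
}
```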
Yeah, I agree.
It makes sense. We should make the combination of checking the state and calling cond.Wait() atomic, and likewise the state change and the cond.Broadcast().
Thanks for pointing it out. I'm not so familiar with Go, being more used to C++ object lifetime rules, which are a little different. The only question I'm still confused about is: could you explain the following code and its output?

```go
package main

import (
	"fmt"
	"sync"
)

type connection struct {
	sync.Mutex
	cond *sync.Cond
}

func NewConnection() *connection {
	cnx := &connection{}
	cnx.cond = sync.NewCond(cnx)
	return cnx
}

func main() {
	cnx := NewConnection()
	fmt.Printf("%v\n%v\n", cnx, cnx.cond.L)
	cnx.cond.L.Lock()
	cnx.Lock()
	cnx.Unlock()
	cnx.cond.L.Unlock()
}
```
It looks like `cnx` and `cnx.cond.L` are the same lock, so `cnx.Lock()` after `cnx.cond.L.Lock()` never returns.
That's right. I also realized this problem just now. Here we should lock the whole […]
Agh! I'm so sorry @BewareMyPower, you're 100% correct: […]
I think that's up to you guys -- it depends on what the ultimate intent of the lock is. Is it to protect against all state changes in the connection, or just some of them?
Yeah, it won't block this PR. I just mention it because I've seen a similar case in the C++ client, where a common mutex is used everywhere. For this PR, I think you can change […]
…ocker as an argument. Add comments to make it clear why this lock is required, to hopefully avoid accidental removal in future.
OK, I made that change and added a comment so hopefully nobody removes the lock in future. Thanks again for looking at this, I really appreciate it!
Fixes #662
Motivation
As described in #662, there appears to be a potential race condition in connection.go function waitUntilReady(): the `cond.Broadcast()` can occur before the `cond.Wait()`.

[EDIT:] To be explicit, this is the issue:
1. [goroutine A] calls c.getState() and sees that it is not set to connectionReady
2. [goroutine B] changes the state to connectionReady
3. [goroutine B] sends a cond.Broadcast(), which goes nowhere because no goroutine is waiting
4. [goroutine A] calls cond.Wait(), which never completes
Modifications
Function waitUntilReady() was previously holding the global `c.Lock()` on the connection. From my reading of the code, this mutex is intended to protect the `cnx` variable. I think that the use of `c.Lock()` in waitUntilReady() was probably just a typo.

Instead, I think the author probably intended to grab the lock associated with the `sync.Cond`, i.e. `c.cond.L.Lock()`. This looks like the correct thing to do when using a `sync.Cond`. I think there should be a corresponding lock around the `cond.Broadcast()` also. See https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond/42772799#42772799 for more details.
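A sketch of that locking pattern (illustrative only; it follows the snippets quoted earlier in the thread rather than the exact merged diff, and `setStateReady` is a hypothetical helper):

```go
func (c *connection) waitUntilReady() error {
	c.cond.L.Lock()
	defer c.cond.L.Unlock()
	for c.getState() != connectionReady {
		if c.getState() == connectionClosed {
			return errors.New("connection error")
		}
		// Wait atomically releases c.cond.L while parking and re-acquires
		// it on wake-up, so no broadcast can slip in between the state
		// check above and the wait.
		c.cond.Wait()
	}
	return nil
}

// On the sending side, the state change happens under the same lock.
func (c *connection) setStateReady() {
	c.cond.L.Lock()
	c.setState(connectionReady)
	// Broadcasting while holding the lock guarantees the waiter is either
	// not yet past its state check, or already parked inside Wait().
	c.cond.Broadcast()
	c.cond.L.Unlock()
}
```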
Verifying this change
I am unsure if this change is covered by existing tests. It fixes a rare race condition, so I think it would be quite difficult to test for.
I have deployed this change on my own production system, and it doesn't obviously break anything. I will report back if I see any issues that might be related to it.
Does this pull request potentially affect one of the following parts:
Documentation