Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race in matching test suite #5781

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions service/matching/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package matching
import (
"context"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -441,15 +442,22 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
).Return(nil)

// Poll needs to happen before MustOffer, or else it goes into the non-blocking path.
var pollResultMu sync.Mutex
var polledTask *InternalTask
var pollErr error
wait := ensureAsyncReady(time.Second, func(ctx context.Context) {
task, err := t.matcher.Poll(ctx, "")
t.Nil(err)
t.NotNil(task)
pollResultMu.Lock()
defer pollResultMu.Unlock()
polledTask, pollErr = t.matcher.Poll(ctx, "")
})

t.NoError(t.matcher.MustOffer(ctx, task))
cancel()
wait()
pollResultMu.Lock()
defer pollResultMu.Unlock()
t.NoError(pollErr)
t.NotNil(polledTask)
t.NotNil(req)
t.NoError(err)
t.True(remoteSyncMatch)
Expand Down Expand Up @@ -499,15 +507,22 @@ func (t *MatcherTestSuite) TestIsolationMustOfferRemoteMatch() {
).Return(nil)

// Poll needs to happen before MustOffer, or else it goes into the non-blocking path.
var pollResultMu sync.Mutex
var polledTask *InternalTask
var pollErr error
wait := ensureAsyncReady(time.Second, func(ctx context.Context) {
task, err := t.matcher.Poll(ctx, "dca1")
t.Nil(err)
t.NotNil(task)
pollResultMu.Lock()
defer pollResultMu.Unlock()
polledTask, pollErr = t.matcher.Poll(ctx, "dca1")
})

t.NoError(t.matcher.MustOffer(ctx, task))
cancel()
wait()
pollResultMu.Lock()
defer pollResultMu.Unlock()
t.NoError(pollErr)
t.NotNil(polledTask)
t.NotNil(req)
t.NoError(err)
t.True(remoteSyncMatch)
Expand Down Expand Up @@ -627,6 +642,7 @@ func ensureAsyncAfterReady(ctxTimeout time.Duration, cb func(ctx context.Context

// Try to ensure a blocking callback is actively blocked in a goroutine before returning, so tests can
// ensure that the callback contents happen first.
// Do NOT access shared variables in the callback without mutex, as it may cause data races.
//
// This is a best-effort technique, as there is no way to reliably synchronize this kind of thing
// without exposing internal latches or having a more sophisticated locking library than Go offers.
Expand All @@ -635,8 +651,14 @@ func ensureAsyncAfterReady(ctxTimeout time.Duration, cb func(ctx context.Context
// Note that adding fmt.Println() calls touches synchronization code (for I/O), so it may change behavior.
func ensureAsyncReady(ctxTimeout time.Duration, cb func(ctx context.Context)) (wait func()) {
running := make(chan struct{})
closed := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
go func() {
// Defers are stacked. The last one added is the first one to run.
// We want to cancel the context which will make the callback return because it's a Poll,
// and then close the closed channel.
// This way the returned wait function will block until the callback has returned.
defer close(closed)
defer cancel()

close(running)
Expand All @@ -650,6 +672,6 @@ func ensureAsyncReady(ctxTimeout time.Duration, cb func(ctx context.Context)) (w
time.Sleep(1 * time.Millisecond)

return func() {
<-ctx.Done()
<-closed
}
}
Loading