Skip to content

Commit

Permalink
fix leaky goroutines in matching
Browse files Browse the repository at this point in the history
  • Loading branch information
3vilhamster committed Dec 19, 2023
1 parent e869984 commit 3741401
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
1 change: 1 addition & 0 deletions service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
Expand Down
10 changes: 10 additions & 0 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -80,6 +81,9 @@ type (
dispatchTask func(context.Context, *InternalTask) error
getIsolationGroupForTask func(context.Context, *persistence.TaskInfo) (string, error)
ratePerSecond func() float64

// stopWg is used to wait for all dispatchers to stop.
stopWg sync.WaitGroup
}
)

Expand Down Expand Up @@ -137,6 +141,7 @@ func (tr *taskReader) Stop() {
tag.Error(err))
}
tr.taskGC.RunNow(tr.taskAckManager.GetAckLevel())
tr.stopWg.Wait()
}
}

Expand All @@ -149,6 +154,8 @@ func (tr *taskReader) Signal() {
}

func (tr *taskReader) dispatchBufferedTasks(isolationGroup string) {
tr.stopWg.Add(1)
defer tr.stopWg.Done()
dispatchLoop:
for {
select {
Expand All @@ -168,6 +175,9 @@ dispatchLoop:
}

func (tr *taskReader) getTasksPump() {
tr.stopWg.Add(1)
defer tr.stopWg.Done()

updateAckTimer := time.NewTimer(tr.config.UpdateAckInterval())
defer updateAckTimer.Stop()
getTasksPumpLoop:
Expand Down

0 comments on commit 3741401

Please sign in to comment.