-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix leaky goroutines in matching #5499
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import ( | |
"context" | ||
"errors" | ||
"runtime" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
|
@@ -80,6 +81,9 @@ type ( | |
dispatchTask func(context.Context, *InternalTask) error | ||
getIsolationGroupForTask func(context.Context, *persistence.TaskInfo) (string, error) | ||
ratePerSecond func() float64 | ||
|
||
// stopWg is used to wait for all dispatchers to stop. | ||
stopWg sync.WaitGroup | ||
} | ||
) | ||
|
||
|
@@ -137,6 +141,7 @@ func (tr *taskReader) Stop() { | |
tag.Error(err)) | ||
} | ||
tr.taskGC.RunNow(tr.taskAckManager.GetAckLevel()) | ||
tr.stopWg.Wait() | ||
} | ||
} | ||
|
||
|
@@ -149,6 +154,8 @@ func (tr *taskReader) Signal() { | |
} | ||
|
||
func (tr *taskReader) dispatchBufferedTasks(isolationGroup string) { | ||
tr.stopWg.Add(1) | ||
defer tr.stopWg.Done() | ||
Comment on lines
+157
to
+158
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like a generic approach for stopping. I wonder if it's possible to have some OOP around this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OOP in Go is hard, I'm not aware of generic solutions on this. |
||
dispatchLoop: | ||
for { | ||
select { | ||
|
@@ -168,6 +175,9 @@ dispatchLoop: | |
} | ||
|
||
func (tr *taskReader) getTasksPump() { | ||
tr.stopWg.Add(1) | ||
defer tr.stopWg.Done() | ||
|
||
updateAckTimer := time.NewTimer(tr.config.UpdateAckInterval()) | ||
defer updateAckTimer.Stop() | ||
getTasksPumpLoop: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is most probably because of Go 1.21 != CI (1.20)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets address this after @Groxx PR. Then everything will be unified.