-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor flush manager injection to reduce goroutine number #15180
Refactor flush manager injection to reduce goroutine number #15180
Conversation
Signed-off-by: Congqi Xia <[email protected]>
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: congqixia The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@congqixia ut workflow job failed, comment |
@congqixia E2e jenkins job failed, comment |
Codecov Report
@@ Coverage Diff @@
## master #15180 +/- ##
==========================================
- Coverage 80.12% 80.11% -0.02%
==========================================
Files 443 443
Lines 60807 60791 -16
==========================================
- Hits 48720 48701 -19
- Misses 9783 9786 +3
Partials 2304 2304
|
@@ -175,9 +174,31 @@ func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, deltaLogs *DelDa | |||
// send into injectCh in there is running task | |||
// or perform injection logic here if there is no injection | |||
func (q *orderFlushQueue) inject(inject *taskInjection) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little bit confused about taskInjection, maybe all we need is a barrier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The injector(compactor) start inject and find all running flush tasks, and insert a wait mark for each flusher
Compactor goroutine finish compaction, change all the target id of the affected flusher, and notify all the flusher
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
postInjection postInjectionFunc var is not actually necessary and it shouldn't be a part of orderFlushQueue
@@ -441,7 +455,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique | |||
func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) { | |||
go injection.waitForInjected() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering do we need to use such complicated way for synchronization
func (q *orderFlushQueue) handleInject(inject *taskInjection) { | ||
// notify one injection done | ||
inject.injectOne() | ||
ok := <-inject.injectOver |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
injectOver can be replaced by a barrier, if we just want to ensure all flush task reached end
/lgtm |
Resolves #15179
/kind enhancement
Signed-off-by: Congqi Xia [email protected]