Skip to content

Commit

Permalink
Set triggerCondition when queryCoord reload loadbalanceTask meta (#15380
Browse files Browse the repository at this point in the history
)

Signed-off-by: xige-16 <[email protected]>
  • Loading branch information
xige-16 authored Jan 25, 2022
1 parent 8ab9d76 commit 6336e23
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/querycoord/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
}

baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_LoadBalance)
req.BalanceReason = querypb.TriggerCondition_LoadBalance
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: req,
Expand Down
1 change: 1 addition & 0 deletions internal/querycoord/query_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func (qc *QueryCoord) watchNodeLoop() {
MsgType: commonpb.MsgType_LoadBalanceSegments,
SourceID: qc.session.ServerID,
},
BalanceReason: querypb.TriggerCondition_NodeDown,
SourceNodeIDs: offlineNodeIDs,
}

Expand Down
12 changes: 12 additions & 0 deletions internal/querycoord/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type task interface {
msgType() commonpb.MsgType
timestamp() Timestamp
getTriggerCondition() querypb.TriggerCondition
setTriggerCondition(trigger querypb.TriggerCondition)
preExecute(ctx context.Context) error
execute(ctx context.Context) error
postExecute(ctx context.Context) error
Expand Down Expand Up @@ -109,6 +110,7 @@ type baseTask struct {

taskID UniqueID
triggerCondition querypb.TriggerCondition
triggerMu sync.RWMutex
parentTask task
childTasks []task
childTasksMu sync.RWMutex
Expand Down Expand Up @@ -146,9 +148,19 @@ func (bt *baseTask) traceCtx() context.Context {
}

func (bt *baseTask) getTriggerCondition() querypb.TriggerCondition {
bt.triggerMu.RLock()
defer bt.triggerMu.RUnlock()

return bt.triggerCondition
}

func (bt *baseTask) setTriggerCondition(trigger querypb.TriggerCondition) {
bt.triggerMu.Lock()
defer bt.triggerMu.Unlock()

bt.triggerCondition = trigger
}

func (bt *baseTask) taskPriority() querypb.TriggerCondition {
return bt.triggerCondition
}
Expand Down
3 changes: 3 additions & 0 deletions internal/querycoord/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
if err != nil {
return nil, err
}
// if triggerCondition == nodeDown, and the queryNode resources are insufficient,
// queryCoord will waits until queryNode can load the data, ensuring that the data is not lost
baseTask.setTriggerCondition(loadReq.BalanceReason)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &loadReq,
Expand Down

0 comments on commit 6336e23

Please sign in to comment.