Skip to content

Commit

Permalink
Unsubscribe channel after query node down (#15230)
Browse files Browse the repository at this point in the history
Signed-off-by: xige-16 <[email protected]>
  • Loading branch information
xige-16 authored Jan 17, 2022
1 parent 7b48dc7 commit dfc6670
Show file tree
Hide file tree
Showing 17 changed files with 832 additions and 198 deletions.
10 changes: 10 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,16 @@ message CollectionInfo {
int64 inMemory_percentage = 7;
}

message UnsubscribeChannels {
int64 collectionID = 1;
repeated string channels = 2;
}

message UnsubscribeChannelInfo {
int64 nodeID = 1;
repeated UnsubscribeChannels collection_channels = 2;
}

//---- synchronize messages proto between QueryCoord and QueryNode -----
message SegmentChangeInfo {
int64 online_nodeID = 1;
Expand Down
392 changes: 246 additions & 146 deletions internal/proto/querypb/query_coord.pb.go

Large diffs are not rendered by default.

174 changes: 174 additions & 0 deletions internal/querycoord/channel_unsubscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
"container/list"
"context"
"fmt"
"sync"
"time"

"github.com/golang/protobuf/proto"
"go.uber.org/zap"

etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
)

const (
unsubscribeChannelInfoPrefix = "queryCoord-unsubscribeChannelInfo"
unsubscribeChannelCheckInterval = time.Second
)

type channelUnsubscribeHandler struct {
ctx context.Context
cancel context.CancelFunc
kvClient *etcdkv.EtcdKV
factory msgstream.Factory

channelInfos *list.List
downNodeChan chan int64

wg sync.WaitGroup
}

// newChannelUnsubscribeHandler create a new handler service to unsubscribe channels
func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factory msgstream.Factory) (*channelUnsubscribeHandler, error) {
childCtx, cancel := context.WithCancel(ctx)
handler := &channelUnsubscribeHandler{
ctx: childCtx,
cancel: cancel,
kvClient: kv,
factory: factory,

channelInfos: list.New(),
//TODO:: if the query nodes that are down exceed 1024, query coord will not be able to restart
downNodeChan: make(chan int64, 1024),
}

err := handler.reloadFromKV()
if err != nil {
return nil, err
}

return handler, nil
}

// reloadFromKV reload unsolved channels to unsubscribe
func (csh *channelUnsubscribeHandler) reloadFromKV() error {
log.Debug("start reload unsubscribe channelInfo from kv")
_, channelInfoValues, err := csh.kvClient.LoadWithPrefix(unsubscribeChannelInfoPrefix)
if err != nil {
return err
}
for _, value := range channelInfoValues {
channelInfo := &querypb.UnsubscribeChannelInfo{}
err = proto.Unmarshal([]byte(value), channelInfo)
if err != nil {
return err
}
csh.channelInfos.PushBack(channelInfo)
csh.downNodeChan <- channelInfo.NodeID
}

return nil
}

// addUnsubscribeChannelInfo add channel info to handler service, and persistent to etcd
func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.UnsubscribeChannelInfo) {
nodeID := info.NodeID
channelInfoValue, err := proto.Marshal(info)
if err != nil {
panic(err)
}
// when queryCoord is restarted multiple times, the nodeID of added channelInfo may be the same
hasEnqueue := false
for e := csh.channelInfos.Back(); e != nil; e = e.Prev() {
if e.Value.(*querypb.UnsubscribeChannelInfo).NodeID == nodeID {
hasEnqueue = true
}
}

if !hasEnqueue {
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err = csh.kvClient.Save(channelInfoKey, string(channelInfoValue))
if err != nil {
panic(err)
}
csh.channelInfos.PushBack(info)
csh.downNodeChan <- info.NodeID
log.Debug("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID))
}
}

// handleChannelUnsubscribeLoop handle the unsubscription of channels which query node has watched
func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
defer csh.wg.Done()
for {
select {
case <-csh.ctx.Done():
log.Debug("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end")
return
case <-csh.downNodeChan:
channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo)
nodeID := channelInfo.NodeID
for _, collectionChannels := range channelInfo.CollectionChannels {
collectionID := collectionChannels.CollectionID
subName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, nodeID)
err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
if err != nil {
log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
panic(err)
}
}

channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, nodeID)
err := csh.kvClient.Remove(channelInfoKey)
if err != nil {
log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID))
panic(err)
}
log.Debug("unsubscribe channels success", zap.Int64("nodeID", nodeID))
}
}
}

func (csh *channelUnsubscribeHandler) start() {
csh.wg.Add(1)
go csh.handleChannelUnsubscribeLoop()
}

func (csh *channelUnsubscribeHandler) close() {
csh.cancel()
csh.wg.Wait()
}

// unsubscribeChannels create consumer fist, and unsubscribe channel through msgStream.close()
func unsubscribeChannels(ctx context.Context, factory msgstream.Factory, subName string, channels []string) error {
msgStream, err := factory.NewMsgStream(ctx)
if err != nil {
return err
}

msgStream.AsConsumer(channels, subName)
msgStream.Close()
return nil
}
131 changes: 131 additions & 0 deletions internal/querycoord/channel_unsubscribe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
"context"
"fmt"
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"

etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/etcd"
)

func Test_HandlerReloadFromKV(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)

channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, defaultQueryNodeID)
unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{
NodeID: defaultQueryNodeID,
}
channelInfoBytes, err := proto.Marshal(unsubscribeChannelInfo)
assert.Nil(t, err)

err = kv.Save(channelInfoKey, string(channelInfoBytes))
assert.Nil(t, err)

factory := msgstream.NewPmsFactory()
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
assert.Equal(t, 1, len(handler.downNodeChan))

cancel()
}

func Test_AddUnsubscribeChannelInfo(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
factory := msgstream.NewPmsFactory()
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)

collectionChannels := &querypb.UnsubscribeChannels{
CollectionID: defaultCollectionID,
Channels: []string{"test-channel"},
}
unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{
NodeID: defaultQueryNodeID,
CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels},
}

handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
frontValue := handler.channelInfos.Front()
assert.NotNil(t, frontValue)
assert.Equal(t, defaultQueryNodeID, frontValue.Value.(*querypb.UnsubscribeChannelInfo).NodeID)

// repeat nodeID which has down
handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
assert.Equal(t, 1, len(handler.downNodeChan))

cancel()
}

func Test_HandleChannelUnsubscribeLoop(t *testing.T) {
refreshParams()
baseCtx, cancel := context.WithCancel(context.Background())
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"PulsarAddress": Params.PulsarCfg.Address,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
factory.SetParams(m)
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)

collectionChannels := &querypb.UnsubscribeChannels{
CollectionID: defaultCollectionID,
Channels: []string{"test-channel"},
}
unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{
NodeID: defaultQueryNodeID,
CollectionChannels: []*querypb.UnsubscribeChannels{collectionChannels},
}

handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, defaultQueryNodeID)
_, err = kv.Load(channelInfoKey)
assert.Nil(t, err)

handler.start()

for {
_, err = kv.Load(channelInfoKey)
if err != nil {
break
}
}

cancel()
}
27 changes: 25 additions & 2 deletions internal/querycoord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,15 @@ type queryNodeCluster struct {

sync.RWMutex
clusterMeta Meta
handler *channelUnsubscribeHandler
nodes map[int64]Node
newNodeFn newQueryNodeFn
segmentAllocator SegmentAllocatePolicy
channelAllocator ChannelAllocatePolicy
segSizeEstimator func(request *querypb.LoadSegmentsRequest, dataKV kv.DataKV) (int64, error)
}

func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session) (Cluster, error) {
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session, handler *channelUnsubscribeHandler) (Cluster, error) {
childCtx, cancel := context.WithCancel(ctx)
nodes := make(map[int64]Node)
c := &queryNodeCluster{
Expand All @@ -119,6 +120,7 @@ func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdK
client: kv,
session: session,
clusterMeta: clusterMeta,
handler: handler,
nodes: nodes,
newNodeFn: newNodeFn,
segmentAllocator: defaultSegAllocatePolicy(),
Expand Down Expand Up @@ -546,6 +548,26 @@ func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetri
return ret
}

// setNodeState update queryNode state, which may be offline, disconnect, online
// when queryCoord restart, it will call setNodeState via the registerNode function
// when the new queryNode starts, queryCoord calls setNodeState via the registerNode function
// when the new queryNode down, queryCoord calls setNodeState via the stopNode function
func (c *queryNodeCluster) setNodeState(nodeID int64, node Node, state nodeState) {
// if query node down, should unsubscribe all channel the node has watched
// if not unsubscribe channel, may result in pulsar having too many backlogs
if state == offline {
// 1. find all the search/dmChannel/deltaChannel the node has watched
unsubscribeChannelInfo := c.clusterMeta.getWatchedChannelsByNodeID(nodeID)

// 2.add unsubscribed channels to handler, handler will auto unsubscribe channel
if len(unsubscribeChannelInfo.CollectionChannels) != 0 {
c.handler.addUnsubscribeChannelInfo(unsubscribeChannelInfo)
}
}

node.setState(state)
}

func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
c.Lock()
defer c.Unlock()
Expand All @@ -566,7 +588,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
log.Debug("registerNode: create a new QueryNode failed", zap.Int64("nodeID", id), zap.Error(err))
return err
}
node.setState(state)
c.setNodeState(id, node, state)
if state < online {
go node.start()
}
Expand Down Expand Up @@ -614,6 +636,7 @@ func (c *queryNodeCluster) stopNode(nodeID int64) {

if node, ok := c.nodes[nodeID]; ok {
node.stop()
c.setNodeState(nodeID, node, offline)
log.Debug("stopNode: queryNode offline", zap.Int64("nodeID", nodeID))
}
}
Expand Down
Loading

0 comments on commit dfc6670

Please sign in to comment.