From d705b15cb81cd3e3bfb7b712c2443412fd7d934a Mon Sep 17 00:00:00 2001 From: Zijian Date: Tue, 14 Nov 2023 16:28:10 +0800 Subject: [PATCH] Update matching to emit metric for tasklist backlog size --- common/metrics/defs.go | 2 ++ service/matching/db.go | 24 ++++++++++++++++++++++++ service/matching/matchingEngine_test.go | 14 +++++++++++++- service/matching/taskListManager.go | 2 +- service/matching/taskListManager_test.go | 2 +- service/matching/taskReader.go | 5 +++++ 6 files changed, 46 insertions(+), 3 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 45513ceb6d4..9a1ead49d3b 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -2404,6 +2404,7 @@ const ( TaskListManagersGauge TaskLagPerTaskListGauge TaskBacklogPerTaskListGauge + TaskCountPerTaskListGauge NumMatchingMetrics ) @@ -3021,6 +3022,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge}, TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge}, TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge}, + TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge}, }, Worker: { ReplicatorMessages: {metricName: "replicator_messages"}, diff --git a/service/matching/db.go b/service/matching/db.go index 3c274c5f9d8..6df28aa26bf 100644 --- a/service/matching/db.go +++ b/service/matching/db.go @@ -39,6 +39,7 @@ type ( taskListKind int taskType int rangeID int64 + backlogCount int64 store persistence.TaskManager logger log.Logger } @@ -77,6 +78,13 @@ func (db *taskListDB) RangeID() int64 { return db.rangeID } +// BacklogCount returns the backlog size cached by the last successful GetTaskListSize call (zero value until the first such call) +func (db *taskListDB) BacklogCount() int64 { + db.Lock() + defer db.Unlock() + return db.backlogCount +} + // RenewLease renews the lease on a tasklist. 
If there is no previous lease, // this method will attempt to steal tasklist from current owner func (db *taskListDB) RenewLease() (taskListState, error) { @@ -189,3 +197,24 @@ func (db *taskListDB) CompleteTasksLessThan(taskID int64, limit int) (int, error } return resp.TasksCompleted, nil } + +// GetTaskListSize asks the store for this tasklist's size at the given ackLevel, +// caches the result for BacklogCount, and returns it. On a store error it returns +// 0 and the error, leaving the cached value untouched. +func (db *taskListDB) GetTaskListSize(ackLevel int64) (int64, error) { + resp, err := db.store.GetTaskListSize(context.Background(), &persistence.GetTaskListSizeRequest{ + DomainID: db.domainID, + DomainName: db.domainName, + TaskListName: db.taskListName, + TaskListType: db.taskType, + AckLevel: ackLevel, + }) + if err != nil { + return 0, err + } + // BacklogCount reads backlogCount under db's mutex; write it under the same lock to avoid a data race. + db.Lock() + db.backlogCount = resp.Size + db.Unlock() + return resp.Size, nil +} diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index c36e56c5c0f..91bda71c6ab 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -1828,7 +1828,19 @@ func (m *testTaskManager) GetTasks( } func (m *testTaskManager) GetTaskListSize(_ context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error) { - return nil, fmt.Errorf("Not implemented") + tlm := m.getTaskListManager(newTestTaskListID(request.DomainID, request.TaskListName, request.TaskListType)) + tlm.Lock() + defer tlm.Unlock() + count := int64(0) + it := tlm.tasks.Iterator() + for it.Next() { + taskID := it.Key().(int64) + if taskID <= request.AckLevel { + continue + } + count++ + } + return &persistence.GetTaskListSizeResponse{Size: count}, nil } func (m *testTaskManager) GetOrphanTasks(_ context.Context, request *persistence.GetOrphanTasksRequest) (*persistence.GetOrphanTasksResponse, error) { diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index 25834a09af0..ff47dcd353b 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -459,7 +459,7 @@ func 
(c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *type response.TaskListStatus = &types.TaskListStatus{ ReadLevel: c.taskAckManager.GetReadLevel(), AckLevel: c.taskAckManager.GetAckLevel(), - BacklogCountHint: c.taskAckManager.GetBacklogCount(), + BacklogCountHint: c.db.BacklogCount(), RatePerSecond: c.matcher.Rate(), TaskIDBlock: &types.TaskIDBlock{ StartID: taskIDBlock.start, diff --git a/service/matching/taskListManager_test.go b/service/matching/taskListManager_test.go index a6522499508..4f1a6158f7f 100644 --- a/service/matching/taskListManager_test.go +++ b/service/matching/taskListManager_test.go @@ -191,7 +191,7 @@ func TestDescribeTaskList(t *testing.T) { require.NotNil(t, taskListStatus) require.Zero(t, taskListStatus.GetAckLevel()) require.Equal(t, taskCount, taskListStatus.GetReadLevel()) - require.Equal(t, taskCount, taskListStatus.GetBacklogCountHint()) + require.Equal(t, int64(0), taskListStatus.GetBacklogCountHint()) require.True(t, taskListStatus.GetRatePerSecond() > (_defaultTaskDispatchRPS-1)) require.True(t, taskListStatus.GetRatePerSecond() < (_defaultTaskDispatchRPS+1)) taskIDBlock := taskListStatus.GetTaskIDBlock() diff --git a/service/matching/taskReader.go b/service/matching/taskReader.go index f6a5b199732..256d0bc51e0 100644 --- a/service/matching/taskReader.go +++ b/service/matching/taskReader.go @@ -200,6 +200,11 @@ getTasksPumpLoop: } case <-updateAckTimer.C: { + ackLevel := tr.taskAckManager.GetAckLevel() + if size, err := tr.db.GetTaskListSize(ackLevel); err == nil { + tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType)). + UpdateGauge(metrics.TaskCountPerTaskListGauge, float64(size)) + } if err := tr.handleErr(tr.persistAckLevel()); err != nil { tr.logger.Error("Persistent store operation failure", tag.StoreOperationUpdateTaskList,