Skip to content

Commit

Permalink
Update matching to emit metric for tasklist backlog size
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Nov 14, 2023
1 parent ac884c3 commit d705b15
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 3 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2404,6 +2404,7 @@ const (
TaskListManagersGauge
TaskLagPerTaskListGauge
TaskBacklogPerTaskListGauge
TaskCountPerTaskListGauge

NumMatchingMetrics
)
Expand Down Expand Up @@ -3021,6 +3022,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
24 changes: 24 additions & 0 deletions service/matching/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type (
taskListKind int
taskType int
rangeID int64
backlogCount int64
store persistence.TaskManager
logger log.Logger
}
Expand Down Expand Up @@ -77,6 +78,13 @@ func (db *taskListDB) RangeID() int64 {
return db.rangeID
}

// BacklogCount reports the backlog size currently stored on the taskListDB.
// The value is copied out while the db lock is held and returned after the
// lock has been released.
func (db *taskListDB) BacklogCount() int64 {
	db.Lock()
	count := db.backlogCount
	db.Unlock()
	return count
}

// RenewLease renews the lease on a tasklist. If there is no previous lease,
// this method will attempt to steal tasklist from current owner
func (db *taskListDB) RenewLease() (taskListState, error) {
Expand Down Expand Up @@ -189,3 +197,19 @@ func (db *taskListDB) CompleteTasksLessThan(taskID int64, limit int) (int, error
}
return resp.TasksCompleted, nil
}

// GetTaskListSize asks the persistence store for the size of this tasklist
// relative to ackLevel, caches the result in db.backlogCount, and returns it.
//
// On a store error it returns 0 and the error, and leaves the cached
// backlogCount untouched.
func (db *taskListDB) GetTaskListSize(ackLevel int64) (int64, error) {
	// The store call is made without holding the db lock so that a slow
	// persistence round trip does not block other users of the lock.
	resp, err := db.store.GetTaskListSize(context.Background(), &persistence.GetTaskListSizeRequest{
		DomainID:     db.domainID,
		DomainName:   db.domainName,
		TaskListName: db.taskListName,
		TaskListType: db.taskType,
		AckLevel:     ackLevel,
	})
	if err != nil {
		return 0, err
	}

	// BacklogCount reads backlogCount under db.Lock(); the write must take the
	// same lock, otherwise the concurrent read/write pair is a data race.
	db.Lock()
	db.backlogCount = resp.Size
	db.Unlock()

	return resp.Size, nil
}
14 changes: 13 additions & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,19 @@ func (m *testTaskManager) GetTasks(
}

// GetTaskListSize is the testTaskManager implementation of the persistence
// call: it counts the tasks of the requested test task list whose IDs are
// strictly greater than request.AckLevel and returns that count as Size.
func (m *testTaskManager) GetTaskListSize(_ context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error) {
// NOTE(review): this hunk is reported as "13 additions & 1 deletion"; the
// next line appears to be the deleted pre-change stub shown by the diff view,
// not live code — confirm against the file at this commit.
return nil, fmt.Errorf("Not implemented")
// Resolve the test task list from the request's domain ID, task list name and type.
tlm := m.getTaskListManager(newTestTaskListID(request.DomainID, request.TaskListName, request.TaskListType))
// Hold the task list's lock for the whole iteration.
tlm.Lock()
defer tlm.Unlock()
count := int64(0)
it := tlm.tasks.Iterator()
for it.Next() {
// Keys of tlm.tasks are asserted to be int64 task IDs.
taskID := it.Key().(int64)
// Tasks at or below the ack level are not part of the count.
if taskID <= request.AckLevel {
continue
}
count++
}
return &persistence.GetTaskListSizeResponse{Size: count}, nil
}

func (m *testTaskManager) GetOrphanTasks(_ context.Context, request *persistence.GetOrphanTasksRequest) (*persistence.GetOrphanTasksResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *type
response.TaskListStatus = &types.TaskListStatus{
ReadLevel: c.taskAckManager.GetReadLevel(),
AckLevel: c.taskAckManager.GetAckLevel(),
BacklogCountHint: c.taskAckManager.GetBacklogCount(),
BacklogCountHint: c.db.BacklogCount(),
RatePerSecond: c.matcher.Rate(),
TaskIDBlock: &types.TaskIDBlock{
StartID: taskIDBlock.start,
Expand Down
2 changes: 1 addition & 1 deletion service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestDescribeTaskList(t *testing.T) {
require.NotNil(t, taskListStatus)
require.Zero(t, taskListStatus.GetAckLevel())
require.Equal(t, taskCount, taskListStatus.GetReadLevel())
require.Equal(t, taskCount, taskListStatus.GetBacklogCountHint())
require.Equal(t, int64(0), taskListStatus.GetBacklogCountHint())
require.True(t, taskListStatus.GetRatePerSecond() > (_defaultTaskDispatchRPS-1))
require.True(t, taskListStatus.GetRatePerSecond() < (_defaultTaskDispatchRPS+1))
taskIDBlock := taskListStatus.GetTaskIDBlock()
Expand Down
5 changes: 5 additions & 0 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ getTasksPumpLoop:
}
case <-updateAckTimer.C:
{
ackLevel := tr.taskAckManager.GetAckLevel()
if size, err := tr.db.GetTaskListSize(ackLevel); err == nil {
tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType)).
UpdateGauge(metrics.TaskCountPerTaskListGauge, float64(size))
}
if err := tr.handleErr(tr.persistAckLevel()); err != nil {
tr.logger.Error("Persistent store operation failure",
tag.StoreOperationUpdateTaskList,
Expand Down

0 comments on commit d705b15

Please sign in to comment.