Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update matching to emit metric for tasklist backlog size #5448

Merged
merged 2 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2404,6 +2404,7 @@ const (
TaskListManagersGauge
TaskLagPerTaskListGauge
TaskBacklogPerTaskListGauge
TaskCountPerTaskListGauge

NumMatchingMetrics
)
Expand Down Expand Up @@ -3021,6 +3022,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
22 changes: 22 additions & 0 deletions service/matching/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type (
taskListKind int
taskType int
rangeID int64
backlogCount int64
store persistence.TaskManager
logger log.Logger
}
Expand Down Expand Up @@ -77,6 +78,11 @@ func (db *taskListDB) RangeID() int64 {
return db.rangeID
}

// BacklogCount returns the current backlog size
func (db *taskListDB) BacklogCount() int64 {
return atomic.LoadInt64(&db.backlogCount)
}

// RenewLease renews the lease on a tasklist. If there is no previous lease,
// this method will attempt to steal tasklist from current owner
func (db *taskListDB) RenewLease() (taskListState, error) {
Expand Down Expand Up @@ -189,3 +195,19 @@ func (db *taskListDB) CompleteTasksLessThan(taskID int64, limit int) (int, error
}
return resp.TasksCompleted, nil
}

// GetTaskListSize gets the backlog size of a tasklist
func (db *taskListDB) GetTaskListSize(ackLevel int64) (int64, error) {
resp, err := db.store.GetTaskListSize(context.Background(), &persistence.GetTaskListSizeRequest{
DomainID: db.domainID,
DomainName: db.domainName,
TaskListName: db.taskListName,
TaskListType: db.taskType,
AckLevel: ackLevel,
})
if err != nil {
return 0, err
}
atomic.StoreInt64(&db.backlogCount, resp.Size)
return resp.Size, nil
}
14 changes: 13 additions & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,19 @@ func (m *testTaskManager) GetTasks(
}

func (m *testTaskManager) GetTaskListSize(_ context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error) {
return nil, fmt.Errorf("Not implemented")
tlm := m.getTaskListManager(newTestTaskListID(request.DomainID, request.TaskListName, request.TaskListType))
tlm.Lock()
defer tlm.Unlock()
count := int64(0)
it := tlm.tasks.Iterator()
for it.Next() {
taskID := it.Key().(int64)
if taskID <= request.AckLevel {
continue
}
count++
}
return &persistence.GetTaskListSizeResponse{Size: count}, nil
}

func (m *testTaskManager) GetOrphanTasks(_ context.Context, request *persistence.GetOrphanTasksRequest) (*persistence.GetOrphanTasksResponse, error) {
Expand Down
7 changes: 6 additions & 1 deletion service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,15 @@ func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *type
}

taskIDBlock := rangeIDToTaskIDBlock(c.db.RangeID(), c.config.RangeSize)
backlogCount, err := c.db.GetTaskListSize(c.taskAckManager.GetAckLevel())
if err != nil {
// fallback to im-memory backlog, if failed to get count from db
backlogCount = c.taskAckManager.GetBacklogCount()
}
response.TaskListStatus = &types.TaskListStatus{
ReadLevel: c.taskAckManager.GetReadLevel(),
AckLevel: c.taskAckManager.GetAckLevel(),
BacklogCountHint: c.taskAckManager.GetBacklogCount(),
BacklogCountHint: backlogCount,
RatePerSecond: c.matcher.Rate(),
TaskIDBlock: &types.TaskIDBlock{
StartID: taskIDBlock.start,
Expand Down
2 changes: 1 addition & 1 deletion service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestDescribeTaskList(t *testing.T) {
require.NotNil(t, taskListStatus)
require.Zero(t, taskListStatus.GetAckLevel())
require.Equal(t, taskCount, taskListStatus.GetReadLevel())
require.Equal(t, taskCount, taskListStatus.GetBacklogCountHint())
require.Equal(t, int64(0), taskListStatus.GetBacklogCountHint())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand. In the test 3 tasks are pushed to the tasklist. Why a backlog is still 0? Or are they immediately processed?
I suggest adding a test that verifies the backlog count/metric emition with tally.TestScope.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original value is in-memory backlog, and the new value is the in-db backlog. The test generates the backlog by changing the in-memory backlog, so after the change, the value is changed.

require.True(t, taskListStatus.GetRatePerSecond() > (_defaultTaskDispatchRPS-1))
require.True(t, taskListStatus.GetRatePerSecond() < (_defaultTaskDispatchRPS+1))
taskIDBlock := taskListStatus.GetTaskIDBlock()
Expand Down
5 changes: 5 additions & 0 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ getTasksPumpLoop:
}
case <-updateAckTimer.C:
{
ackLevel := tr.taskAckManager.GetAckLevel()
if size, err := tr.db.GetTaskListSize(ackLevel); err == nil {
Shaddoll marked this conversation as resolved.
Show resolved Hide resolved
tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType)).
UpdateGauge(metrics.TaskCountPerTaskListGauge, float64(size))
}
if err := tr.handleErr(tr.persistAckLevel()); err != nil {
tr.logger.Error("Persistent store operation failure",
tag.StoreOperationUpdateTaskList,
Expand Down