Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for matcher #6207

Merged
Merged 1 commit on Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2544,13 +2544,18 @@ const (
TaskLagPerTaskListGauge
TaskBacklogPerTaskListGauge
TaskCountPerTaskListGauge
SyncMatchLocalPollLatencyPerTaskList
SyncMatchForwardPollLatencyPerTaskList
AsyncMatchLocalPollCounterPerTaskList
AsyncMatchLocalPollLatencyPerTaskList
AsyncMatchForwardPollCounterPerTaskList
AsyncMatchForwardPollLatencyPerTaskList
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList
AsyncMatchAttemptPerTaskList
PollLocalMatchLatencyPerTaskList
PollForwardMatchLatencyPerTaskList
PollLocalMatchAfterForwardFailedLatencyPerTaskList

NumMatchingMetrics
)
Expand Down Expand Up @@ -3206,13 +3211,18 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge},
SyncMatchLocalPollLatencyPerTaskList: {metricName: "syncmatch_local_poll_latency_per_tl", metricRollupName: "syncmatch_local_poll_latency"},
SyncMatchForwardPollLatencyPerTaskList: {metricName: "syncmatch_forward_poll_latency_per_tl", metricRollupName: "syncmatch_forward_poll_latency"},
AsyncMatchLocalPollCounterPerTaskList: {metricName: "asyncmatch_local_poll_per_tl", metricRollupName: "asyncmatch_local_poll"},
AsyncMatchLocalPollLatencyPerTaskList: {metricName: "asyncmatch_local_poll_latency_per_tl", metricRollupName: "asyncmatch_local_poll_latency"},
AsyncMatchForwardPollCounterPerTaskList: {metricName: "asyncmatch_forward_poll_per_tl", metricRollupName: "asyncmatch_forward_poll"},
AsyncMatchForwardPollLatencyPerTaskList: {metricName: "asyncmatch_forward_poll_latency_per_tl", metricRollupName: "asyncmatch_forward_poll_latency"},
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed"},
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_latency_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed_latency"},
AsyncMatchAttemptPerTaskList: {metricName: "asyncmatch_attempt_per_tl", metricRollupName: "asyncmatch_attempt"},
PollLocalMatchLatencyPerTaskList: {metricName: "poll_local_match_latency_per_tl", metricRollupName: "poll_local_match_latency", metricType: Timer},
PollForwardMatchLatencyPerTaskList: {metricName: "poll_forward_match_latency_per_tl", metricRollupName: "poll_forward_match_latency", metricType: Timer},
PollLocalMatchAfterForwardFailedLatencyPerTaskList: {metricName: "poll_local_match_after_forward_failed_latency_per_tl", metricRollupName: "poll_local_match_after_forward_failed_latency", metricType: Timer},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
25 changes: 20 additions & 5 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
// - context deadline is exceeded
// - task is matched and consumer returns error in response channel
func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, error) {
startT := time.Now()
if !task.IsForwarded() {
err := tm.ratelimit(ctx)
if err != nil {
Expand All @@ -145,6 +146,7 @@
// if there is a response channel, block until resp is received
// and return error if the response contains error
err := <-task.ResponseC
tm.scope.RecordTimer(metrics.SyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
return true, err
}
return false, nil
Expand All @@ -163,6 +165,7 @@
token.release("")
if err == nil {
// task was remotely sync matched on the parent partition
tm.scope.RecordTimer(metrics.SyncMatchForwardPollLatencyPerTaskList, time.Since(startT))
return true, nil
}
if errors.Is(err, ErrForwarderSlowDown) {
Expand All @@ -174,20 +177,21 @@
task.IsForwarded() { // task came from a child partition
// a forwarded backlog task from a child partition, block trying
// to match with a poller until ctx timeout
return tm.offerOrTimeout(ctx, task)
return tm.offerOrTimeout(ctx, startT, task)
}
}

return false, nil
}
}

func (tm *TaskMatcher) offerOrTimeout(ctx context.Context, task *InternalTask) (bool, error) {
func (tm *TaskMatcher) offerOrTimeout(ctx context.Context, startT time.Time, task *InternalTask) (bool, error) {
select {
case tm.getTaskC(task) <- task: // poller picked up the task
if task.ResponseC != nil {
select {
case err := <-task.ResponseC:
tm.scope.RecordTimer(metrics.SyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
return true, err
case <-ctx.Done():
return false, nil
Expand Down Expand Up @@ -345,6 +349,7 @@
// On success, the returned task could be a query task or a regular task
// Returns ErrNoTasks when context deadline is exceeded
func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*InternalTask, error) {
startT := time.Now()
isolatedTaskC, ok := tm.isolatedTaskC[isolationGroup]
if !ok && isolationGroup != "" {
// fallback to default isolation group instead of making poller crash if the isolation group is invalid
Expand All @@ -353,6 +358,7 @@
}
// try local match first without blocking until context timeout
if task, err := tm.pollNonBlocking(ctx, isolatedTaskC, tm.taskC, tm.queryTaskC); err == nil {
tm.scope.RecordTimer(metrics.PollLocalMatchLatencyPerTaskList, time.Since(startT))
return task, nil
}
// there is no local poller available to pickup this task. Now block waiting
Expand All @@ -369,20 +375,22 @@
TaskListKind: tm.tasklistKind.Ptr(),
EventName: "Matcher Falling Back to Non-Local Polling",
})
return tm.pollOrForward(ctx, isolationGroup, isolatedTaskC, tm.taskC, tm.queryTaskC)
return tm.pollOrForward(ctx, startT, isolationGroup, isolatedTaskC, tm.taskC, tm.queryTaskC)
}

// PollForQuery blocks until a *query* task is found or context deadline is exceeded
// Returns ErrNoTasks when context deadline is exceeded
func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error) {
startT := time.Now()
// try local match first without blocking until context timeout
if task, err := tm.pollNonBlocking(ctx, nil, nil, tm.queryTaskC); err == nil {
tm.scope.RecordTimer(metrics.PollLocalMatchLatencyPerTaskList, time.Since(startT))
return task, nil
}
// there is no local poller available to pickup this task. Now block waiting
// either for a local poller or a forwarding token to be available. When a
// forwarding token becomes available, send this poll to a parent partition
return tm.pollOrForward(ctx, "", nil, nil, tm.queryTaskC)
return tm.pollOrForward(ctx, startT, "", nil, nil, tm.queryTaskC)
}

// UpdateRatelimit updates the task dispatch rate
Expand All @@ -406,6 +414,7 @@

func (tm *TaskMatcher) pollOrForward(
ctx context.Context,
startT time.Time,
isolationGroup string,
isolatedTaskC <-chan *InternalTask,
taskC <-chan *InternalTask,
Expand All @@ -416,6 +425,7 @@
if task.ResponseC != nil {
tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
}
tm.scope.RecordTimer(metrics.PollLocalMatchLatencyPerTaskList, time.Since(startT))
tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
event.Log(event.E{
TaskListName: tm.tasklist.GetName(),
Expand All @@ -435,6 +445,7 @@
if task.ResponseC != nil {
tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
}
tm.scope.RecordTimer(metrics.PollLocalMatchLatencyPerTaskList, time.Since(startT))
tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
event.Log(event.E{
TaskListName: tm.tasklist.GetName(),
Expand Down Expand Up @@ -472,6 +483,7 @@
})
if task, err := tm.fwdr.ForwardPoll(ctx); err == nil {
token.release(isolationGroup)
tm.scope.RecordTimer(metrics.PollForwardMatchLatencyPerTaskList, time.Since(startT))
event.Log(event.E{
TaskListName: tm.tasklist.GetName(),
TaskListType: tm.tasklist.GetType(),
Expand All @@ -481,12 +493,13 @@
return task, nil
}
token.release(isolationGroup)
return tm.poll(ctx, isolatedTaskC, taskC, queryTaskC)
return tm.poll(ctx, startT, isolatedTaskC, taskC, queryTaskC)

Check warning on line 496 in service/matching/tasklist/matcher.go

View check run for this annotation

Codecov / codecov/patch

service/matching/tasklist/matcher.go#L496

Added line #L496 was not covered by tests
}
}

func (tm *TaskMatcher) poll(
ctx context.Context,
startT time.Time,
isolatedTaskC <-chan *InternalTask,
taskC <-chan *InternalTask,
queryTaskC <-chan *InternalTask,
Expand All @@ -496,6 +509,7 @@
if task.ResponseC != nil {
tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
}
tm.scope.RecordTimer(metrics.PollLocalMatchAfterForwardFailedLatencyPerTaskList, time.Since(startT))

Check warning on line 512 in service/matching/tasklist/matcher.go

View check run for this annotation

Codecov / codecov/patch

service/matching/tasklist/matcher.go#L512

Added line #L512 was not covered by tests
tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
event.Log(event.E{
TaskListName: tm.tasklist.GetName(),
Expand All @@ -515,6 +529,7 @@
if task.ResponseC != nil {
tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
}
tm.scope.RecordTimer(metrics.PollLocalMatchAfterForwardFailedLatencyPerTaskList, time.Since(startT))

Check warning on line 532 in service/matching/tasklist/matcher.go

View check run for this annotation

Codecov / codecov/patch

service/matching/tasklist/matcher.go#L532

Added line #L532 was not covered by tests
tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
event.Log(event.E{
TaskListName: tm.tasklist.GetName(),
Expand Down
1 change: 1 addition & 0 deletions service/matching/tasklist/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func (t *MatcherTestSuite) TestSyncMatchFailure() {
func (t *MatcherTestSuite) TestRateLimitHandling() {
scope := mocks.Scope{}
scope.On("IncCounter", metrics.SyncMatchForwardTaskThrottleErrorPerTasklist)
scope.On("RecordTimer", mock.Anything, mock.Anything)
t.matcher.scope = &scope
for i := 0; i < 5; i++ {
t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@
var matched bool
var err error
if params.ActivityTaskDispatchInfo != nil {
matched, err = c.matcher.offerOrTimeout(childCtx, task)
matched, err = c.matcher.offerOrTimeout(childCtx, c.timeSource.Now(), task)

Check warning on line 543 in service/matching/tasklist/task_list_manager.go

View check run for this annotation

Codecov / codecov/patch

service/matching/tasklist/task_list_manager.go#L543

Added line #L543 was not covered by tests
} else {
matched, err = c.matcher.Offer(childCtx, task)
}
Expand Down
Loading