Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more metrics for matching #6208

Merged
merged 2 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2547,12 +2547,14 @@ const (
SyncMatchLocalPollLatencyPerTaskList
SyncMatchForwardPollLatencyPerTaskList
AsyncMatchLocalPollCounterPerTaskList
AsyncMatchLocalPollAttemptPerTaskList
AsyncMatchLocalPollLatencyPerTaskList
AsyncMatchForwardPollCounterPerTaskList
AsyncMatchForwardPollAttemptPerTaskList
AsyncMatchForwardPollLatencyPerTaskList
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList
AsyncMatchLocalPollAfterForwardFailedAttemptPerTaskList
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList
AsyncMatchAttemptPerTaskList
PollLocalMatchLatencyPerTaskList
PollForwardMatchLatencyPerTaskList
PollLocalMatchAfterForwardFailedLatencyPerTaskList
Expand Down Expand Up @@ -3214,12 +3216,14 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
SyncMatchLocalPollLatencyPerTaskList: {metricName: "syncmatch_local_poll_latency_per_tl", metricRollupName: "syncmatch_local_poll_latency"},
SyncMatchForwardPollLatencyPerTaskList: {metricName: "syncmatch_forward_poll_latency_per_tl", metricRollupName: "syncmatch_forward_poll_latency"},
AsyncMatchLocalPollCounterPerTaskList: {metricName: "asyncmatch_local_poll_per_tl", metricRollupName: "asyncmatch_local_poll"},
AsyncMatchLocalPollAttemptPerTaskList: {metricName: "asyncmatch_local_poll_attempt_per_tl", metricRollupName: "asyncmatch_local_poll_attempt", metricType: Timer},
AsyncMatchLocalPollLatencyPerTaskList: {metricName: "asyncmatch_local_poll_latency_per_tl", metricRollupName: "asyncmatch_local_poll_latency"},
AsyncMatchForwardPollCounterPerTaskList: {metricName: "asyncmatch_forward_poll_per_tl", metricRollupName: "asyncmatch_forward_poll"},
AsyncMatchForwardPollAttemptPerTaskList: {metricName: "asyncmatch_forward_poll_attempt_per_tl", metricRollupName: "asyncmatch_forward_poll_attempt", metricType: Timer},
AsyncMatchForwardPollLatencyPerTaskList: {metricName: "asyncmatch_forward_poll_latency_per_tl", metricRollupName: "asyncmatch_forward_poll_latency"},
AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed"},
AsyncMatchLocalPollAfterForwardFailedAttemptPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_attempt_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed_attempt", metricType: Timer},
AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList: {metricName: "asyncmatch_local_poll_after_forward_failed_latency_per_tl", metricRollupName: "asyncmatch_local_poll_after_forward_failed_latency"},
AsyncMatchAttemptPerTaskList: {metricName: "asyncmatch_attempt_per_tl", metricRollupName: "asyncmatch_attempt"},
PollLocalMatchLatencyPerTaskList: {metricName: "poll_local_match_latency_per_tl", metricRollupName: "poll_local_match_latency", metricType: Timer},
PollForwardMatchLatencyPerTaskList: {metricName: "poll_forward_match_latency_per_tl", metricRollupName: "poll_forward_match_latency", metricType: Timer},
PollLocalMatchAfterForwardFailedLatencyPerTaskList: {metricName: "poll_local_match_after_forward_failed_latency_per_tl", metricRollupName: "poll_local_match_after_forward_failed_latency", metricType: Timer},
Expand Down
10 changes: 10 additions & 0 deletions service/matching/tasklist/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync/atomic"

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/types"
Expand All @@ -36,6 +37,7 @@ type (
// Forwarder is the type that contains state pertaining to
// the API call forwarder component
Forwarder struct {
scope metrics.Scope
cfg *config.ForwarderConfig
taskListID *Identifier
taskListKind types.TaskListKind
Expand Down Expand Up @@ -96,6 +98,7 @@ func newForwarder(
kind types.TaskListKind,
client matching.Client,
isolationGroups []string,
scope metrics.Scope,
) *Forwarder {
rpsFunc := func() float64 { return float64(cfg.ForwarderMaxRatePerSecond()) }
fwdr := &Forwarder{
Expand All @@ -107,6 +110,7 @@ func newForwarder(
outstandingPollsLimit: int32(cfg.ForwarderMaxOutstandingPolls()),
limiter: quotas.NewDynamicRateLimiter(rpsFunc),
isolationGroups: isolationGroups,
scope: scope,
}
fwdr.addReqToken.Store(newForwarderReqToken(int(fwdr.outstandingTasksLimit), nil))
fwdr.pollReqToken.Store(newForwarderReqToken(int(fwdr.outstandingPollsLimit), isolationGroups))
Expand All @@ -130,6 +134,8 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *InternalTask) erro

var err error

sw := fwdr.scope.StartTimer(metrics.ForwardTaskLatencyPerTaskList)
defer sw.Stop()
switch fwdr.taskListID.GetType() {
case persistence.TaskListTypeDecision:
err = fwdr.client.AddDecisionTask(ctx, &types.AddDecisionTaskRequest{
Expand Down Expand Up @@ -182,6 +188,8 @@ func (fwdr *Forwarder) ForwardQueryTask(
return nil, ErrNoParent
}

sw := fwdr.scope.StartTimer(metrics.ForwardQueryLatencyPerTaskList)
defer sw.Stop()
resp, err := fwdr.client.QueryWorkflow(ctx, &types.MatchingQueryWorkflowRequest{
DomainUUID: task.Query.Request.DomainUUID,
TaskList: &types.TaskList{
Expand All @@ -206,6 +214,8 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error) {
return nil, ErrNoParent
}

sw := fwdr.scope.StartTimer(metrics.ForwardPollLatencyPerTaskList)
defer sw.Stop()
pollerID := PollerIDFromContext(ctx)
identity := IdentityFromContext(ctx)
isolationGroup := IsolationGroupFromContext(ctx)
Expand Down
3 changes: 2 additions & 1 deletion service/matching/tasklist/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/config"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (t *ForwarderTestSuite) SetupTest() {
t.NoError(err)
t.taskList = id
t.isolationGroups = []string{"abc", "xyz"}
t.fwdr = newForwarder(t.cfg, t.taskList, types.TaskListKindNormal, t.client, t.isolationGroups)
t.fwdr = newForwarder(t.cfg, t.taskList, types.TaskListKindNormal, t.client, t.isolationGroups, metrics.NoopScope(metrics.Matching))
}

func (t *ForwarderTestSuite) TearDownTest() {
Expand Down
6 changes: 3 additions & 3 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@
e.EventName = "Dispatched to Local Poller"
event.Log(e)
tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
return nil
case token := <-tm.fwdrAddReqTokenC():
Expand Down Expand Up @@ -312,7 +312,7 @@
event.Log(e)
cancel()
tm.scope.IncCounter(metrics.AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAfterForwardFailedAttemptPerTaskList, time.Duration(attempt))

Check warning on line 315 in service/matching/tasklist/matcher.go

View check run for this annotation

Codecov / codecov/patch

service/matching/tasklist/matcher.go#L315

Added line #L315 was not covered by tests
tm.scope.RecordTimer(metrics.AsyncMatchLocalPollAfterForwardFailedLatencyPerTaskList, time.Since(startT))
return nil
case <-childCtx.Done():
Expand All @@ -329,7 +329,7 @@
e.EventName = "Task Forwarded"
event.Log(e)
tm.scope.IncCounter(metrics.AsyncMatchForwardPollCounterPerTaskList)
tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchForwardPollAttemptPerTaskList, time.Duration(attempt))
tm.scope.RecordTimer(metrics.AsyncMatchForwardPollLatencyPerTaskList, time.Since(startT))

// at this point, we forwarded the task to a parent partition which
Expand Down
5 changes: 2 additions & 3 deletions service/matching/tasklist/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (t *MatcherTestSuite) SetupTest() {
}
t.cfg = tlCfg
t.isolationGroups = []string{"dca1", "dca2"}
t.fwdr = newForwarder(&t.cfg.ForwarderConfig, t.taskList, types.TaskListKindNormal, t.client, []string{"dca1", "dca2"})
t.fwdr = newForwarder(&t.cfg.ForwarderConfig, t.taskList, types.TaskListKindNormal, t.client, []string{"dca1", "dca2"}, metrics.NoopScope(metrics.Matching))
t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger(), t.taskList, types.TaskListKindNormal)

rootTaskList := NewTestTaskListID(t.T(), t.taskList.GetDomainID(), t.taskList.Parent(20), persistence.TaskListTypeDecision)
Expand Down Expand Up @@ -493,12 +493,11 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
func (t *MatcherTestSuite) TestMustOfferRemoteRateLimit() {
scope := mocks.Scope{}
scope.On("IncCounter", metrics.AsyncMatchForwardTaskThrottleErrorPerTasklist)
scope.On("RecordTimer", mock.Anything, mock.Anything)
t.matcher.scope = &scope
completionFunc := func(*persistence.TaskInfo, error) {}
for i := 0; i < 5; i++ {
scope.On("IncCounter", metrics.AsyncMatchForwardPollCounterPerTaskList)
scope.On("RecordTimer", metrics.AsyncMatchAttemptPerTaskList, mock.Anything)
scope.On("RecordTimer", metrics.AsyncMatchForwardPollLatencyPerTaskList, mock.Anything)
t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Return(nil)
task := newInternalTask(t.newTaskInfo(), completionFunc, types.TaskSourceDbBacklog, "", false, nil, "")
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
}
var fwdr *Forwarder
if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
fwdr = newForwarder(&taskListConfig.ForwarderConfig, taskList, *taskListKind, matchingClient, isolationGroups)
fwdr = newForwarder(&taskListConfig.ForwarderConfig, taskList, *taskListKind, matchingClient, isolationGroups, scope)

Check warning on line 197 in service/matching/tasklist/task_list_manager.go

View check run for this annotation

Codecov / codecov/patch

service/matching/tasklist/task_list_manager.go#L197

Added line #L197 was not covered by tests
}
tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups, tlMgr.logger, taskList, *taskListKind)
tlMgr.taskWriter = newTaskWriter(tlMgr)
Expand Down
Loading