Skip to content

Commit

Permalink
Adds metric around task forwarder rate-limiting (#6172)
Browse files Browse the repository at this point in the history
What changed?

This adds a metric around tasklist forwarding being rate-limited by the quite-low forwarder rate-limit. this presents a risk tor throughput for tasklists which are overpartitioned

Why?

At present, while we have reasonable ability to observe the overall latency in async dispatch, it's quite hard to determine where it's coming from. This provides a layer of visibility here.

How did you test it?

Unt tests, should be a fairly safe change
  • Loading branch information
davidporter-id-au authored Jul 17, 2024
1 parent c3228ee commit 8df9759
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 64 deletions.
82 changes: 43 additions & 39 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2519,6 +2519,8 @@ const (
ForwardedPerTaskListCounter
ForwardTaskCallsPerTaskList
ForwardTaskErrorsPerTaskList
SyncMatchForwardTaskThrottleErrorPerTasklist
AsyncMatchForwardTaskThrottleErrorPerTasklist
ForwardTaskLatencyPerTaskList
ForwardQueryCallsPerTaskList
ForwardQueryErrorsPerTaskList
Expand Down Expand Up @@ -3142,45 +3144,47 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter},
},
Matching: {
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
PollTimeoutPerTaskListCounter: {metricName: "poll_timeouts_per_tl", metricRollupName: "poll_timeouts"},
PollSuccessWithSyncPerTaskListCounter: {metricName: "poll_success_sync_per_tl", metricRollupName: "poll_success_sync"},
LeaseRequestPerTaskListCounter: {metricName: "lease_requests_per_tl", metricRollupName: "lease_requests"},
LeaseFailurePerTaskListCounter: {metricName: "lease_failures_per_tl", metricRollupName: "lease_failures"},
ConditionFailedErrorPerTaskListCounter: {metricName: "condition_failed_errors_per_tl", metricRollupName: "condition_failed_errors"},
RespondQueryTaskFailedPerTaskListCounter: {metricName: "respond_query_failed_per_tl", metricRollupName: "respond_query_failed"},
SyncThrottlePerTaskListCounter: {metricName: "sync_throttle_count_per_tl", metricRollupName: "sync_throttle_count"},
BufferThrottlePerTaskListCounter: {metricName: "buffer_throttle_count_per_tl", metricRollupName: "buffer_throttle_count"},
BufferUnknownTaskDispatchError: {metricName: "buffer_unknown_task_dispatch_error_per_tl", metricRollupName: "buffer_unknown_task_dispatch_error"},
BufferIsolationGroupRedirectCounter: {metricName: "buffer_isolation_group_redirected_per_tl", metricRollupName: "buffer_isolation_group_redirected"},
BufferIsolationGroupRedirectFailureCounter: {metricName: "buffer_isolation_group_redirect_failure_per_tl", metricRollupName: "buffer_isolation_group_redirect_failure"},
BufferIsolationGroupMisconfiguredCounter: {metricName: "buffer_isolation_group_misconfigured_failure_per_tl", metricRollupName: "buffer_isolation_group_misconfigured_failure"},
ExpiredTasksPerTaskListCounter: {metricName: "tasks_expired_per_tl", metricRollupName: "tasks_expired"},
ForwardedPerTaskListCounter: {metricName: "forwarded_per_tl", metricRollupName: "forwarded"},
ForwardTaskCallsPerTaskList: {metricName: "forward_task_calls_per_tl", metricRollupName: "forward_task_calls"},
ForwardTaskErrorsPerTaskList: {metricName: "forward_task_errors_per_tl", metricRollupName: "forward_task_errors"},
ForwardQueryCallsPerTaskList: {metricName: "forward_query_calls_per_tl", metricRollupName: "forward_query_calls"},
ForwardQueryErrorsPerTaskList: {metricName: "forward_query_errors_per_tl", metricRollupName: "forward_query_errors"},
ForwardPollCallsPerTaskList: {metricName: "forward_poll_calls_per_tl", metricRollupName: "forward_poll_calls"},
ForwardPollErrorsPerTaskList: {metricName: "forward_poll_errors_per_tl", metricRollupName: "forward_poll_errors"},
SyncMatchLatencyPerTaskList: {metricName: "syncmatch_latency_per_tl", metricRollupName: "syncmatch_latency", metricType: Timer},
AsyncMatchLatencyPerTaskList: {metricName: "asyncmatch_latency_per_tl", metricRollupName: "asyncmatch_latency", metricType: Timer},
AsyncMatchDispatchLatencyPerTaskList: {metricName: "asyncmatch_dispatch_latency_per_tl", metricRollupName: "asyncmatch_dispatch_latency", metricType: Timer},
AsyncMatchDispatchTimeoutCounterPerTaskList: {metricName: "asyncmatch_dispatch_timeouts_per_tl", metricRollupName: "asyncmatch_dispatch_timeouts"},
ForwardTaskLatencyPerTaskList: {metricName: "forward_task_latency_per_tl", metricRollupName: "forward_task_latency"},
ForwardQueryLatencyPerTaskList: {metricName: "forward_query_latency_per_tl", metricRollupName: "forward_query_latency"},
ForwardPollLatencyPerTaskList: {metricName: "forward_poll_latency_per_tl", metricRollupName: "forward_poll_latency"},
LocalToLocalMatchPerTaskListCounter: {metricName: "local_to_local_matches_per_tl", metricRollupName: "local_to_local_matches"},
LocalToRemoteMatchPerTaskListCounter: {metricName: "local_to_remote_matches_per_tl", metricRollupName: "local_to_remote_matches"},
RemoteToLocalMatchPerTaskListCounter: {metricName: "remote_to_local_matches_per_tl", metricRollupName: "remote_to_local_matches"},
RemoteToRemoteMatchPerTaskListCounter: {metricName: "remote_to_remote_matches_per_tl", metricRollupName: "remote_to_remote_matches"},
IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter},
PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"},
PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter},
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge},
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
PollTimeoutPerTaskListCounter: {metricName: "poll_timeouts_per_tl", metricRollupName: "poll_timeouts"},
PollSuccessWithSyncPerTaskListCounter: {metricName: "poll_success_sync_per_tl", metricRollupName: "poll_success_sync"},
LeaseRequestPerTaskListCounter: {metricName: "lease_requests_per_tl", metricRollupName: "lease_requests"},
LeaseFailurePerTaskListCounter: {metricName: "lease_failures_per_tl", metricRollupName: "lease_failures"},
ConditionFailedErrorPerTaskListCounter: {metricName: "condition_failed_errors_per_tl", metricRollupName: "condition_failed_errors"},
RespondQueryTaskFailedPerTaskListCounter: {metricName: "respond_query_failed_per_tl", metricRollupName: "respond_query_failed"},
SyncThrottlePerTaskListCounter: {metricName: "sync_throttle_count_per_tl", metricRollupName: "sync_throttle_count"},
BufferThrottlePerTaskListCounter: {metricName: "buffer_throttle_count_per_tl", metricRollupName: "buffer_throttle_count"},
BufferUnknownTaskDispatchError: {metricName: "buffer_unknown_task_dispatch_error_per_tl", metricRollupName: "buffer_unknown_task_dispatch_error"},
BufferIsolationGroupRedirectCounter: {metricName: "buffer_isolation_group_redirected_per_tl", metricRollupName: "buffer_isolation_group_redirected"},
BufferIsolationGroupRedirectFailureCounter: {metricName: "buffer_isolation_group_redirect_failure_per_tl", metricRollupName: "buffer_isolation_group_redirect_failure"},
BufferIsolationGroupMisconfiguredCounter: {metricName: "buffer_isolation_group_misconfigured_failure_per_tl", metricRollupName: "buffer_isolation_group_misconfigured_failure"},
ExpiredTasksPerTaskListCounter: {metricName: "tasks_expired_per_tl", metricRollupName: "tasks_expired"},
ForwardedPerTaskListCounter: {metricName: "forwarded_per_tl", metricRollupName: "forwarded"},
ForwardTaskCallsPerTaskList: {metricName: "forward_task_calls_per_tl", metricRollupName: "forward_task_calls"},
ForwardTaskErrorsPerTaskList: {metricName: "forward_task_errors_per_tl", metricRollupName: "forward_task_errors"},
SyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "sync_forward_task_throttle_errors_per_tl", metricRollupName: "sync_forward_task_throttle_errors"},
AsyncMatchForwardTaskThrottleErrorPerTasklist: {metricName: "async_forward_task_throttle_errors_per_tl", metricRollupName: "async_forward_task_throttle_errors"},
ForwardQueryCallsPerTaskList: {metricName: "forward_query_calls_per_tl", metricRollupName: "forward_query_calls"},
ForwardQueryErrorsPerTaskList: {metricName: "forward_query_errors_per_tl", metricRollupName: "forward_query_errors"},
ForwardPollCallsPerTaskList: {metricName: "forward_poll_calls_per_tl", metricRollupName: "forward_poll_calls"},
ForwardPollErrorsPerTaskList: {metricName: "forward_poll_errors_per_tl", metricRollupName: "forward_poll_errors"},
SyncMatchLatencyPerTaskList: {metricName: "syncmatch_latency_per_tl", metricRollupName: "syncmatch_latency", metricType: Timer},
AsyncMatchLatencyPerTaskList: {metricName: "asyncmatch_latency_per_tl", metricRollupName: "asyncmatch_latency", metricType: Timer},
AsyncMatchDispatchLatencyPerTaskList: {metricName: "asyncmatch_dispatch_latency_per_tl", metricRollupName: "asyncmatch_dispatch_latency", metricType: Timer},
AsyncMatchDispatchTimeoutCounterPerTaskList: {metricName: "asyncmatch_dispatch_timeouts_per_tl", metricRollupName: "asyncmatch_dispatch_timeouts"},
ForwardTaskLatencyPerTaskList: {metricName: "forward_task_latency_per_tl", metricRollupName: "forward_task_latency"},
ForwardQueryLatencyPerTaskList: {metricName: "forward_query_latency_per_tl", metricRollupName: "forward_query_latency"},
ForwardPollLatencyPerTaskList: {metricName: "forward_poll_latency_per_tl", metricRollupName: "forward_poll_latency"},
LocalToLocalMatchPerTaskListCounter: {metricName: "local_to_local_matches_per_tl", metricRollupName: "local_to_local_matches"},
LocalToRemoteMatchPerTaskListCounter: {metricName: "local_to_remote_matches_per_tl", metricRollupName: "local_to_remote_matches"},
RemoteToLocalMatchPerTaskListCounter: {metricName: "remote_to_local_matches_per_tl", metricRollupName: "remote_to_local_matches"},
RemoteToRemoteMatchPerTaskListCounter: {metricName: "remote_to_remote_matches_per_tl", metricRollupName: "remote_to_remote_matches"},
IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter},
PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"},
PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter},
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
TaskCountPerTaskListGauge: {metricName: "task_count_per_tl", metricType: Gauge},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
28 changes: 14 additions & 14 deletions service/matching/tasklist/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ type (
)

var (
errNoParent = errors.New("cannot find parent task list for forwarding")
errTaskListKind = errors.New("forwarding is not supported on sticky task list")
errInvalidTaskListType = errors.New("unrecognized task list type")
errForwarderSlowDown = errors.New("limit exceeded")
ErrNoParent = errors.New("cannot find parent task list for forwarding")
ErrTaskListKind = errors.New("forwarding is not supported on sticky task list")
ErrInvalidTaskListType = errors.New("unrecognized task list type")
ErrForwarderSlowDown = errors.New("tasklist forwarding throttle limit exceeded")
)

// noopForwarderTokenC refers to a token channel that blocks forever
Expand Down Expand Up @@ -116,16 +116,16 @@ func newForwarder(
// ForwardTask forwards an activity or decision task to the parent task list partition if it exist
func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *InternalTask) error {
if fwdr.taskListKind == types.TaskListKindSticky {
return errTaskListKind
return ErrTaskListKind
}

name := fwdr.taskListID.Parent(fwdr.cfg.ForwarderMaxChildrenPerNode())
if name == "" {
return errNoParent
return ErrNoParent
}

if !fwdr.limiter.Allow() {
return errForwarderSlowDown
return ErrForwarderSlowDown
}

var err error
Expand Down Expand Up @@ -161,7 +161,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *InternalTask) erro
PartitionConfig: task.Event.PartitionConfig,
})
default:
return errInvalidTaskListType
return ErrInvalidTaskListType
}

return fwdr.handleErr(err)
Expand All @@ -174,12 +174,12 @@ func (fwdr *Forwarder) ForwardQueryTask(
) (*types.QueryWorkflowResponse, error) {

if fwdr.taskListKind == types.TaskListKindSticky {
return nil, errTaskListKind
return nil, ErrTaskListKind
}

name := fwdr.taskListID.Parent(fwdr.cfg.ForwarderMaxChildrenPerNode())
if name == "" {
return nil, errNoParent
return nil, ErrNoParent
}

resp, err := fwdr.client.QueryWorkflow(ctx, &types.MatchingQueryWorkflowRequest{
Expand All @@ -198,12 +198,12 @@ func (fwdr *Forwarder) ForwardQueryTask(
// ForwardPoll forwards a poll request to parent task list partition if it exist
func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error) {
if fwdr.taskListKind == types.TaskListKindSticky {
return nil, errTaskListKind
return nil, ErrTaskListKind
}

name := fwdr.taskListID.Parent(fwdr.cfg.ForwarderMaxChildrenPerNode())
if name == "" {
return nil, errNoParent
return nil, ErrNoParent
}

pollerID := PollerIDFromContext(ctx)
Expand Down Expand Up @@ -249,7 +249,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error) {
return newInternalStartedTask(&startedTaskInfo{activityTaskInfo: resp}), nil
}

return nil, errInvalidTaskListType
return nil, ErrInvalidTaskListType
}

// AddReqTokenC returns a channel that can be used to wait for a token
Expand Down Expand Up @@ -284,7 +284,7 @@ func (fwdr *Forwarder) refreshTokenC(value *atomic.Value, curr *int32, maxLimit

func (fwdr *Forwarder) handleErr(err error) error {
if _, ok := err.(*types.ServiceBusyError); ok {
return errForwarderSlowDown
return ErrForwarderSlowDown
}
return err
}
Expand Down
14 changes: 7 additions & 7 deletions service/matching/tasklist/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func (t *ForwarderTestSuite) TearDownTest() {

func (t *ForwarderTestSuite) TestForwardTaskError() {
task := newInternalTask(&persistence.TaskInfo{}, nil, types.TaskSourceHistory, "", false, nil, "")
t.Equal(errNoParent, t.fwdr.ForwardTask(context.Background(), task))
t.Equal(ErrNoParent, t.fwdr.ForwardTask(context.Background(), task))

t.usingTasklistPartition(persistence.TaskListTypeActivity)
t.fwdr.taskListKind = types.TaskListKindSticky
t.Equal(errTaskListKind, t.fwdr.ForwardTask(context.Background(), task))
t.Equal(ErrTaskListKind, t.fwdr.ForwardTask(context.Background(), task))
}

func (t *ForwarderTestSuite) TestForwardDecisionTask() {
Expand Down Expand Up @@ -142,18 +142,18 @@ func (t *ForwarderTestSuite) TestForwardTaskRateExceeded() {
for i := 0; i < rps; i++ {
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
}
t.Equal(errForwarderSlowDown, t.fwdr.ForwardTask(context.Background(), task))
t.Equal(ErrForwarderSlowDown, t.fwdr.ForwardTask(context.Background(), task))
}

func (t *ForwarderTestSuite) TestForwardQueryTaskError() {
task := newInternalQueryTask("id1", &types.MatchingQueryWorkflowRequest{})
_, err := t.fwdr.ForwardQueryTask(context.Background(), task)
t.Equal(errNoParent, err)
t.Equal(ErrNoParent, err)

t.usingTasklistPartition(persistence.TaskListTypeDecision)
t.fwdr.taskListKind = types.TaskListKindSticky
_, err = t.fwdr.ForwardQueryTask(context.Background(), task)
t.Equal(errTaskListKind, err)
t.Equal(ErrTaskListKind, err)
}

func (t *ForwarderTestSuite) TestForwardQueryTask() {
Expand Down Expand Up @@ -191,12 +191,12 @@ func (t *ForwarderTestSuite) TestForwardQueryTaskRateNotEnforced() {

func (t *ForwarderTestSuite) TestForwardPollError() {
_, err := t.fwdr.ForwardPoll(context.Background())
t.Equal(errNoParent, err)
t.Equal(ErrNoParent, err)

t.usingTasklistPartition(persistence.TaskListTypeActivity)
t.fwdr.taskListKind = types.TaskListKindSticky
_, err = t.fwdr.ForwardPoll(context.Background())
t.Equal(errTaskListKind, err)
t.Equal(ErrTaskListKind, err)

}

Expand Down
14 changes: 10 additions & 4 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,15 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err
// root partition if possible
select {
case token := <-tm.fwdrAddReqTokenC():
if err := tm.fwdr.ForwardTask(ctx, task); err == nil {
err := tm.fwdr.ForwardTask(ctx, task)
token.release("")
if err == nil {
// task was remotely sync matched on the parent partition
token.release("")
return true, nil
}
token.release("")
if errors.Is(err, ErrForwarderSlowDown) {
tm.scope.IncCounter(metrics.SyncMatchForwardTaskThrottleErrorPerTasklist)
}
default:
if !tm.isForwardingAllowed() && // we are the root partition and forwarding is not possible
task.source == types.TaskSourceDbBacklog && // task was from backlog (stored in db)
Expand Down Expand Up @@ -198,7 +201,7 @@ func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*typ
if err == nil {
return resp, nil
}
if err == errForwarderSlowDown {
if err == ErrForwarderSlowDown {
// if we are rate limited, try only local match for the
// remainder of the context timeout left
fwdrTokenC = noopForwarderTokenC
Expand Down Expand Up @@ -240,6 +243,9 @@ forLoop:
token.release("")
if err != nil {

if errors.Is(err, ErrForwarderSlowDown) {
tm.scope.IncCounter(metrics.AsyncMatchForwardTaskThrottleErrorPerTasklist)
}
tm.log.Debug("failed to forward task",
tag.Error(err),
tag.TaskID(task.Event.TaskID),
Expand Down
29 changes: 29 additions & 0 deletions service/matching/tasklist/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/metrics/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -237,6 +238,20 @@ func (t *MatcherTestSuite) TestSyncMatchFailure() {
t.False(syncMatch)
}

func (t *MatcherTestSuite) TestRateLimitHandling() {
scope := mocks.Scope{}
scope.On("IncCounter", metrics.SyncMatchForwardTaskThrottleErrorPerTasklist)
t.matcher.scope = &scope
for i := 0; i < 5; i++ {
t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", true, nil, "")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := t.matcher.Offer(ctx, task)
cancel()
assert.NoError(t.T(), err)
}
}

func (t *MatcherTestSuite) TestIsolationSyncMatchFailure() {
// force disable remote forwarding
for i := 0; i < len(t.isolationGroups)+1; i++ {
Expand Down Expand Up @@ -473,6 +488,20 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
t.Equal(t.taskList.Parent(20), req.GetTaskList().GetName())
}

func (t *MatcherTestSuite) TestMustOfferRemoteRateLimit() {
scope := mocks.Scope{}
scope.On("IncCounter", metrics.AsyncMatchForwardTaskThrottleErrorPerTasklist)
t.matcher.scope = &scope
completionFunc := func(*persistence.TaskInfo, error) {}
for i := 0; i < 5; i++ {
t.client.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any()).Return(nil)
task := newInternalTask(t.newTaskInfo(), completionFunc, types.TaskSourceDbBacklog, "", false, nil, "")
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
t.NoError(t.matcher.MustOffer(ctx, task))
cancel()
}
}

func (t *MatcherTestSuite) TestIsolationMustOfferRemoteMatch() {
pollSigC := make(chan struct{})

Expand Down

0 comments on commit 8df9759

Please sign in to comment.