Merge branch 'master' of github.com:uber/cadence

bowenxia committed Aug 21, 2024
2 parents a8114e1 + 947c4f7 commit 64301b7

Showing 12 changed files with 281 additions and 28 deletions.
13 changes: 13 additions & 0 deletions common/metrics/defs.go
@@ -990,6 +990,9 @@ const (
FrontendQueryWorkflowScope
// FrontendDescribeWorkflowExecutionScope is the metric scope for frontend.DescribeWorkflowExecution
FrontendDescribeWorkflowExecutionScope
// FrontendDescribeWorkflowExecutionStatusScope is a custom metric scope for richer
// details about workflow description calls, including the workflow's open/closed status
FrontendDescribeWorkflowExecutionStatusScope
// FrontendDescribeTaskListScope is the metric scope for frontend.DescribeTaskList
FrontendDescribeTaskListScope
// FrontendResetStickyTaskListScope is the metric scope for frontend.ResetStickyTaskList
@@ -1801,6 +1804,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendDeprecateDomainScope: {operation: "DeprecateDomain"},
FrontendQueryWorkflowScope: {operation: "QueryWorkflow"},
FrontendDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
FrontendDescribeWorkflowExecutionStatusScope: {operation: "DescribeWorkflowExecutionStatus"},
FrontendListTaskListPartitionsScope: {operation: "FrontendListTaskListPartitions"},
FrontendGetTaskListsByDomainScope: {operation: "FrontendGetTaskListsByDomain"},
FrontendRefreshWorkflowTasksScope: {operation: "FrontendRefreshWorkflowTasks"},
@@ -2129,6 +2133,9 @@ const (
KafkaConsumerMessageNackDlqErr
KafkaConsumerSessionStart

DescribeWorkflowStatusCount
DescribeWorkflowStatusError

GracefulFailoverLatency
GracefulFailoverFailure

@@ -2558,6 +2565,8 @@ const (
PollLocalMatchLatencyPerTaskList
PollForwardMatchLatencyPerTaskList
PollLocalMatchAfterForwardFailedLatencyPerTaskList
PollDecisionTaskAlreadyStartedCounterPerTaskList
PollActivityTaskAlreadyStartedCounterPerTaskList

NumMatchingMetrics
)
@@ -2878,6 +2887,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter},
ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter},
HashringViewIdentifier: {metricName: "hashring_view_identifier", metricType: Counter},
DescribeWorkflowStatusError: {metricName: "describe_wf_error", metricType: Counter},
DescribeWorkflowStatusCount: {metricName: "describe_wf_status", metricType: Counter},

AsyncRequestPayloadSize: {metricName: "async_request_payload_size_per_domain", metricRollupName: "async_request_payload_size", metricType: Timer},

@@ -3227,6 +3238,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
PollLocalMatchLatencyPerTaskList: {metricName: "poll_local_match_latency_per_tl", metricRollupName: "poll_local_match_latency", metricType: Timer},
PollForwardMatchLatencyPerTaskList: {metricName: "poll_forward_match_latency_per_tl", metricRollupName: "poll_forward_match_latency", metricType: Timer},
PollLocalMatchAfterForwardFailedLatencyPerTaskList: {metricName: "poll_local_match_after_forward_failed_latency_per_tl", metricRollupName: "poll_local_match_after_forward_failed_latency", metricType: Timer},
PollDecisionTaskAlreadyStartedCounterPerTaskList: {metricName: "poll_decision_task_already_started_per_tl", metricType: Counter},
PollActivityTaskAlreadyStartedCounterPerTaskList: {metricName: "poll_activity_task_already_started_per_tl", metricType: Counter},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
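The two new per-task-list counters are defined with a metric name only (no metricRollupName), unlike the latency timers around them. A minimal sketch of how such a counter might be bumped, assuming a metrics.Client wired the usual way; the scope constant and the TaskListTag helper are assumptions for illustration (analogous to the DomainTag used later in this commit), and the real emission sites live in the matching service, not in this diff:

package matching // hypothetical placement, for illustration only

import "github.com/uber/cadence/common/metrics"

// recordTaskAlreadyStarted sketches how the new counters might be emitted when
// a poll returns a task whose decision or activity was already started.
func recordTaskAlreadyStarted(client metrics.Client, taskList string, isDecision bool) {
	scope := client.Scope(
		metrics.MatchingPollForDecisionTaskScope, // assumed scope constant
		metrics.TaskListTag(taskList),            // assumed helper, mirrors metrics.DomainTag
	)
	if isDecision {
		scope.IncCounter(metrics.PollDecisionTaskAlreadyStartedCounterPerTaskList)
	} else {
		scope.IncCounter(metrics.PollActivityTaskAlreadyStartedCounterPerTaskList)
	}
}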
7 changes: 7 additions & 0 deletions common/metrics/tags.go
@@ -61,6 +61,7 @@ const (
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"
workflowTerminationReason = "workflow_termination_reason"
workflowCloseStatus = "workflow_close_status"

// limiter-side tags
globalRatelimitKey = "global_ratelimit_key"
@@ -285,6 +286,12 @@ func WorkflowTerminationReasonTag(value string) Tag {
return simpleMetric{key: workflowTerminationReason, value: value}
}

// WorkflowCloseStatusTag returns a tag for the stringified workflow close status
func WorkflowCloseStatusTag(value string) Tag {
value = safeAlphaNumericStringRE.ReplaceAllString(value, "_")
return simpleMetric{key: workflowCloseStatus, value: value}
}

// PartitionConfigTags returns a list of partition config tags
func PartitionConfigTags(partitionConfig map[string]string) []Tag {
tags := make([]Tag, 0, len(partitionConfig))
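For reference, WorkflowCloseStatusTag sanitizes its input the same way WorkflowTerminationReasonTag does: runs of unsafe characters are collapsed into underscores before the value becomes a tag. A standalone sketch of that behavior, assuming safeAlphaNumericStringRE matches runs of non-alphanumeric characters (its real definition lives elsewhere in common/metrics/tags.go):

package main

import (
	"fmt"
	"regexp"
)

// Assumed shape of tags.go's safeAlphaNumericStringRE.
var safeAlphaNumericStringRE = regexp.MustCompile(`[^a-zA-Z0-9]+`)

func main() {
	// Close statuses arrive as enum strings such as "COMPLETED"; anything
	// outside the safe set becomes an underscore, so the tag value is always legal.
	fmt.Println(safeAlphaNumericStringRE.ReplaceAllString("CONTINUED_AS_NEW", "_")) // CONTINUED_AS_NEW
	fmt.Println(safeAlphaNumericStringRE.ReplaceAllString("some status!", "_"))     // some_status_
}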
42 changes: 30 additions & 12 deletions host/matching_simulation_test.go
@@ -110,6 +110,7 @@ func TestMatchingSimulationSuite(t *testing.T) {
mockHistoryCl := history.NewMockClient(ctrl)
mockHistoryCl.EXPECT().RecordDecisionTaskStarted(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *types.RecordDecisionTaskStartedRequest, opts ...yarpc.CallOption) (*types.RecordDecisionTaskStartedResponse, error) {
time.Sleep(getRecordDecisionTaskStartedTime(clusterConfig.MatchingConfig.SimulationConfig.RecordDecisionTaskStartedTime))
return &types.RecordDecisionTaskStartedResponse{
ScheduledEventID: req.ScheduleID,
}, nil
@@ -186,12 +187,13 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
pollDuration := getPollDuration(s.testClusterConfig.MatchingConfig.SimulationConfig.PollTimeout)
polledTasksCounter := int32(0)
maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate)
taskProcessTime := getTaskProcessTime(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskProcessTime)
var tasksToReceive sync.WaitGroup
tasksToReceive.Add(maxTasksToGenerate)
var pollerWG sync.WaitGroup
for i := 0; i < numPollers; i++ {
pollerWG.Add(1)
go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh, &tasksToReceive)
go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, taskProcessTime, statsCh, &tasksToReceive)
}

// wait a bit for pollers to start.
@@ -200,7 +202,8 @@
startTime := time.Now()
// Start task generators
rps := getTaskQPS(s.testClusterConfig.MatchingConfig.SimulationConfig.TasksPerSecond)
rateLimiter := rate.NewLimiter(rate.Limit(rps), rps)
burst := getTaskBurst(s.testClusterConfig.MatchingConfig.SimulationConfig.TasksBurst)
rateLimiter := rate.NewLimiter(rate.Limit(rps), burst)
generatedTasksCounter := int32(0)
lastTaskScheduleID := int32(0)
numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators)
@@ -326,20 +329,18 @@ func (s *MatchingSimulationSuite) poll(
domainID, tasklist string,
polledTasksCounter *int32,
wg *sync.WaitGroup,
pollDuration time.Duration,
pollDuration, taskProcessTime time.Duration,
statsCh chan *operationStats,
tasksToReceive *sync.WaitGroup,
) {
defer wg.Done()
t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()

for {
select {
case <-ctx.Done():
s.log("Poller done")
return
case <-t.C:
default:
s.log("Poller will initiate a poll")
reqCtx, cancel := context.WithTimeout(ctx, pollDuration)
start := time.Now()
@@ -375,6 +376,7 @@ func (s *MatchingSimulationSuite) poll(
atomic.AddInt32(polledTasksCounter, 1)
s.log("PollForDecisionTask got a task with startedid: %d. resp: %+v", resp.StartedEventID, resp)
tasksToReceive.Done()
time.Sleep(taskProcessTime)
}
}
}
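Note the loop change above: the poller drops its fixed 50ms ticker in favor of a select with a default branch, and instead sleeps for taskProcessTime after each received task, so poller throughput is now governed by the simulated processing time rather than a fixed tick interval.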
@@ -475,16 +477,10 @@ func getPartitions(i int) int {
}

func getForwarderMaxOutstandingPolls(i int) int {
if i == 0 {
return 20
}
return i
}

func getForwarderMaxOutstandingTasks(i int) int {
if i == 0 {
return 1
}
return i
}

@@ -508,3 +504,25 @@ func getTaskQPS(i int) int {
}
return i
}

func getTaskBurst(i int) int {
if i == 0 {
return 1
}
return i
}

func getTaskProcessTime(duration time.Duration) time.Duration {
if duration == 0 {
return time.Millisecond
}
return duration
}

func getRecordDecisionTaskStartedTime(duration time.Duration) time.Duration {
if duration == 0 {
return time.Millisecond
}

return duration
}
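Two notes on the helpers above. First, getForwarderMaxOutstandingPolls and getForwarderMaxOutstandingTasks lose their nonzero fallbacks (the if i == 0 branches shown are the removed lines), so a configured 0 now genuinely disables forwarding; the new matching_simulation_no_forwarding.yaml below relies on exactly that. Second, task generation now honors a configurable burst. A minimal, self-contained sketch of the golang.org/x/time/rate semantics the test depends on; the values here are illustrative, not taken from any config file:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// With rps=10 and burst=1, Wait releases roughly one task every 100ms.
	// With burst=10, the first ten Waits return immediately, then refill at rps.
	limiter := rate.NewLimiter(rate.Limit(10), 1)
	start := time.Now()
	for i := 0; i < 5; i++ {
		_ = limiter.Wait(context.Background())
		fmt.Printf("task %d released at %v\n", i, time.Since(start).Round(10*time.Millisecond))
	}
}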
11 changes: 11 additions & 0 deletions host/onebox.go
@@ -169,6 +169,12 @@ type (
// The total QPS to generate tasks. Defaults to 40.
TasksPerSecond int

// The burst value for the task-generation rate limiter. Controls the maximum number of AddTask requests
// that can be sent at once. For example, if TasksPerSecond, TasksBurst, and NumTaskGenerators are all
// set to 10, then every second you'll get 10 tasks added right at the start of the second. If you instead set
// TasksBurst to 1, you'd get a steady stream of tasks, with one task every 100ms.
TasksBurst int

// Upper limit of tasks to generate. Task generators stop once the total number of tasks generated reaches MaxTaskToGenerate during the simulation.
// Defaults to 2k
MaxTaskToGenerate int
@@ -193,6 +199,11 @@

// LocalTaskWaitTime. defaults to 0ms.
LocalTaskWaitTime time.Duration

// TaskProcessTime. The amount of time the poller spends between requests, simulating task processing
TaskProcessTime time.Duration
// RecordDecisionTaskStartedTime. The amount of time History takes to complete RecordDecisionTaskStarted
RecordDecisionTaskStartedTime time.Duration
}

// CadenceParams contains everything needed to bootstrap Cadence
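None of the testdata files below set tasksburst, so getTaskBurst falls back to a burst of 1. A hypothetical snippet showing where the knob would sit in a simulation config (values illustrative):

simulationconfig:
  taskspersecond: 100
  tasksburst: 10 # hypothetical: up to 10 tasks released at the top of each second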
15 changes: 9 additions & 6 deletions host/testdata/matching_simulation_burst.yaml
@@ -8,16 +8,19 @@ historyconfig:
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 2
tasklistreadpartitions: 2
numpollers: 10
numtaskgenerators: 2
taskspersecond: 200
maxtasktogenerate: 1500
tasklistwritepartitions: 4
tasklistreadpartitions: 4
numpollers: 8
numtaskgenerators: 10
taskspersecond: 500
maxtasktogenerate: 10000
polltimeout: 60s
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 0ms
localtaskwaittime: 0ms
taskprocesstime: 1ms
workerconfig:
enableasyncwfconsumer: false
11 changes: 6 additions & 5 deletions host/testdata/matching_simulation_default.yaml
@@ -8,18 +8,19 @@ historyconfig:
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 2
tasklistreadpartitions: 2
numpollers: 10
tasklistwritepartitions: 4
tasklistreadpartitions: 4
numpollers: 8
numtaskgenerators: 2
taskspersecond: 40
maxtasktogenerate: 1500
taskspersecond: 80
maxtasktogenerate: 3000
polltimeout: 60s
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 0ms
localtaskwaittime: 0ms
taskprocesstime: 1ms
workerconfig:
enableasyncwfconsumer: false
10 changes: 5 additions & 5 deletions host/testdata/matching_simulation_more_read_partitions.yaml
@@ -8,12 +8,12 @@ historyconfig:
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 2
tasklistreadpartitions: 4
numpollers: 10
tasklistwritepartitions: 4
tasklistreadpartitions: 8
numpollers: 8
numtaskgenerators: 2
taskspersecond: 40
maxtasktogenerate: 1500
taskspersecond: 80
maxtasktogenerate: 3000
polltimeout: 60s
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
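This config intentionally keeps tasklistreadpartitions (8) above tasklistwritepartitions (4), which exercises pollers draining read partitions that no longer receive new writes.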
26 changes: 26 additions & 0 deletions host/testdata/matching_simulation_no_forwarding.yaml
@@ -0,0 +1,26 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 4
tasklistreadpartitions: 4
numpollers: 8
numtaskgenerators: 2
taskspersecond: 80
maxtasktogenerate: 3000
polltimeout: 60s
forwardermaxoutstandingpolls: 0
forwardermaxoutstandingtasks: 0
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 0ms
localtaskwaittime: 0ms
taskprocesstime: 1ms
workerconfig:
enableasyncwfconsumer: false
26 changes: 26 additions & 0 deletions host/testdata/matching_simulation_throughput.yaml
@@ -0,0 +1,26 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 1
tasklistreadpartitions: 1
numpollers: 8
numtaskgenerators: 10
taskspersecond: 80
maxtasktogenerate: 3000
polltimeout: 60s
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 0ms
localtaskwaittime: 0ms
taskprocesstime: 1ms
workerconfig:
enableasyncwfconsumer: false
19 changes: 19 additions & 0 deletions service/frontend/api/handler.go
@@ -3415,6 +3415,8 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(
Request: request,
})

wh.emitDescribeWorkflowExecutionMetrics(domainName, response, err)

if err != nil {
return nil, err
}
@@ -4071,6 +4073,23 @@ func (hs HealthStatus) String() string {
}
}

func (wh *WorkflowHandler) emitDescribeWorkflowExecutionMetrics(domain string, response *types.DescribeWorkflowExecutionResponse, err error) {
scope := wh.GetMetricsClient().Scope(metrics.FrontendDescribeWorkflowExecutionStatusScope, metrics.DomainTag(domain))

if err != nil || response == nil {
scope.IncCounter(metrics.DescribeWorkflowStatusError)
return
}

status := "unknown"
if response.WorkflowExecutionInfo != nil && response.WorkflowExecutionInfo.CloseStatus != nil {
status = response.WorkflowExecutionInfo.CloseStatus.String()
}

scope = scope.Tagged(metrics.WorkflowCloseStatusTag(status))
scope.IncCounter(metrics.DescribeWorkflowStatusCount)
}

func getDomainWfIDRunIDTags(
domainName string,
wf *types.WorkflowExecution,
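Net effect of the handler change: every DescribeWorkflowExecution call now increments describe_wf_status tagged with the domain and the stringified close status. Open workflows report "unknown", since WorkflowExecutionInfo.CloseStatus stays nil until the workflow closes; errors (or a nil response) increment describe_wf_error with only the domain tag.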