diff --git a/.buildkite/pipeline-master.yml b/.buildkite/pipeline-master.yml index 97bd2b40a75..ed47d9458d0 100644 --- a/.buildkite/pipeline-master.yml +++ b/.buildkite/pipeline-master.yml @@ -304,5 +304,5 @@ steps: name: ubercadence-dockerhub key: password - docker-login#v2.0.1: - username: ubercadence + username: jht305 password-env: DOCKER_LOGIN_PASSWORD diff --git a/common/definition/indexedKeys_test.go b/common/definition/indexedKeys_test.go new file mode 100644 index 00000000000..4b44d0a9351 --- /dev/null +++ b/common/definition/indexedKeys_test.go @@ -0,0 +1,44 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package definition + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsSystemBoolKey(t *testing.T) { + tests := []struct { + key string + expected bool + }{ + {"IsCron", true}, + {"StartTime", false}, + } + + for _, test := range tests { + actualResult := IsSystemBoolKey(test.key) + assert.Equal(t, test.expected, actualResult) + } +} diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index ef1e21c98bb..c46aee7b232 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1672,6 +1672,13 @@ const ( // Default value: false // Allowed filters: DomainID MatchingEnableTaskInfoLogByDomainID + // MatchingEnableTasklistGuardAgainstOwnershipShardLoss + // enables guards to prevent tasklists from processing if there is any detection that the host + // no longer is active or owns the shard + // KeyName: matching.enableTasklistGuardAgainstOwnershipLoss + // Value type: Bool + // Default value: false + MatchingEnableTasklistGuardAgainstOwnershipShardLoss // key for history @@ -2859,6 +2866,13 @@ const ( // Allowed filters: domainName, taskListName, taskListType LocalPollWaitTime + // LocalTaskWaitTime is the wait time for a task to wait before considering task forwarding + // KeyName: matching.localTaskWaitTime + // Value type: Duration + // Default value: 10ms + // Allowed filters: domainName, taskListName, taskListType + LocalTaskWaitTime + // LastDurationKey must be the last one in this const group LastDurationKey ) @@ -4109,6 +4123,11 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "MatchingEnableTaskInfoLogByDomainID is enables info level logs for decision/activity task based on the request domainID", DefaultValue: false, }, + 
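// ----------------------------------------------------------------------------
// Editor's note: a minimal sketch (not part of the diff) of how the new bool
// key is consumed. It mirrors what NewConfig in service/matching/config/config.go
// does later in this diff; the function name here is hypothetical. Also worth
// noting: the Go constant says "OwnershipShardLoss" while the registered
// KeyName is "matching.enableTasklistGuardAgainstOwnershipLoss" (no "Shard"),
// so runtime overrides must use the KeyName string.
package example

import "github.com/uber/cadence/common/dynamicconfig"

// readOwnershipGuard resolves the flag at call time, so operators can flip it
// without a redeploy; it stays false until overridden.
func readOwnershipGuard(dc *dynamicconfig.Collection) bool {
	guard := dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss)
	return guard()
}
// ----------------------------------------------------------------------------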
MatchingEnableTasklistGuardAgainstOwnershipShardLoss: { + KeyName: "matching.enableTasklistGuardAgainstOwnershipLoss", + Description: "allows guards to ensure that tasklists don't continue processing if there's signal that they've lost ownership", + DefaultValue: false, + }, EventsCacheGlobalEnable: { KeyName: "history.eventsCacheGlobalEnable", Description: "EventsCacheGlobalEnable is enables global cache over all history shards", @@ -5148,6 +5167,12 @@ var DurationKeys = map[DurationKey]DynamicDuration{ Description: "LocalPollWaitTime is the time a poller waits before considering request forwarding.", DefaultValue: time.Millisecond * 10, }, + LocalTaskWaitTime: { + KeyName: "matching.localTaskWaitTime", + Filters: []Filter{DomainName, TaskListName, TaskType}, + Description: "LocalTaskWaitTime is the time a task waits for a poller to arrive before considering task forwarding", + DefaultValue: time.Millisecond * 10, + }, } var MapKeys = map[MapKey]DynamicMap{ diff --git a/common/errors/taskListNotOwnedByHostError.go b/common/errors/taskListNotOwnedByHostError.go index 15a8ba21cce..b1c4a566a82 100644 --- a/common/errors/taskListNotOwnedByHostError.go +++ b/common/errors/taskListNotOwnedByHostError.go @@ -24,21 +24,21 @@ package errors import "fmt" -var _ error = &TaskListNotOwnnedByHostError{} +var _ error = &TaskListNotOwnedByHostError{} -type TaskListNotOwnnedByHostError struct { +type TaskListNotOwnedByHostError struct { OwnedByIdentity string MyIdentity string TasklistName string } -func (m *TaskListNotOwnnedByHostError) Error() string { +func (m *TaskListNotOwnedByHostError) Error() string { return fmt.Sprintf("task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s", m.OwnedByIdentity, m.MyIdentity, m.TasklistName) } -func NewTaskListNotOwnnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnnedByHostError { - return &TaskListNotOwnnedByHostError{ +func NewTaskListNotOwnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnedByHostError { + return &TaskListNotOwnedByHostError{ OwnedByIdentity: ownedByIdentity, MyIdentity: myIdentity, TasklistName: tasklistName, diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index a0d3b762017..00aa2b21233 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -956,6 +956,12 @@ func VisibilityQuery(query string) Tag { return newStringTag("visibility-query", query) } +// MembershipChangeEvent is a predefined tag for when logging hashring change events, +// expected to be of type membership.ChangeEvent +func MembershipChangeEvent(event interface{}) Tag { + return newPredefinedDynamicTag("membership-change-event", event) +} + // Dynamic Uses reflection based logging for arbitrary values // for not very performant logging func Dynamic(key string, v interface{}) Tag { diff --git a/common/membership/resolver.go b/common/membership/resolver.go index 8afb0347d55..17d060fb2a4 100644 --- a/common/membership/resolver.go +++ b/common/membership/resolver.go @@ -25,6 +25,7 @@ package membership import ( "fmt" + "sync" "sync/atomic" "github.com/uber/cadence/common" @@ -84,6 +85,7 @@ type MultiringResolver struct { status int32 provider PeerProvider + mu sync.Mutex rings map[string]*ring } @@ -110,6 +112,7 @@ func NewMultiringResolver( provider: provider, rings: make(map[string]*ring), metrics: metricsClient, + mu: sync.Mutex{}, } for _, s := range services { @@ -130,6 +133,8 @@ func (rpo *MultiringResolver) Start() { rpo.provider.Start() + 
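// ----------------------------------------------------------------------------
// Editor's note (aside, not part of the diff): the explicit `mu: sync.Mutex{}`
// in NewMultiringResolver is harmless but redundant, since a sync.Mutex's zero
// value is ready to use. The substantive change is that every access to the
// rings map is now serialized through the mutex. A self-contained sketch of
// that pattern with hypothetical names (`ring` stands in for the package type):
package example

import (
	"fmt"
	"sync"
)

type ring struct{}

type ringHolder struct {
	mu    sync.Mutex
	rings map[string]*ring
}

// getRing has the same shape as MultiringResolver.getRing after this diff:
// lock, defer unlock, then read the map, so Start/Stop/getRing can race safely.
func (h *ringHolder) getRing(service string) (*ring, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	r, ok := h.rings[service]
	if !ok {
		return nil, fmt.Errorf("service %q is not tracked by Resolver", service)
	}
	return r, nil
}
// ----------------------------------------------------------------------------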
rpo.mu.Lock() + defer rpo.mu.Unlock() for _, ring := range rpo.rings { ring.Start() } @@ -145,6 +150,8 @@ func (rpo *MultiringResolver) Stop() { return } + rpo.mu.Lock() + defer rpo.mu.Unlock() for _, ring := range rpo.rings { ring.Stop() } @@ -163,6 +170,8 @@ func (rpo *MultiringResolver) EvictSelf() error { } func (rpo *MultiringResolver) getRing(service string) (*ring, error) { + rpo.mu.Lock() + defer rpo.mu.Unlock() ring, found := rpo.rings[service] if !found { return nil, fmt.Errorf("service %q is not tracked by Resolver", service) diff --git a/common/pinot/pinotQueryValidator_test.go b/common/pinot/pinotQueryValidator_test.go index 7b50d0be9bf..1f6a5c464f3 100644 --- a/common/pinot/pinotQueryValidator_test.go +++ b/common/pinot/pinotQueryValidator_test.go @@ -296,6 +296,11 @@ func TestValidateQuery(t *testing.T) { validated: "", err: "invalid bool value in pinot_query_validator: 1", }, + "case21-5: test bool value- when it is not SQLBool and SQLVAl": { + query: "IsCron = abc", + validated: "", + err: "failed to process a bool key to SQLVal: &{ abc { }}", + }, } for name, test := range tests { diff --git a/host/matching_simulation_test.go b/host/matching_simulation_test.go index ef6dc1104b0..cc547da5d61 100644 --- a/host/matching_simulation_test.go +++ b/host/matching_simulation_test.go @@ -35,8 +35,10 @@ package host import ( "context" + "errors" "flag" "fmt" + "os" "reflect" "strings" "sync" @@ -49,6 +51,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.uber.org/yarpc" + "golang.org/x/time/rate" "github.com/uber/cadence/client/history" "github.com/uber/cadence/common/dynamicconfig" @@ -63,6 +66,7 @@ type operation string const ( operationPollForDecisionTask operation = "PollForDecisionTask" + defaultTestCase = "testdata/matching_simulation_default.yaml" ) type operationStats struct { @@ -82,7 +86,10 @@ type operationAggStats struct { func TestMatchingSimulationSuite(t *testing.T) { flag.Parse() - confPath := "testdata/matching_simulation.yaml" + confPath := os.Getenv("MATCHING_SIMULATION_CONFIG") + if confPath == "" { + confPath = defaultTestCase + } clusterConfig, err := GetTestClusterConfig(confPath) if err != nil { t.Fatalf("failed creating cluster config from %s, err: %v", confPath, err) @@ -95,6 +102,8 @@ func TestMatchingSimulationSuite(t *testing.T) { dynamicconfig.MatchingForwarderMaxOutstandingTasks: getForwarderMaxOutstandingTasks(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingTasks), dynamicconfig.MatchingForwarderMaxRatePerSecond: getForwarderMaxRPS(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxRatePerSecond), dynamicconfig.MatchingForwarderMaxChildrenPerNode: getForwarderMaxChildPerNode(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxChildrenPerNode), + dynamicconfig.LocalPollWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalPollWaitTime, + dynamicconfig.LocalTaskWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalTaskWaitTime, } ctrl := gomock.NewController(t) @@ -176,31 +185,43 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() { numPollers := getNumPollers(s.testClusterConfig.MatchingConfig.SimulationConfig.NumPollers) pollDuration := getPollDuration(s.testClusterConfig.MatchingConfig.SimulationConfig.PollTimeout) polledTasksCounter := int32(0) + maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate) + var tasksToReceive sync.WaitGroup + tasksToReceive.Add(maxTasksToGenerate) 
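// ----------------------------------------------------------------------------
// Editor's note: the simulation below replaces ticker-driven generation with a
// shared golang.org/x/time/rate limiter, and replaces the fixed 60s sleep with
// WaitGroups pre-sized to the expected task count. A runnable sketch of both
// patterns together; all names are illustrative, and it assumes the context
// outlives generation:
package example

import (
	"context"
	"sync"
	"sync/atomic"

	"golang.org/x/time/rate"
)

// runGenerators paces `generators` goroutines with one shared limiter and
// blocks until exactly `total` tasks have been produced.
func runGenerators(ctx context.Context, qps, total, generators int) {
	limiter := rate.NewLimiter(rate.Limit(qps), qps) // burst == qps, as in the test
	var produced sync.WaitGroup
	produced.Add(total) // sized up front, one Done() per generated task

	var lastID int32
	for g := 0; g < generators; g++ {
		go func() {
			for {
				if err := limiter.Wait(ctx); err != nil {
					return // context canceled or deadline exceeded
				}
				if id := atomic.AddInt32(&lastID, 1); id > int32(total) {
					return // nothing left for this generator to produce
				}
				produced.Done()
			}
		}()
	}
	produced.Wait() // replaces the fixed-duration sleep
}
// ----------------------------------------------------------------------------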
var pollerWG sync.WaitGroup for i := 0; i < numPollers; i++ { pollerWG.Add(1) - go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh) + go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh, &tasksToReceive) } // wait a bit for pollers to start. time.Sleep(300 * time.Millisecond) + startTime := time.Now() // Start task generators + rps := getTaskQPS(s.testClusterConfig.MatchingConfig.SimulationConfig.TasksPerSecond) + rateLimiter := rate.NewLimiter(rate.Limit(rps), rps) generatedTasksCounter := int32(0) lastTaskScheduleID := int32(0) numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators) - taskGenerateInterval := getTaskGenerateInterval(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskGeneratorTickInterval) - maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate) + var tasksToGenerate sync.WaitGroup + tasksToGenerate.Add(maxTasksToGenerate) var generatorWG sync.WaitGroup for i := 1; i <= numGenerators; i++ { generatorWG.Add(1) - go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, taskGenerateInterval, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG) + go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, rateLimiter, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG, &tasksToGenerate) } - // Let it run for a while - sleepDuration := 60 * time.Second - s.log("Wait %v for simulation to run", sleepDuration) - time.Sleep(sleepDuration) + // Let it run until all tasks have been polled. + // There's a test timeout configured in docker/buildkite/docker-compose-local-matching-simulation.yml that you + // can change if your test case needs more time + s.log("Waiting until all tasks are generated") + tasksToGenerate.Wait() + generationTime := time.Now().Sub(startTime) + s.log("Waiting until all tasks are received") + tasksToReceive.Wait() + executionTime := time.Now().Sub(startTime) + s.log("Completed benchmark in %v", (time.Now().Sub(startTime))) s.log("Canceling context to stop pollers and task generators") cancel() pollerWG.Wait() @@ -216,17 +237,20 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() { // Don't change the start/end line format as it is used by scripts to parse the summary info testSummary := []string{} testSummary = append(testSummary, "Simulation Summary:") - testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", sleepDuration)) + testSummary = append(testSummary, fmt.Sprintf("Task generate Duration: %v", generationTime)) + testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", executionTime)) testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers)) testSummary = append(testSummary, fmt.Sprintf("Poll Timeout: %v", pollDuration)) testSummary = append(testSummary, fmt.Sprintf("Num of Task Generators: %d", numGenerators)) - testSummary = append(testSummary, fmt.Sprintf("Task generated every: %v", taskGenerateInterval)) + testSummary = append(testSummary, fmt.Sprintf("Task generated QPS: %v", rps)) testSummary = append(testSummary, fmt.Sprintf("Num of Write Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistWritePartitions])) testSummary = append(testSummary, fmt.Sprintf("Num of Read Partitions: %d", 
s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistReadPartitions])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Polls: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingPolls])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Tasks: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingTasks])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max RPS: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxRatePerSecond])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Children per Node: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxChildrenPerNode])) + testSummary = append(testSummary, fmt.Sprintf("Local Poll Wait Time: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.LocalPollWaitTime])) + testSummary = append(testSummary, fmt.Sprintf("Local Task Wait Time: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.LocalTaskWaitTime])) testSummary = append(testSummary, fmt.Sprintf("Tasks generated: %d", generatedTasksCounter)) testSummary = append(testSummary, fmt.Sprintf("Tasks polled: %d", polledTasksCounter)) totalPollCnt := 0 @@ -256,28 +280,32 @@ func (s *MatchingSimulationSuite) generate( matchingClient MatchingClient, domainID, tasklist string, maxTasksToGenerate int, - taskGenerateInterval time.Duration, + rateLimiter *rate.Limiter, generatedTasksCounter *int32, lastTaskScheduleID *int32, - wg *sync.WaitGroup) { + wg *sync.WaitGroup, + tasksToGenerate *sync.WaitGroup) { defer wg.Done() - t := time.NewTicker(taskGenerateInterval) - defer t.Stop() - for { select { case <-ctx.Done(): s.log("Generator done") return - case <-t.C: + default: + if err := rateLimiter.Wait(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + s.T().Fatal("Rate limiter failed: ", err) + } + return + } scheduleID := int(atomic.AddInt32(lastTaskScheduleID, 1)) if scheduleID > maxTasksToGenerate { s.log("Generated %d tasks so generator will stop", maxTasksToGenerate) return } decisionTask := newDecisionTask(domainID, tasklist, scheduleID) - reqCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second) err := matchingClient.AddDecisionTask(reqCtx, decisionTask) cancel() if err != nil { @@ -287,6 +315,7 @@ func (s *MatchingSimulationSuite) generate( s.log("Decision task %d added", scheduleID) atomic.AddInt32(generatedTasksCounter, 1) + tasksToGenerate.Done() } } } @@ -299,6 +328,7 @@ func (s *MatchingSimulationSuite) poll( wg *sync.WaitGroup, pollDuration time.Duration, statsCh chan *operationStats, + tasksToReceive *sync.WaitGroup, ) { defer wg.Done() t := time.NewTicker(50 * time.Millisecond) @@ -344,6 +374,7 @@ func (s *MatchingSimulationSuite) poll( atomic.AddInt32(polledTasksCounter, 1) s.log("PollForDecisionTask got a task with startedid: %d. 
resp: %+v", resp.StartedEventID, resp) + tasksToReceive.Done() } } } @@ -470,3 +501,10 @@ func getForwarderMaxChildPerNode(i int) int { } return i } + +func getTaskQPS(i int) int { + if i == 0 { + return 40 + } + return i +} diff --git a/host/onebox.go b/host/onebox.go index 2b8317057d7..1893b91f54a 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -166,8 +166,8 @@ type ( // Number of task generators defaults to 1 NumTaskGenerators int - // Each generator will produce a new task every TaskGeneratorTickInterval. Defaults to 50ms - TaskGeneratorTickInterval time.Duration + // The total QPS to generate tasks. Defaults to 40. + TasksPerSecond int // Upper limit of tasks to generate. Task generators will stop if total number of tasks generated reaches MaxTaskToGenerate during simulation // Defaults to 2k @@ -187,6 +187,12 @@ type ( // Children per node. defaults to 20 ForwarderMaxChildrenPerNode int + + // LocalPollWaitTime. defaults to 0ms. + LocalPollWaitTime time.Duration + + // LocalTaskWaitTime. defaults to 0ms. + LocalTaskWaitTime time.Duration } // CadenceParams contains everything needed to bootstrap Cadence diff --git a/host/testdata/matching_simulation.yaml b/host/testdata/matching_simulation_burst.yaml similarity index 83% rename from host/testdata/matching_simulation.yaml rename to host/testdata/matching_simulation_burst.yaml index 4a0da782f2c..9304ea54eb5 100644 --- a/host/testdata/matching_simulation.yaml +++ b/host/testdata/matching_simulation_burst.yaml @@ -12,10 +12,10 @@ matchingconfig: tasklistreadpartitions: 2 numpollers: 10 numtaskgenerators: 2 - taskgeneratortickinterval: 50ms + taskspersecond: 200 maxtasktogenerate: 1500 - polltimeout: 5s - forwardermaxoutstandingpolls: 20 + polltimeout: 60s + forwardermaxoutstandingpolls: 1 forwardermaxoutstandingtasks: 1 forwardermaxratepersecond: 10 forwardermaxchildrenpernode: 20 diff --git a/host/testdata/matching_simulation_default.yaml b/host/testdata/matching_simulation_default.yaml new file mode 100644 index 00000000000..099cd92c408 --- /dev/null +++ b/host/testdata/matching_simulation_default.yaml @@ -0,0 +1,25 @@ +enablearchival: false +clusterno: 1 +messagingclientconfig: + usemock: true +historyconfig: + numhistoryshards: 4 + numhistoryhosts: 1 +matchingconfig: + nummatchinghosts: 4 + simulationconfig: + tasklistwritepartitions: 2 + tasklistreadpartitions: 2 + numpollers: 10 + numtaskgenerators: 2 + taskspersecond: 40 + maxtasktogenerate: 1500 + polltimeout: 60s + forwardermaxoutstandingpolls: 1 + forwardermaxoutstandingtasks: 1 + forwardermaxratepersecond: 10 + forwardermaxchildrenpernode: 20 + localpollwaittime: 0ms + localtaskwaittime: 0ms +workerconfig: + enableasyncwfconsumer: false diff --git a/host/testdata/matching_simulation_more_read_partitions.yaml b/host/testdata/matching_simulation_more_read_partitions.yaml new file mode 100644 index 00000000000..94e4edc6054 --- /dev/null +++ b/host/testdata/matching_simulation_more_read_partitions.yaml @@ -0,0 +1,25 @@ +enablearchival: false +clusterno: 1 +messagingclientconfig: + usemock: true +historyconfig: + numhistoryshards: 4 + numhistoryhosts: 1 +matchingconfig: + nummatchinghosts: 4 + simulationconfig: + tasklistwritepartitions: 2 + tasklistreadpartitions: 4 + numpollers: 10 + numtaskgenerators: 2 + taskspersecond: 40 + maxtasktogenerate: 1500 + polltimeout: 60s + forwardermaxoutstandingpolls: 1 + forwardermaxoutstandingtasks: 1 + forwardermaxratepersecond: 10 + forwardermaxchildrenpernode: 20 + localpollwaittime: 0ms + localtaskwaittime: 0ms +workerconfig: + 
enableasyncwfconsumer: false diff --git a/scripts/run_matching_simulator.sh b/scripts/run_matching_simulator.sh index a3af1ee2f19..c281ddd433a 100755 --- a/scripts/run_matching_simulator.sh +++ b/scripts/run_matching_simulator.sh @@ -5,7 +5,9 @@ set -eo pipefail -testName="test-$(date '+%Y-%m-%d-%H-%M-%S')" +testCase="${1:-default}" +testCfg="testdata/matching_simulation_$testCase.yaml" +testName="test-$testCase-$(date '+%Y-%m-%d-%H-%M-%S')" resultFolder="matching-simulator-output" mkdir -p "$resultFolder" eventLogsFile="$resultFolder/events.json" @@ -16,10 +18,10 @@ echo "Building test image" docker-compose -f docker/buildkite/docker-compose-local-matching-simulation.yml \ build matching-simulator -echo "Running the test" +echo "Running the test $testCase" docker-compose \ -f docker/buildkite/docker-compose-local-matching-simulation.yml \ - run --rm matching-simulator \ + run -e MATCHING_SIMULATION_CONFIG=$testCfg --rm matching-simulator \ | grep -a --line-buffered "Matching New Event" \ | sed "s/Matching New Event: //" \ | jq . > "$eventLogsFile" diff --git a/service/history/engine/engineimpl/history_engine2_test.go b/service/history/engine/engineimpl/history_engine2_test.go index 6aab47487f6..16723ad2cf0 100644 --- a/service/history/engine/engineimpl/history_engine2_test.go +++ b/service/history/engine/engineimpl/history_engine2_test.go @@ -802,6 +802,180 @@ func (s *engine2Suite) TestRecordActivityTaskStartedResurrected() { s.Equal(err, workflow.ErrActivityTaskNotFound) } +func (s *engine2Suite) TestRecordActivityTaskStartedStaleState() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{ + WorkflowID: "wId", + RunID: constants.TestRunID, + } + + identity := "testIdentity" + tl := "testTaskList" + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true) + + ms1 := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Times(workflow.ConditionalRetryCount) + + response, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + + s.Error(err) + s.Nil(response) + s.Equal(workflow.ErrMaxAttemptsExceeded, err) +} + +func (s *engine2Suite) TestRecordActivityTaskStartedActivityNotPending() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{ + WorkflowID: "wId", + RunID: constants.TestRunID, + } + + identity := "testIdentity" + tl := "testTaskList" + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true) + + ms1 := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Once() + + response, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 3, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + + 
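// ----------------------------------------------------------------------------
// Editor's note (aside, not part of the diff): the .Times(workflow.ConditionalRetryCount)
// expectation above drives the assertion that follows: to my understanding,
// RecordActivityTaskStarted reloads mutable state and retries while the state
// looks stale, then gives up with ErrMaxAttemptsExceeded after a fixed number
// of attempts. A hedged, generic sketch of that conditional-retry shape
// (names are illustrative, not Cadence's actual helpers):
package example

import "errors"

var errMaxAttemptsExceeded = errors.New("maximum attempts exceeded")

func retryConditional(maxAttempts int, attempt func() (retry bool, err error)) error {
	for i := 0; i < maxAttempts; i++ {
		retry, err := attempt()
		if err != nil {
			return err
		}
		if !retry {
			return nil // condition satisfied, no further reloads needed
		}
	}
	return errMaxAttemptsExceeded // every attempt saw stale state
}
// ----------------------------------------------------------------------------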
s.Error(err) + s.Nil(response) + s.Equal(workflow.ErrActivityTaskNotFound, err) +} + +func (s *engine2Suite) TestRecordActivityTaskStartedActivityAlreadyStarted() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{ + WorkflowID: "wId", + RunID: constants.TestRunID, + } + + identity := "testIdentity" + tl := "testTaskList" + + activityID := "activity1_id" + activityType := "activity_type1" + activityInput := []byte("input1") + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true) + decisionCompletedEvent := test.AddDecisionTaskCompletedEvent(msBuilder, int64(2), int64(3), nil, identity) + scheduledEvent, _ := test.AddActivityTaskScheduledEvent(msBuilder, decisionCompletedEvent.ID, activityID, + activityType, tl, activityInput, 100, 10, 1, 5) + + ms1 := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Once() + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{ + MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}, + }, nil).Once() + + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), gomock.Any(), domainID, workflowExecution.GetWorkflowID(), workflowExecution.GetRunID(), + decisionCompletedEvent.ID, scheduledEvent.ID, gomock.Any(), + ).Return(scheduledEvent, nil).Times(1) + + // start activity + response, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + s.Nil(err) + s.NotNil(response) + s.Equal(scheduledEvent, response.ScheduledEvent) + + // another request made with the same scheduleID and same requestID + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{ + MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}, + }, nil).Once() + + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), gomock.Any(), domainID, workflowExecution.GetWorkflowID(), workflowExecution.GetRunID(), + decisionCompletedEvent.ID, scheduledEvent.ID, gomock.Any(), + ).Return(scheduledEvent, nil).Times(1) + + response, err = s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + s.Nil(err) + s.NotNil(response) + s.Equal(scheduledEvent, response.ScheduledEvent) + + // another request made with the same scheduleID and different requestID + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), gomock.Any(), domainID, workflowExecution.GetWorkflowID(), workflowExecution.GetRunID(), + decisionCompletedEvent.ID, scheduledEvent.ID, gomock.Any(), + ).Return(scheduledEvent, nil).Times(1) + + response, err = s.historyEngine.RecordActivityTaskStarted(context.Background(), 
&types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "otherReqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + s.Error(err) + s.Nil(response) + s.Equal(&types.EventAlreadyStartedError{Message: "Activity task already started."}, err) +} + func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() { domainID := constants.TestDomainID workflowExecution := types.WorkflowExecution{ diff --git a/service/matching/config/config.go b/service/matching/config/config.go index cc14d869176..a4c9b20abcd 100644 --- a/service/matching/config/config.go +++ b/service/matching/config/config.go @@ -52,6 +52,7 @@ type ( ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskListInfoFilters AsyncTaskDispatchTimeout dynamicconfig.DurationPropertyFnWithTaskListInfoFilters LocalPollWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + LocalTaskWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters // Time to hold a poll request before returning an empty response if there are no tasks LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters @@ -80,6 +81,8 @@ type ( TaskDispatchRPSTTL time.Duration // task gc configuration MaxTimeBetweenTaskDeletes time.Duration + + EnableTasklistOwnershipGuard dynamicconfig.BoolPropertyFn } ForwarderConfig struct { @@ -104,6 +107,7 @@ type ( MaxTaskDeleteBatchSize func() int AsyncTaskDispatchTimeout func() time.Duration LocalPollWaitTime func() time.Duration + LocalTaskWaitTime func() time.Duration // taskWriter configuration OutstandingTaskAppendsThreshold func() int MaxTaskBatchSize func() int @@ -156,7 +160,9 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation), AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()), AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout), + EnableTasklistOwnershipGuard: dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss), LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime), + LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime), HostName: hostName, TaskDispatchRPS: 100000.0, TaskDispatchRPSTTL: time.Minute, diff --git a/service/matching/config/config_test.go b/service/matching/config/config_test.go index 1c7738518ff..9924a534bec 100644 --- a/service/matching/config/config_test.go +++ b/service/matching/config/config_test.go @@ -74,10 +74,12 @@ func TestNewConfig(t *testing.T) { "AllIsolationGroups": {dynamicconfig.AllIsolationGroups, []interface{}{"a", "b", "c"}}, "AsyncTaskDispatchTimeout": {dynamicconfig.AsyncTaskDispatchTimeout, time.Duration(25)}, "LocalPollWaitTime": {dynamicconfig.LocalPollWaitTime, time.Duration(10)}, + "LocalTaskWaitTime": {dynamicconfig.LocalTaskWaitTime, time.Duration(10)}, "HostName": {nil, hostname}, "TaskDispatchRPS": {nil, 100000.0}, "TaskDispatchRPSTTL": {nil, time.Minute}, "MaxTimeBetweenTaskDeletes": {nil, time.Second}, + "EnableTasklistOwnershipGuard": {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false}, } client := dynamicconfig.NewInMemoryClient() for fieldName, expected := range fields { diff --git 
a/service/matching/handler/engine.go b/service/matching/handler/engine.go index a66cc5352fa..d685ae6189e 100644 --- a/service/matching/handler/engine.go +++ b/service/matching/handler/engine.go @@ -78,6 +78,8 @@ type ( } matchingEngineImpl struct { + shutdownCompletion *sync.WaitGroup + shutdown chan struct{} taskManager persistence.TaskManager clusterMetadata cluster.Metadata historyService history.Client @@ -120,7 +122,8 @@ var ( var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented // NewEngine creates an instance of matching engine -func NewEngine(taskManager persistence.TaskManager, +func NewEngine( + taskManager persistence.TaskManager, clusterMetadata cluster.Metadata, historyService history.Client, matchingClient matching.Client, @@ -132,7 +135,10 @@ func NewEngine(taskManager persistence.TaskManager, partitioner partition.Partitioner, timeSource clock.TimeSource, ) Engine { + e := &matchingEngineImpl{ + shutdown: make(chan struct{}), + shutdownCompletion: &sync.WaitGroup{}, taskManager: taskManager, clusterMetadata: clusterMetadata, historyService: historyService, @@ -149,19 +155,24 @@ func NewEngine(taskManager persistence.TaskManager, partitioner: partitioner, timeSource: timeSource, } + + e.shutdownCompletion.Add(1) + go e.subscribeToMembershipChanges() + e.waitForQueryResultFn = e.waitForQueryResult return e } func (e *matchingEngineImpl) Start() { - // As task lists are initialized lazily nothing is done on startup at this point. } func (e *matchingEngineImpl) Stop() { + close(e.shutdown) // Executes Stop() on each task list outside of lock for _, l := range e.getTaskLists(math.MaxInt32) { l.Stop() } + e.shutdownCompletion.Wait() } func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager { @@ -200,26 +211,9 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t } e.taskListsLock.RUnlock() - // Defensive check to make sure we actually own the task list - // If we try to create a task list manager for a task list that is not owned by us, return an error - // The new task list manager will steal the task list from the current owner, which should only happen if - // the task list is owned by the current host. 
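// ----------------------------------------------------------------------------
// Editor's note: the engine's new shutdown signalling above uses the
// closed-channel idiom: Stop() calls close(e.shutdown), which broadcasts to
// every reader, and isShuttingDown() (defined further down in this hunk)
// turns it into a non-blocking flag check. Self-contained sketch:
package example

type engine struct {
	shutdown chan struct{}
}

func (e *engine) stop() {
	close(e.shutdown) // wakes all current and future receivers at once
}

func (e *engine) isShuttingDown() bool {
	select {
	case <-e.shutdown:
		return true // closed channel: receive succeeds immediately
	default:
		return false // still open: don't block
	}
}
// ----------------------------------------------------------------------------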
- taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName()) + err := e.errIfShardLoss(taskList) if err != nil { - return nil, fmt.Errorf("failed to lookup task list owner: %w", err) - } - - self, err := e.membershipResolver.WhoAmI() - if err != nil { - return nil, fmt.Errorf("failed to lookup self im membership: %w", err) - } - - if taskListOwner.Identity() != self.Identity() { - return nil, cadence_errors.NewTaskListNotOwnnedByHostError( - taskListOwner.Identity(), - self.Identity(), - taskList.GetName(), - ) + return nil, err } // If it gets here, write lock and check again in case a task list is created between the two locks @@ -1202,6 +1196,64 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog( } } +func (e *matchingEngineImpl) errIfShardLoss(taskList *tasklist.Identifier) error { + if !e.config.EnableTasklistOwnershipGuard() { + return nil + } + + self, err := e.membershipResolver.WhoAmI() + if err != nil { + return fmt.Errorf("failed to lookup self im membership: %w", err) + } + + if e.isShuttingDown() { + e.logger.Warn("request to get tasklist is being rejected because engine is shutting down", + tag.WorkflowDomainID(taskList.GetDomainID()), + tag.WorkflowTaskListType(taskList.GetType()), + tag.WorkflowTaskListName(taskList.GetName()), + ) + + return cadence_errors.NewTaskListNotOwnedByHostError( + "not known", + self.Identity(), + taskList.GetName(), + ) + } + + // Defensive check to make sure we actually own the task list + // If we try to create a task list manager for a task list that is not owned by us, return an error + // The new task list manager will steal the task list from the current owner, which should only happen if + // the task list is owned by the current host. + taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName()) + if err != nil { + return fmt.Errorf("failed to lookup task list owner: %w", err) + } + + if taskListOwner.Identity() != self.Identity() { + e.logger.Warn("Request to get tasklist is being rejected because engine does not own this shard", + tag.WorkflowDomainID(taskList.GetDomainID()), + tag.WorkflowTaskListType(taskList.GetType()), + tag.WorkflowTaskListName(taskList.GetName()), + ) + return cadence_errors.NewTaskListNotOwnedByHostError( + taskListOwner.Identity(), + self.Identity(), + taskList.GetName(), + ) + } + + return nil +} + +func (e *matchingEngineImpl) isShuttingDown() bool { + select { + case <-e.shutdown: + return true + default: + return false + } +} + func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) { m.Lock() defer m.Unlock() diff --git a/service/matching/handler/engine_integration_test.go b/service/matching/handler/engine_integration_test.go index a3e613c60e8..73c3201a1e9 100644 --- a/service/matching/handler/engine_integration_test.go +++ b/service/matching/handler/engine_integration_test.go @@ -34,7 +34,6 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" "go.uber.org/yarpc" @@ -45,7 +44,6 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/dynamicconfig" - cadence_errors "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -55,6 +53,7 @@ import ( 
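// ----------------------------------------------------------------------------
// Editor's note: a minimal sketch (not part of the diff) of how a caller can
// detect the ownership rejection, since errIfShardLoss returns the typed
// *TaskListNotOwnedByHostError introduced earlier in this diff. The helper
// name is hypothetical:
package example

import (
	"errors"

	cadence_errors "github.com/uber/cadence/common/errors"
)

func isOwnershipLoss(err error) bool {
	var notOwned *cadence_errors.TaskListNotOwnedByHostError
	return errors.As(err, &notOwned) // matches wrapped errors too
}
// ----------------------------------------------------------------------------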
"github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/partition" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" "github.com/uber/cadence/service/matching/tasklist" @@ -131,6 +130,7 @@ func (s *matchingEngineSuite) SetupTest() { s.mockMembershipResolver = membership.NewMockResolver(s.controller) s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes() s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes() s.mockIsolationStore = dynamicconfig.NewMockClient(s.controller) dcClient := dynamicconfig.NewInMemoryClient() dcClient.UpdateValue(dynamicconfig.EnableTasklistIsolation, true) @@ -1303,58 +1303,6 @@ func (s *matchingEngineSuite) TestConfigDefaultHostName() { s.EqualValues(configEmpty.HostName, "") } -func (s *matchingEngineSuite) TestGetTaskListManager_OwnerShip() { - testCases := []struct { - name string - lookUpResult string - lookUpErr error - whoAmIResult string - whoAmIErr error - - expectedError error - }{ - { - name: "Not owned by current host", - lookUpResult: "A", - whoAmIResult: "B", - expectedError: new(cadence_errors.TaskListNotOwnnedByHostError), - }, - { - name: "LookupError", - lookUpErr: assert.AnError, - expectedError: assert.AnError, - }, - { - name: "WhoAmIError", - whoAmIErr: assert.AnError, - expectedError: assert.AnError, - }, - } - - for _, tc := range testCases { - s.T().Run(tc.name, func(t *testing.T) { - resolverMock := membership.NewMockResolver(s.controller) - s.matchingEngine.membershipResolver = resolverMock - - resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return( - membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr, - ).AnyTimes() - resolverMock.EXPECT().WhoAmI().Return( - membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr, - ).AnyTimes() - - taskListKind := types.TaskListKindNormal - - _, err := s.matchingEngine.getTaskListManager( - tasklist.NewTestTaskListID(s.T(), "domain", "tasklist", persistence.TaskListTypeActivity), - &taskListKind, - ) - - assert.ErrorAs(s.T(), err, &tc.expectedError) - }) - } -} - func newActivityTaskScheduledEvent(eventID int64, decisionTaskCompletedEventID int64, scheduleAttributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent { historyEvent := newHistoryEvent(eventID, types.EventTypeActivityTaskScheduled) @@ -1402,6 +1350,7 @@ func defaultTestConfig() *config.Config { config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10) config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) config.MaxTimeBetweenTaskDeletes = time.Duration(0) + config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { return true } return config } diff --git a/service/matching/handler/engine_test.go b/service/matching/handler/engine_test.go index 87c84088362..66004d08c84 100644 --- a/service/matching/handler/engine_test.go +++ b/service/matching/handler/engine_test.go @@ -26,6 +26,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "github.com/golang/mock/gomock" @@ -36,8 +37,10 @@ import ( "github.com/uber/cadence/common/client" 
"github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" "github.com/uber/cadence/service/matching/tasklist" @@ -617,3 +620,108 @@ func TestWaitForQueryResult(t *testing.T) { }) } } + +func TestIsShuttingDown(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(0) + e := matchingEngineImpl{ + shutdownCompletion: &wg, + shutdown: make(chan struct{}), + } + e.Start() + assert.False(t, e.isShuttingDown()) + e.Stop() + assert.True(t, e.isShuttingDown()) +} + +func TestGetTasklistsNotOwned(t *testing.T) { + + ctrl := gomock.NewController(t) + resolver := membership.NewMockResolver(ctrl) + + resolver.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("self", "host123", nil), nil) + + tl1, _ := tasklist.NewIdentifier("", "tl1", 0) + tl2, _ := tasklist.NewIdentifier("", "tl2", 0) + tl3, _ := tasklist.NewIdentifier("", "tl3", 0) + + tl1m := tasklist.NewMockManager(ctrl) + tl2m := tasklist.NewMockManager(ctrl) + tl3m := tasklist.NewMockManager(ctrl) + + resolver.EXPECT().Lookup(service.Matching, tl1.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl2.GetName()).Return(membership.NewDetailedHostInfo("", "host456", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl3.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + + e := matchingEngineImpl{ + shutdown: make(chan struct{}), + membershipResolver: resolver, + taskListsLock: sync.RWMutex{}, + taskLists: map[tasklist.Identifier]tasklist.Manager{ + *tl1: tl1m, + *tl2: tl2m, + *tl3: tl3m, + }, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + logger: loggerimpl.NewNopLogger(), + } + + tls, err := e.getNonOwnedTasklistsLocked() + assert.NoError(t, err) + + assert.Equal(t, []tasklist.Manager{tl2m}, tls) +} + +func TestShutDownTasklistsNotOwned(t *testing.T) { + + ctrl := gomock.NewController(t) + resolver := membership.NewMockResolver(ctrl) + + resolver.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("self", "host123", nil), nil) + + tl1, _ := tasklist.NewIdentifier("", "tl1", 0) + tl2, _ := tasklist.NewIdentifier("", "tl2", 0) + tl3, _ := tasklist.NewIdentifier("", "tl3", 0) + + tl1m := tasklist.NewMockManager(ctrl) + tl2m := tasklist.NewMockManager(ctrl) + tl3m := tasklist.NewMockManager(ctrl) + + resolver.EXPECT().Lookup(service.Matching, tl1.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl2.GetName()).Return(membership.NewDetailedHostInfo("", "host456", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl3.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + + e := matchingEngineImpl{ + shutdown: make(chan struct{}), + membershipResolver: resolver, + taskListsLock: sync.RWMutex{}, + taskLists: map[tasklist.Identifier]tasklist.Manager{ + *tl1: tl1m, + *tl2: tl2m, + *tl3: tl3m, + }, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + metricsClient: metrics.NewNoopMetricsClient(), + logger: loggerimpl.NewNopLogger(), + } + + wg := sync.WaitGroup{} + + wg.Add(1) + + 
tl2m.EXPECT().TaskListID().Return(tl2).AnyTimes() + tl2m.EXPECT().String().AnyTimes() + + tl2m.EXPECT().Stop().Do(func() { + wg.Done() + }) + + err := e.shutDownNonOwnedTasklists() + wg.Wait() + + assert.NoError(t, err) +} diff --git a/service/matching/handler/interfaces.go b/service/matching/handler/interfaces.go index 4c82fa334b3..21f39f160e0 100644 --- a/service/matching/handler/interfaces.go +++ b/service/matching/handler/interfaces.go @@ -34,7 +34,8 @@ import ( type ( // Engine exposes interfaces for clients to poll for activity and decision tasks. Engine interface { - Stop() + common.Daemon + AddDecisionTask(hCtx *handlerContext, request *types.AddDecisionTaskRequest) (syncMatch bool, err error) AddActivityTask(hCtx *handlerContext, request *types.AddActivityTaskRequest) (syncMatch bool, err error) PollForDecisionTask(hCtx *handlerContext, request *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error) diff --git a/service/matching/handler/interfaces_mock.go b/service/matching/handler/interfaces_mock.go index e94141e872f..78401a46838 100644 --- a/service/matching/handler/interfaces_mock.go +++ b/service/matching/handler/interfaces_mock.go @@ -206,6 +206,18 @@ func (mr *MockEngineMockRecorder) RespondQueryTaskCompleted(hCtx, request interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondQueryTaskCompleted", reflect.TypeOf((*MockEngine)(nil).RespondQueryTaskCompleted), hCtx, request) } +// Start mocks base method. +func (m *MockEngine) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockEngineMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockEngine)(nil).Start)) +} + // Stop mocks base method. func (m *MockEngine) Stop() { m.ctrl.T.Helper() diff --git a/service/matching/handler/membership.go b/service/matching/handler/membership.go new file mode 100644 index 00000000000..88964d19dd7 --- /dev/null +++ b/service/matching/handler/membership.go @@ -0,0 +1,150 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
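// ----------------------------------------------------------------------------
// Editor's note: the membership watcher file that follows pairs with the
// engine changes above: NewEngine does shutdownCompletion.Add(1) before
// spawning subscribeToMembershipChanges, and Stop() closes the shutdown
// channel and then Waits, so shutdown blocks until the watcher goroutine has
// drained. A self-contained sketch of that handshake with illustrative names:
package example

import "sync"

type watcher struct {
	done chan struct{}
	wg   sync.WaitGroup
}

func newWatcher() *watcher {
	w := &watcher{done: make(chan struct{})}
	w.wg.Add(1) // registered before the goroutine starts, so stop() cannot miss it
	go func() {
		defer w.wg.Done()
		<-w.done // stand-in for the subscription select loop
	}()
	return w
}

func (w *watcher) stop() {
	close(w.done)
	w.wg.Wait() // returns only after the goroutine has exited
}
// ----------------------------------------------------------------------------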
+ +package handler + +import ( + "fmt" + "sync" + + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/service/matching/tasklist" +) + +const subscriptionBufferSize = 1000 + +// There are a number of conditions under which matching may still be holding a tasklist +// reader daemon and other live processes even though (according to the rest of the hashring) +// it no longer owns the tasklist, so this listener watches for membership changes and purges +// anything disused in the hashring on membership changes. +// +// Combined with the guard on tasklist instantiation, it should prevent incorrect or poorly timed +// claiming of tasklist ownership and database shard thrashing between hosts while they figure out +// which host is the real owner of the tasklist. +// +// This is not the main shutdown process, it's just an optimization. +func (e *matchingEngineImpl) subscribeToMembershipChanges() { + defer func() { + if r := recover(); r != nil { + e.logger.Error("matching membership change watcher caused a panic, recovering", tag.Dynamic("recovered-panic", r)) + } + }() + + defer e.shutdownCompletion.Done() + + if !e.config.EnableTasklistOwnershipGuard() { + return + } + + listener := make(chan *membership.ChangedEvent, subscriptionBufferSize) + e.membershipResolver.Subscribe(service.Matching, "matching-engine", listener) + + for { + select { + case event := <-listener: + err := e.shutDownNonOwnedTasklists() + if err != nil { + e.logger.Error("Error while trying to determine which tasklists should be shut down", + tag.Error(err), + tag.MembershipChangeEvent(event), + ) + } + case <-e.shutdown: + return + } + } +} + +func (e *matchingEngineImpl) shutDownNonOwnedTasklists() error { + if !e.config.EnableTasklistOwnershipGuard() { + return nil + } + noLongerOwned, err := e.getNonOwnedTasklistsLocked() + if err != nil { + return err + } + + tasklistsShutdownWG := sync.WaitGroup{} + + for _, tl := range noLongerOwned { + // for each of the tasklists that are no longer owned, kick off the + // process of stopping them. 
The stopping process is IO heavy and + // can take a while, so stop them in parallel to efficiently unload the tasklists this host no longer owns + tasklistsShutdownWG.Add(1) + go func(tl tasklist.Manager) { + + defer func() { + if r := recover(); r != nil { + e.logger.Error("panic occurred while trying to shut down tasklist", tag.Dynamic("recovered-panic", r)) + } + }() + defer tasklistsShutdownWG.Done() + + e.logger.Info("shutting down tasklist preemptively because it is no longer owned by this host", + tag.WorkflowTaskListType(tl.TaskListID().GetType()), + tag.WorkflowTaskListName(tl.TaskListID().GetName()), + tag.WorkflowDomainID(tl.TaskListID().GetDomainID()), + tag.Dynamic("tasklist-debug-info", tl.String()), + ) + + e.unloadTaskList(tl) + }(tl) + } + + tasklistsShutdownWG.Wait() + + return nil +} + +func (e *matchingEngineImpl) getNonOwnedTasklistsLocked() ([]tasklist.Manager, error) { + if !e.config.EnableTasklistOwnershipGuard() { + return nil, nil + } + + var toShutDown []tasklist.Manager + + e.taskListsLock.RLock() + defer e.taskListsLock.RUnlock() + + self, err := e.membershipResolver.WhoAmI() + if err != nil { + return nil, fmt.Errorf("failed to lookup self in membership: %w", err) + } + + for tl, manager := range e.taskLists { + taskListOwner, err := e.membershipResolver.Lookup(service.Matching, tl.GetName()) + if err != nil { + return nil, fmt.Errorf("failed to lookup task list owner: %w", err) + } + + if taskListOwner.Identity() != self.Identity() { + toShutDown = append(toShutDown, manager) + } + } + + e.logger.Info("Got list of non-owned tasklists", + tag.Dynamic("tasklist-debug-info", toShutDown), + ) + return toShutDown, nil +} diff --git a/service/matching/handler/membership_test.go b/service/matching/handler/membership_test.go new file mode 100644 index 00000000000..2fcdf50d5d8 --- /dev/null +++ b/service/matching/handler/membership_test.go @@ -0,0 +1,307 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
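// ----------------------------------------------------------------------------
// Editor's note: shutDownNonOwnedTasklists above fans out one goroutine per
// tasklist because Stop is IO-heavy, and shields each goroutine with recover
// so a single panicking manager cannot abort the whole sweep. A generic,
// self-contained sketch of that fan-out pattern:
package example

import (
	"log"
	"sync"
)

func stopAll(stops []func()) {
	var wg sync.WaitGroup
	for _, stop := range stops {
		wg.Add(1)
		go func(stop func()) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					log.Printf("recovered panic while stopping: %v", r)
				}
			}()
			stop()
		}(stop)
	}
	wg.Wait() // all stops finished (or recovered) before returning
}
// ----------------------------------------------------------------------------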
+
+package handler
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+	"github.com/uber-go/tally"
+
+	"github.com/uber/cadence/client/history"
+	"github.com/uber/cadence/common/cache"
+	"github.com/uber/cadence/common/clock"
+	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/dynamicconfig"
+	cadence_errors "github.com/uber/cadence/common/errors"
+	"github.com/uber/cadence/common/log/loggerimpl"
+	"github.com/uber/cadence/common/membership"
+	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/persistence"
+	"github.com/uber/cadence/common/resource"
+	"github.com/uber/cadence/common/service"
+	"github.com/uber/cadence/common/types"
+	"github.com/uber/cadence/service/matching/config"
+	"github.com/uber/cadence/service/matching/tasklist"
+)
+
+func TestGetTaskListManager_OwnerShip(t *testing.T) {
+	testCases := []struct {
+		name                 string
+		lookUpResult         string
+		lookUpErr            error
+		whoAmIResult         string
+		whoAmIErr            error
+		tasklistGuardEnabled bool
+
+		expectedError error
+	}{
+		{
+			name:                 "Not owned by current host",
+			lookUpResult:         "A",
+			whoAmIResult:         "B",
+			tasklistGuardEnabled: true,
+
+			expectedError: new(cadence_errors.TaskListNotOwnedByHostError),
+		},
+		{
+			name:                 "LookupError",
+			lookUpErr:            assert.AnError,
+			tasklistGuardEnabled: true,
+			expectedError:        assert.AnError,
+		},
+		{
+			name:                 "WhoAmIError",
+			whoAmIErr:            assert.AnError,
+			tasklistGuardEnabled: true,
+			expectedError:        assert.AnError,
+		},
+		{
+			name:                 "when feature is not enabled, expect previous behaviour to continue",
+			lookUpResult:         "A",
+			whoAmIResult:         "B",
+			tasklistGuardEnabled: false,
+
+			expectedError: nil,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			logger := loggerimpl.NewNopLogger()
+
+			mockTimeSource := clock.NewMockedTimeSourceAt(time.Now())
+			taskManager := tasklist.NewTestTaskManager(t, logger, mockTimeSource)
+			mockHistoryClient := history.NewMockClient(ctrl)
+			mockDomainCache := cache.NewMockDomainCache(ctrl)
+			resolverMock := membership.NewMockResolver(ctrl)
+			resolverMock.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes()
+
+			// these expectations are only used if the call goes through
+			mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
+			mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
+			mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(matchingTestDomainName, nil).AnyTimes()
+
+			config := defaultTestConfig()
+			taskListEnabled := tc.tasklistGuardEnabled
+			config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool {
+				return taskListEnabled
+			}
+
+			matchingEngine := NewEngine(
+				taskManager,
+				cluster.GetTestClusterMetadata(true),
+				mockHistoryClient,
+				nil,
+				config,
+				logger,
+				metrics.NewClient(tally.NoopScope, metrics.Matching),
+				mockDomainCache,
+				resolverMock,
+				nil,
+				mockTimeSource,
+			).(*matchingEngineImpl)
+
+			resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(
+				membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr,
+			).AnyTimes()
+			resolverMock.EXPECT().WhoAmI().Return(
+				membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr,
+			).AnyTimes()
+
+			taskListKind := types.TaskListKindNormal
+
+			_, err := matchingEngine.getTaskListManager(
+				tasklist.NewTestTaskListID(t, "domain", "tasklist", persistence.TaskListTypeActivity),
+				&taskListKind,
+			)
+			if tc.expectedError != nil {
+				assert.ErrorAs(t, err, &tc.expectedError)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
+
+func TestMembershipSubscriptionShutdown(t *testing.T) {
+	assert.NotPanics(t, func() {
+		ctrl := gomock.NewController(t)
+		m := membership.NewMockResolver(ctrl)
+
+		m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Times(1)
+
+		e := matchingEngineImpl{
+			membershipResolver: m,
+			config: &config.Config{
+				EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true },
+			},
+			shutdown: make(chan struct{}),
+			logger:   loggerimpl.NewNopLogger(),
+		}
+
+		go func() {
+			time.Sleep(time.Second)
+			close(e.shutdown)
+		}()
+		e.subscribeToMembershipChanges()
+	})
+}
+
+func TestMembershipSubscriptionPanicHandling(t *testing.T) {
+	assert.NotPanics(t, func() {
+		ctrl := gomock.NewController(t)
+
+		r := resource.NewTest(t, ctrl, 0)
+		r.MembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).DoAndReturn(func(_, _, _ any) {
+			panic("a panic has occurred")
+		})
+
+		e := matchingEngineImpl{
+			membershipResolver: r.MembershipResolver,
+			config: &config.Config{
+				EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true },
+			},
+			logger:   loggerimpl.NewNopLogger(),
+			shutdown: make(chan struct{}),
+		}
+
+		e.subscribeToMembershipChanges()
+	})
+}
+
+func TestSubscriptionAndShutdown(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	m := membership.NewMockResolver(ctrl)
+
+	shutdownWG := &sync.WaitGroup{}
+	shutdownWG.Add(1)
+
+	e := matchingEngineImpl{
+		shutdownCompletion: shutdownWG,
+		membershipResolver: m,
+		config: &config.Config{
+			EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true },
+		},
+		shutdown: make(chan struct{}),
+		logger:   loggerimpl.NewNopLogger(),
+	}
+
+	// AnyTimes here because this is quite a racy test; the actual assertions
+	// for the unsubscription logic will be separated out
+	m.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("host2", "host2", nil), nil).AnyTimes()
+	m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do(
+		func(service string, name string, inc chan<- *membership.ChangedEvent) {
+			m := membership.ChangedEvent{
+				HostsAdded:   nil,
+				HostsUpdated: nil,
+				HostsRemoved: []string{"host123"},
+			}
+			inc <- &m
+		})
+
+	go func() {
+		// then call stop so the test can finish
+		time.Sleep(time.Second)
+		e.Stop()
+	}()
+
+	e.subscribeToMembershipChanges()
+}
+
+func TestSubscriptionAndErrorReturned(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	m := membership.NewMockResolver(ctrl)
+
+	shutdownWG := sync.WaitGroup{}
+	shutdownWG.Add(1)
+
+	e := matchingEngineImpl{
+		shutdownCompletion: &shutdownWG,
+		membershipResolver: m,
+		config: &config.Config{
+			EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true },
+		},
+		shutdown: make(chan struct{}),
+		logger:   loggerimpl.NewNopLogger(),
+	}
+
+	// this should trigger the error case on a membership event
+	m.EXPECT().WhoAmI().Return(membership.HostInfo{}, assert.AnError).AnyTimes()
+
+	m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do(
+		func(service string, name string, inc chan<- *membership.ChangedEvent) {
+			m := membership.ChangedEvent{
+				HostsAdded:   nil,
+				HostsUpdated: nil,
+				HostsRemoved: []string{"host123"},
+			}
+			inc <- &m
+		})
+
+	go func() {
+		// then call stop so the test can finish
+		time.Sleep(time.Second)
+		e.Stop()
+	}()
+
+	e.subscribeToMembershipChanges()
+}
+
+func TestGetTasklistManagerShutdownScenario(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	m := membership.NewMockResolver(ctrl)
+
+	self := membership.NewDetailedHostInfo("self", "self", nil)
+
+	m.EXPECT().WhoAmI().Return(self, nil).AnyTimes()
+
+	shutdownWG := sync.WaitGroup{}
+	shutdownWG.Add(0)
+
+	e := matchingEngineImpl{
+		shutdownCompletion: &shutdownWG,
+		membershipResolver: m,
+		config: &config.Config{
+			EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true },
+		},
+		shutdown: make(chan struct{}),
+		logger:   loggerimpl.NewNopLogger(),
+	}
+
+	// set this engine to be shutting down so as to trigger the shutdown guard in getTaskListManager
+	e.Stop()
+
+	tl, _ := tasklist.NewIdentifier("domainid", "tl", 0)
+	kind := types.TaskListKindNormal
+	res, err := e.getTaskListManager(tl, &kind)
+	assertErr := &cadence_errors.TaskListNotOwnedByHostError{}
+	assert.ErrorAs(t, err, &assertErr)
+	assert.Nil(t, res)
+}
diff --git a/service/matching/tasklist/matcher.go b/service/matching/tasklist/matcher.go
index c8af219ce5b..14b7250834e 100644
--- a/service/matching/tasklist/matcher.go
+++ b/service/matching/tasklist/matcher.go
@@ -136,10 +136,36 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err
 			return false, err
 		}
 	}
-
-	// TODO: Pollers are aggressively forwarded to parent partitions so it's unlikely
-	// to have a poller available to pick up the task below for sub-partitions.
-	// Try adding some wait here.
+	e := event.E{
+		TaskListName: tm.tasklist.GetName(),
+		TaskListType: tm.tasklist.GetType(),
+		TaskListKind: tm.tasklistKind.Ptr(),
+	}
+	localWaitTime := tm.config.LocalTaskWaitTime()
+	if localWaitTime > 0 {
+		childCtx, cancel := context.WithTimeout(ctx, localWaitTime)
+		select {
+		case tm.getTaskC(task) <- task: // poller picked up the task
+			cancel()
+			if task.ResponseC != nil {
+				// if there is a response channel, block until resp is received
+				// and return error if the response contains error
+				err := <-task.ResponseC
+				tm.scope.RecordTimer(metrics.SyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
+				if err == nil {
+					e.EventName = "Offer task due to local wait"
+					e.Payload = map[string]any{
+						"TaskIsForwarded": task.IsForwarded(),
+					}
+					event.Log(e)
+				}
+				return true, err
+			}
+			return false, nil
+		case <-childCtx.Done():
+			cancel()
+		}
+	}
 	select {
 	case tm.getTaskC(task) <- task: // poller picked up the task
 		if task.ResponseC != nil {
@@ -155,12 +181,8 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err
 	// root partition if possible
 	select {
 	case token := <-tm.fwdrAddReqTokenC():
-		event.Log(event.E{
-			TaskListName: tm.tasklist.GetName(),
-			TaskListType: tm.tasklist.GetType(),
-			TaskListKind: tm.tasklistKind.Ptr(),
-			EventName:    "Attempting to Forward Task",
-		})
+		e.EventName = "Attempting to Forward Task"
+		event.Log(e)
 		err := tm.fwdr.ForwardTask(ctx, task)
 		token.release("")
 		if err == nil {
@@ -259,18 +281,23 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error
 	// attempt a match with local poller first. When that
 	// doesn't succeed, try both local match and remote match
 	taskC := tm.getTaskC(task)
+	localWaitTime := tm.config.LocalTaskWaitTime()
+	childCtx, cancel := context.WithTimeout(ctx, localWaitTime)
 	select {
 	case taskC <- task: // poller picked up the task
+		cancel()
 		tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList)
 		tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT))
 		e.EventName = "Dispatched to Local Poller"
 		event.Log(e)
 		return nil
 	case <-ctx.Done():
+		cancel()
 		e.EventName = "Context Done While Dispatching to Local Poller"
 		event.Log(e)
 		return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err())
-	default:
+	case <-childCtx.Done():
+		cancel()
 	}
 
 	attempt := 0
diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go
index c663d1504b1..27fa626ebaf 100644
--- a/service/matching/tasklist/task_list_manager.go
+++ b/service/matching/tasklist/task_list_manager.go
@@ -724,6 +724,9 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c
 		LocalPollWaitTime: func() time.Duration {
 			return cfg.LocalPollWaitTime(domainName, taskListName, taskType)
 		},
+		LocalTaskWaitTime: func() time.Duration {
+			return cfg.LocalTaskWaitTime(domainName, taskListName, taskType)
+		},
 		ForwarderConfig: config.ForwarderConfig{
 			ForwarderMaxOutstandingPolls: func() int {
 				return cfg.ForwarderMaxOutstandingPolls(domainName, taskListName, taskType)
diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go
index 187f827f257..f35de5e4ef9 100644
--- a/service/matching/tasklist/task_list_manager_test.go
+++ b/service/matching/tasklist/task_list_manager_test.go
@@ -57,6 +57,7 @@ func defaultTestConfig() *config.Config {
 	config.AllIsolationGroups = []string{"datacenterA", "datacenterB"}
 	config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10)
 	config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
+	config.LocalTaskWaitTime = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(time.Millisecond)
 	return config
 }
 
@@ -595,7 +596,7 @@ func TestTaskListManagerGetTaskBatch(t *testing.T) {
 				RunID:                  "run1",
 				WorkflowID:             "workflow1",
 				ScheduleID:             scheduleID,
-				ScheduleToStartTimeout: 1,
+				ScheduleToStartTimeout: 100,
 			},
 		}
 		_, err = tlm.AddTask(context.Background(), addParams)
@@ -739,7 +740,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
 			cfg.MaxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes
 			// set idle timer check to a really small value to assert that we don't accidentally drop tasks while blocking
 			// on enqueuing a task to task buffer
-			cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
+			cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(20 * time.Millisecond)
 			tlMgr, err := NewManager(
 				mockDomainCache,
 				logger,
@@ -767,7 +768,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
 					RunID:                  "run1",
 					WorkflowID:             "workflow1",
 					ScheduleID:             scheduleID,
-					ScheduleToStartTimeout: 5,
+					ScheduleToStartTimeout: 100,
 				},
 			}
 			if i%2 == 0 {
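A note on the matcher change above: when `LocalTaskWaitTime` is positive, `Offer` and `MustOffer` now give a local poller a bounded head start before falling through to the forwarding path, replacing the old TODO (and the old `default:` fast-path in `MustOffer`). A minimal, self-contained sketch of that pattern follows; the names `offer`, `taskC`, and `forward` are illustrative, not the production API, and the metrics/event plumbing is omitted.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// offer hands task to a local poller if one shows up within localWaitTime,
// and only then falls back to the forwarding path. This mirrors the shape
// of the TaskMatcher change above, not its exact implementation.
func offer(ctx context.Context, taskC chan<- string, task string,
	localWaitTime time.Duration, forward func(context.Context, string) error) error {
	if localWaitTime > 0 {
		childCtx, cancel := context.WithTimeout(ctx, localWaitTime)
		defer cancel()
		select {
		case taskC <- task:
			return nil // a local poller picked up the task in time
		case <-childCtx.Done():
			// local wait expired; fall through to forwarding
		}
	}
	return forward(ctx, task)
}

func main() {
	taskC := make(chan string)
	go func() { fmt.Println("polled:", <-taskC) }() // stands in for a poller

	err := offer(context.Background(), taskC, "task-1", 10*time.Millisecond,
		func(ctx context.Context, t string) error {
			fmt.Println("forwarded:", t)
			return nil
		})
	if err != nil {
		fmt.Println("offer failed:", err)
	}
	time.Sleep(50 * time.Millisecond) // let the poller goroutine print
}
```

The intent, per the removed TODO, is that sub-partitions rarely have idle pollers because pollers are aggressively forwarded to parent partitions; a short local wait gives sync match a chance before the task is forwarded. The new files below are an unrelated diagnostics worker.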
diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go
new file mode 100644
index 00000000000..8c9346391e6
--- /dev/null
+++ b/service/worker/diagnostics/activities.go
@@ -0,0 +1,52 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package diagnostics
+
+import (
+	"context"
+
+	"github.com/uber/cadence/common/types"
+	"github.com/uber/cadence/service/worker/diagnostics/invariants"
+)
+
+type retrieveExecutionHistoryInputParams struct {
+	domain    string
+	execution *types.WorkflowExecution
+}
+
+func (w *dw) retrieveExecutionHistory(ctx context.Context, info retrieveExecutionHistoryInputParams) (*types.GetWorkflowExecutionHistoryResponse, error) {
+	frontendClient := w.clientBean.GetFrontendClient()
+	return frontendClient.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
+		Domain:    info.domain,
+		Execution: info.execution,
+	})
+}
+
+type identifyTimeoutsInputParams struct {
+	history *types.GetWorkflowExecutionHistoryResponse
+}
+
+func (w *dw) identifyTimeouts(ctx context.Context, info identifyTimeoutsInputParams) ([]invariants.InvariantCheckResult, error) {
+	timeoutInvariant := invariants.NewTimeout(info.history)
+	return timeoutInvariant.Check(ctx)
+}
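The `identifyTimeouts` activity above delegates entirely to `invariants.NewTimeout(history).Check(ctx)`. The invariants package itself is not part of this diff; the following is a hedged sketch of the contract it implies, inferred from the call sites and from the fields the tests below assert on. The real definitions live in `service/worker/diagnostics/invariants` and may differ.

```go
package invariants

import "context"

// InvariantCheckResult is what each invariant reports back; the test below
// asserts on InvariantType, Reason, and Metadata.
type InvariantCheckResult struct {
	InvariantType string
	Reason        string
	Metadata      []byte
}

// Invariant is the contract the diagnostics activities program against:
// inspect a workflow's history and report zero or more findings.
type Invariant interface {
	Check(context.Context) ([]InvariantCheckResult, error)
}
```

Keeping each check behind a narrow interface like this is what lets the workflow grow new diagnostics as additional activities without touching `retrieveExecutionHistory`.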
diff --git a/service/worker/diagnostics/activities_test.go b/service/worker/diagnostics/activities_test.go
new file mode 100644
index 00000000000..44ca2657c70
--- /dev/null
+++ b/service/worker/diagnostics/activities_test.go
@@ -0,0 +1,96 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package diagnostics
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/require"
+
+	"github.com/uber/cadence/client"
+	"github.com/uber/cadence/client/frontend"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/types"
+	"github.com/uber/cadence/service/worker/diagnostics/invariants"
+)
+
+func Test__retrieveExecutionHistory(t *testing.T) {
+	dwtest := testDiagnosticWorkflow(t)
+	result, err := dwtest.retrieveExecutionHistory(context.Background(), retrieveExecutionHistoryInputParams{
+		domain: "test",
+		execution: &types.WorkflowExecution{
+			WorkflowID: "123",
+			RunID:      "abc",
+		},
+	})
+	require.NoError(t, err)
+	require.Equal(t, testWorkflowExecutionHistoryResponse(), result)
+}
+
+func Test__identifyTimeouts(t *testing.T) {
+	dwtest := testDiagnosticWorkflow(t)
+	workflowTimeoutSecondInBytes, err := json.Marshal(int32(10))
+	require.NoError(t, err)
+	expectedResult := []invariants.InvariantCheckResult{
+		{
+			InvariantType: invariants.TimeoutTypeExecution.String(),
+			Reason:        "START_TO_CLOSE",
+			Metadata:      workflowTimeoutSecondInBytes,
+		},
+	}
+	result, err := dwtest.identifyTimeouts(context.Background(), identifyTimeoutsInputParams{history: testWorkflowExecutionHistoryResponse()})
+	require.NoError(t, err)
+	require.Equal(t, expectedResult, result)
+}
+
+func testDiagnosticWorkflow(t *testing.T) *dw {
+	ctrl := gomock.NewController(t)
+	mockClientBean := client.NewMockBean(ctrl)
+	mockFrontendClient := frontend.NewMockClient(ctrl)
+	mockClientBean.EXPECT().GetFrontendClient().Return(mockFrontendClient).AnyTimes()
+	mockFrontendClient.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any()).Return(testWorkflowExecutionHistoryResponse(), nil).AnyTimes()
+	return &dw{
+		clientBean: mockClientBean,
+	}
+}
+
+func testWorkflowExecutionHistoryResponse() *types.GetWorkflowExecutionHistoryResponse {
+	return &types.GetWorkflowExecutionHistoryResponse{
+		History: &types.History{
+			Events: []*types.HistoryEvent{
+				{
+					ID: 1,
+					WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
+						ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(10),
+					},
+				},
+				{
+					WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()},
+				},
+			},
+		},
+	}
+}
diff --git a/service/worker/diagnostics/module.go b/service/worker/diagnostics/module.go
new file mode 100644
index 00000000000..3efa0f88eaa
--- /dev/null
+++ b/service/worker/diagnostics/module.go
@@ -0,0 +1,89 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package diagnostics
+
+import (
+	"context"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/uber-go/tally"
+	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
+	"go.uber.org/cadence/activity"
+	"go.uber.org/cadence/worker"
+	"go.uber.org/cadence/workflow"
+
+	"github.com/uber/cadence/client"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/metrics"
+)
+
+type DiagnosticsWorkflow interface {
+	Start() error
+	Stop()
+}
+
+type dw struct {
+	svcClient     workflowserviceclient.Interface
+	clientBean    client.Bean
+	metricsClient metrics.Client
+	tallyScope    tally.Scope
+	worker        worker.Worker
+}
+
+type Params struct {
+	ServiceClient workflowserviceclient.Interface
+	ClientBean    client.Bean
+	MetricsClient metrics.Client
+	TallyScope    tally.Scope
+}
+
+// New creates a new diagnostics workflow.
+func New(params Params) DiagnosticsWorkflow {
+	return &dw{
+		svcClient:     params.ServiceClient,
+		metricsClient: params.MetricsClient,
+		tallyScope:    params.TallyScope,
+		clientBean:    params.ClientBean,
+	}
+}
+
+// Start starts the worker
+func (w *dw) Start() error {
+	workerOpts := worker.Options{
+		MetricsScope:                     w.tallyScope,
+		BackgroundActivityContext:        context.Background(),
+		Tracer:                           opentracing.GlobalTracer(),
+		MaxConcurrentActivityTaskPollers: 10,
+		MaxConcurrentDecisionTaskPollers: 10,
+	}
+	newWorker := worker.New(w.svcClient, common.SystemLocalDomainName, tasklist, workerOpts)
+	newWorker.RegisterWorkflowWithOptions(w.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
+	newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
+	newWorker.RegisterActivityWithOptions(w.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
+	w.worker = newWorker
+	return newWorker.Start()
+}
+
+func (w *dw) Stop() {
+	w.worker.Stop()
+}
diff --git a/service/worker/diagnostics/module_test.go b/service/worker/diagnostics/module_test.go
new file mode 100644
index 00000000000..d8a8934f8c8
--- /dev/null
+++ b/service/worker/diagnostics/module_test.go
@@ -0,0 +1,60 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package diagnostics
+
+import (
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/require"
+	"github.com/uber-go/tally"
+	"go.uber.org/cadence/.gen/go/shared"
+
+	"github.com/uber/cadence/client"
+	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/resource"
+)
+
+func Test__Start(t *testing.T) {
+	dwTest, mockResource := setuptest(t)
+	err := dwTest.Start()
+	require.NoError(t, err)
+	dwTest.Stop()
+	mockResource.Finish(t)
+}
+
+func setuptest(t *testing.T) (DiagnosticsWorkflow, *resource.Test) {
+	ctrl := gomock.NewController(t)
+	mockClientBean := client.NewMockBean(ctrl)
+	mockResource := resource.NewTest(t, ctrl, metrics.Worker)
+	sdkClient := mockResource.GetSDKClient()
+	mockResource.SDKClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.DescribeDomainResponse{}, nil).AnyTimes()
+	mockResource.SDKClient.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForDecisionTaskResponse{}, nil).AnyTimes()
+	mockResource.SDKClient.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&shared.PollForActivityTaskResponse{}, nil).AnyTimes()
+	return New(Params{
+		ServiceClient: sdkClient,
+		ClientBean:    mockClientBean,
+		MetricsClient: nil,
+		TallyScope:    tally.TestScope(nil),
+	}), mockResource
+}
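Because `module.go` registers the workflow and activities under the string names defined in `workflow.go` (shown in the diff that follows), a run is started by name rather than by function reference. The snippet below is a hedged sketch of kicking one off from a `go.uber.org/cadence` client; construction of `cc` is elided, the workflow ID and domain values are illustrative, and only the task-list and workflow-name strings come from this change.

```go
// Package diagclient is an illustrative snippet, not part of this change.
package diagclient

import (
	"context"
	"time"

	"go.uber.org/cadence/client"

	"github.com/uber/cadence/service/worker/diagnostics"
)

// startDiagnosticsRun starts one diagnostics run by registered name.
func startDiagnosticsRun(ctx context.Context, cc client.Client) error {
	_, err := cc.StartWorkflow(ctx, client.StartWorkflowOptions{
		ID:                           "diagnostics-run-123", // illustrative ID
		TaskList:                     "wf-diagnostics",      // tasklist const in workflow.go
		ExecutionStartToCloseTimeout: time.Minute,
	}, "diagnostics-workflow", diagnostics.DiagnosticsWorkflowInput{
		Domain:     "sample-domain",
		WorkflowID: "target-workflow-id",
		RunID:      "target-run-id",
	})
	return err
}
```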
diff --git a/service/worker/diagnostics/workflow.go b/service/worker/diagnostics/workflow.go
new file mode 100644
index 00000000000..4f0a5bb3c1d
--- /dev/null
+++ b/service/worker/diagnostics/workflow.go
@@ -0,0 +1,76 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package diagnostics
+
+import (
+	"fmt"
+	"time"
+
+	"go.uber.org/cadence/workflow"
+
+	"github.com/uber/cadence/common/types"
+	"github.com/uber/cadence/service/worker/diagnostics/invariants"
+)
+
+const (
+	diagnosticsWorkflow = "diagnostics-workflow"
+	tasklist            = "wf-diagnostics"
+
+	retrieveWfExecutionHistoryActivity = "retrieveWfExecutionHistory"
+	identifyTimeoutsActivity           = "identifyTimeouts"
+)
+
+type DiagnosticsWorkflowInput struct {
+	Domain     string
+	WorkflowID string
+	RunID      string
+}
+
+func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) error {
+	activityOptions := workflow.ActivityOptions{
+		ScheduleToCloseTimeout: time.Second * 10,
+		ScheduleToStartTimeout: time.Second * 5,
+		StartToCloseTimeout:    time.Second * 5,
+	}
+	activityCtx := workflow.WithActivityOptions(ctx, activityOptions)
+
+	var wfExecutionHistory *types.GetWorkflowExecutionHistoryResponse
+	err := workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{
+		domain: params.Domain,
+		execution: &types.WorkflowExecution{
+			WorkflowID: params.WorkflowID,
+			RunID:      params.RunID,
+		}}).Get(ctx, &wfExecutionHistory)
+	if err != nil {
+		return fmt.Errorf("RetrieveExecutionHistory: %w", err)
+	}
+
+	var checkResult []invariants.InvariantCheckResult
+	err = workflow.ExecuteActivity(activityCtx, w.identifyTimeouts, identifyTimeoutsInputParams{
+		history: wfExecutionHistory}).Get(ctx, &checkResult)
+	if err != nil {
+		return fmt.Errorf("IdentifyTimeouts: %w", err)
+	}
+
+	return nil
+}
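One thing to note about the activity options in `DiagnosticsWorkflow` above: only timeouts are set, so a transient frontend error fails the whole run on its first attempt. If retries were wanted, the standard Cadence client shape would look like the following hedged sketch; the retry values are illustrative and are not part of this change.

```go
// Package diagsketch is an illustrative snippet, not part of this change.
package diagsketch

import (
	"time"

	"go.uber.org/cadence"
	"go.uber.org/cadence/workflow"
)

// activityOptionsWithRetries shows the same timeouts as the workflow above,
// plus a retry policy (values are illustrative).
func activityOptionsWithRetries() workflow.ActivityOptions {
	return workflow.ActivityOptions{
		ScheduleToCloseTimeout: time.Second * 10,
		ScheduleToStartTimeout: time.Second * 5,
		StartToCloseTimeout:    time.Second * 5,
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Second * 10,
			ExpirationInterval: time.Minute,
			MaximumAttempts:    3,
		},
	}
}
```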
diff --git a/service/worker/diagnostics/workflow_test.go b/service/worker/diagnostics/workflow_test.go
new file mode 100644
index 00000000000..f5e258baf56
--- /dev/null
+++ b/service/worker/diagnostics/workflow_test.go
@@ -0,0 +1,101 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package diagnostics
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+	"go.uber.org/cadence/activity"
+	"go.uber.org/cadence/testsuite"
+	"go.uber.org/cadence/workflow"
+
+	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/resource"
+)
+
+type diagnosticsWorkflowTestSuite struct {
+	suite.Suite
+	testsuite.WorkflowTestSuite
+	workflowEnv *testsuite.TestWorkflowEnvironment
+	dw          *dw
+}
+
+func TestDiagnosticsWorkflowTestSuite(t *testing.T) {
+	suite.Run(t, new(diagnosticsWorkflowTestSuite))
+}
+
+func (s *diagnosticsWorkflowTestSuite) SetupTest() {
+	s.workflowEnv = s.NewTestWorkflowEnvironment()
+	controller := gomock.NewController(s.T())
+	mockResource := resource.NewTest(s.T(), controller, metrics.Worker)
+
+	s.dw = &dw{
+		svcClient:  mockResource.GetSDKClient(),
+		clientBean: mockResource.ClientBean,
+	}
+
+	s.T().Cleanup(func() {
+		mockResource.Finish(s.T())
+	})
+
+	s.workflowEnv.RegisterWorkflowWithOptions(s.dw.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
+	s.workflowEnv.RegisterActivityWithOptions(s.dw.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
+	s.workflowEnv.RegisterActivityWithOptions(s.dw.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
+}
+
+func (s *diagnosticsWorkflowTestSuite) TearDownTest() {
+	s.workflowEnv.AssertExpectations(s.T())
+}
+
+func (s *diagnosticsWorkflowTestSuite) TestWorkflow() {
+	params := &DiagnosticsWorkflowInput{
+		Domain:     "test",
+		WorkflowID: "123",
+		RunID:      "abc",
+	}
+	s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil)
+	s.workflowEnv.OnActivity(identifyTimeoutsActivity, mock.Anything, mock.Anything).Return(nil, nil)
+	s.workflowEnv.ExecuteWorkflow(diagnosticsWorkflow, params)
+	s.True(s.workflowEnv.IsWorkflowCompleted())
+}
+
+func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() {
+	params := &DiagnosticsWorkflowInput{
+		Domain:     "test",
+		WorkflowID: "123",
+		RunID:      "abc",
+	}
+	mockErr := errors.New("mockErr")
+	errExpected := fmt.Errorf("IdentifyTimeouts: %w", mockErr)
+	s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil)
+	s.workflowEnv.OnActivity(identifyTimeoutsActivity, mock.Anything, mock.Anything).Return(nil, mockErr)
+	s.workflowEnv.ExecuteWorkflow(diagnosticsWorkflow, params)
+	s.True(s.workflowEnv.IsWorkflowCompleted())
+	s.Error(s.workflowEnv.GetWorkflowError())
+	s.EqualError(s.workflowEnv.GetWorkflowError(), errExpected.Error())
+}
diff --git a/service/worker/service.go b/service/worker/service.go
index 5464a115693..f9fb113329c 100644
--- a/service/worker/service.go
+++ b/service/worker/service.go
@@ -39,6 +39,7 @@ import (
 	"github.com/uber/cadence/service/worker/archiver"
 	"github.com/uber/cadence/service/worker/asyncworkflow"
 	"github.com/uber/cadence/service/worker/batcher"
+	"github.com/uber/cadence/service/worker/diagnostics"
 	"github.com/uber/cadence/service/worker/esanalyzer"
 	"github.com/uber/cadence/service/worker/failovermanager"
 	"github.com/uber/cadence/service/worker/indexer"
@@ -220,6 +221,7 @@ func (s *Service) Start() {
 	}
 
 	s.startReplicator()
+	s.startDiagnostics()
 
 	if s.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() {
 		s.startArchiver()
@@ -332,6 +334,19 @@ func (s *Service) startFixerWorkflowWorker() {
 	}
 }
 
+func (s *Service) startDiagnostics() {
+	params := diagnostics.Params{
+		ServiceClient: s.params.PublicClient,
+		MetricsClient: s.GetMetricsClient(),
+		TallyScope:    s.params.MetricScope,
+		ClientBean:    s.GetClientBean(),
+	}
+	if err := diagnostics.New(params).Start(); err != nil {
+		s.Stop()
+		s.GetLogger().Fatal("error starting diagnostics", tag.Error(err))
+	}
+}
+
 func (s *Service) startReplicator() {
 	domainReplicationTaskExecutor := domain.NewReplicationTaskExecutor(
 		s.Resource.GetDomainManager(),
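Finally, stepping back to the matching changes at the top of this diff: the handler tests exercise an ownership guard whose implementation is not itself shown here. In essence it compares the hashring owner of a tasklist against the current host's identity and refuses to construct a manager on mismatch. The following is a hedged reconstruction of that check; the real logic lives in the matching engine's `getTaskListManager` path, and the `Resolver` interface, `ensureOwnership` function, and service name string below are illustrative, not the production API.

```go
package main

import "fmt"

// Resolver is the slice of membership.Resolver the guard needs: the owning
// host for a key, and this host's own identity.
type Resolver interface {
	Lookup(service, key string) (identity string, err error)
	WhoAmI() (identity string, err error)
}

// ensureOwnership mirrors the behaviour the tests assert: with the guard
// disabled nothing changes; with it enabled, a tasklist owned by another
// host is rejected, and resolver errors propagate to the caller.
func ensureOwnership(r Resolver, guardEnabled bool, tasklist string) error {
	if !guardEnabled {
		return nil // previous behaviour continues
	}
	owner, err := r.Lookup("cadence-matching", tasklist)
	if err != nil {
		return err
	}
	me, err := r.WhoAmI()
	if err != nil {
		return err
	}
	if owner != me {
		// the real code returns cadence_errors.NewTaskListNotOwnedByHostError
		return fmt.Errorf("task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s",
			owner, me, tasklist)
	}
	return nil
}

type fakeResolver struct{ owner, me string }

func (f fakeResolver) Lookup(service, key string) (string, error) { return f.owner, nil }
func (f fakeResolver) WhoAmI() (string, error)                    { return f.me, nil }

func main() {
	err := ensureOwnership(fakeResolver{owner: "A", me: "B"}, true, "tl")
	fmt.Println(err) // task list is not owned by this host: OwnedBy: A, Me: B, Tasklist: tl
}
```

Together with the `subscribeToMembershipChanges` loop tested above, this is what lets a matching host stop serving a tasklist promptly after a hashring change, instead of continuing until the stale manager is evicted.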