diff --git a/service/matching/config/config.go b/service/matching/config/config.go index 6c055d95192..282b3fa6bf2 100644 --- a/service/matching/config/config.go +++ b/service/matching/config/config.go @@ -77,6 +77,8 @@ type ( // rate limiter configuration TaskDispatchRPS float64 TaskDispatchRPSTTL time.Duration + // task gc configuration + MaxTimeBetweenTaskDeletes time.Duration } ForwarderConfig struct { @@ -113,6 +115,8 @@ type ( // rate limiter configuration TaskDispatchRPS float64 TaskDispatchRPSTTL time.Duration + // task gc configuration + MaxTimeBetweenTaskDeletes time.Duration } ) @@ -153,6 +157,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { HostName: hostName, TaskDispatchRPS: 100000.0, TaskDispatchRPSTTL: time.Minute, + MaxTimeBetweenTaskDeletes: time.Second, } } diff --git a/service/matching/handler/engine_integration_test.go b/service/matching/handler/engine_integration_test.go index f34e6a15d4a..e591fd9833d 100644 --- a/service/matching/handler/engine_integration_test.go +++ b/service/matching/handler/engine_integration_test.go @@ -94,13 +94,11 @@ func TestMatchingEngineSuite(t *testing.T) { } func (s *matchingEngineSuite) SetupSuite() { - s.logger = testlogger.New(s.Suite.T()) http.Handle("/test/tasks", http.HandlerFunc(s.TasksHandler)) } // Renders content of taskManager and matchingEngine when called at http://localhost:6060/test/tasks // Uncomment HTTP server initialization in SetupSuite method to enable. - func (s *matchingEngineSuite) TasksHandler(w http.ResponseWriter, r *http.Request) { s.Lock() defer s.Unlock() @@ -115,6 +113,7 @@ func (s *matchingEngineSuite) TearDownSuite() { func (s *matchingEngineSuite) SetupTest() { s.Lock() defer s.Unlock() + s.logger = testlogger.New(s.Suite.T()).WithTags(tag.Dynamic("test-name", s.T().Name())) tlKindNormal := types.TaskListKindNormal s.mockExecutionManager = &mocks.ExecutionManager{} s.controller = gomock.NewController(s.T()) @@ -555,24 +554,7 @@ func (s *matchingEngineSuite) SyncMatchTasks(taskType int, enableIsolation bool) s.matchingEngine.metricsClient = metrics.NewClient(scope, metrics.Matching) testParam := newTestParam(s.T(), taskType) - tlKind := types.TaskListKindNormal - mgr, err := tasklist.NewManager( - s.matchingEngine.domainCache, - s.matchingEngine.logger, - s.matchingEngine.metricsClient, - s.matchingEngine.taskManager, - s.matchingEngine.clusterMetadata, - s.matchingEngine.partitioner, - s.matchingEngine.matchingClient, - s.matchingEngine.removeTaskListManager, - testParam.TaskListID, - &tlKind, - s.matchingEngine.config, - s.matchingEngine.timeSource, - s.matchingEngine.timeSource.Now()) - s.NoError(err) s.taskManager.SetRangeID(testParam.TaskListID, initialRangeID) - s.NoError(mgr.Start()) s.setupGetDrainStatus() s.setupRecordTaskStartedMock(taskType, testParam, false) @@ -740,24 +722,7 @@ func (s *matchingEngineSuite) ConcurrentAddAndPollTasks(taskType int, workerCoun s.matchingEngine.config.RangeSize = rangeSize // override to low number for the test s.matchingEngine.config.TaskDispatchRPSTTL = time.Nanosecond s.matchingEngine.config.MinTaskThrottlingBurstSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(_minBurst) - - mgr, err := tasklist.NewManager( - s.matchingEngine.domainCache, - s.matchingEngine.logger, - s.matchingEngine.metricsClient, - s.matchingEngine.taskManager, - s.matchingEngine.clusterMetadata, - s.matchingEngine.partitioner, - s.matchingEngine.matchingClient, - s.matchingEngine.removeTaskListManager, - testParam.TaskListID, - &tlKind, - s.matchingEngine.config, - s.matchingEngine.timeSource, - s.matchingEngine.timeSource.Now()) - s.NoError(err) s.taskManager.SetRangeID(testParam.TaskListID, initialRangeID) - s.NoError(mgr.Start()) s.setupGetDrainStatus() @@ -822,6 +787,10 @@ func (s *matchingEngineSuite) ConcurrentAddAndPollTasks(taskType int, workerCoun expectedRange := getExpectedRange(initialRangeID, persisted, rangeSize) // Due to conflicts some ids are skipped and more real ranges are used. s.True(expectedRange <= s.taskManager.GetRangeID(testParam.TaskListID)) + mgr, err := s.matchingEngine.getTaskListManager(testParam.TaskListID, &tlKind) + s.NoError(err) + // stop the tasklist manager to force the acked tasks to be deleted + mgr.Stop() s.EqualValues(0, s.taskManager.GetTaskCount(testParam.TaskListID)) syncCtr := scope.Snapshot().Counters()["test.sync_throttle_count_per_tl+domain="+matchingTestDomainName+",operation=TaskListMgr,tasklist="+testParam.TaskList.Name] @@ -1237,7 +1206,7 @@ func (s *matchingEngineSuite) setupRecordTaskStartedMock(taskType int, param *te if taskType == persistence.TaskListTypeActivity { s.mockHistoryClient.EXPECT().RecordActivityTaskStarted(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, taskRequest *types.RecordActivityTaskStartedRequest, option ...yarpc.CallOption) (*types.RecordActivityTaskStartedResponse, error) { - s.logger.Debug("Mock Received RecordActivityTaskStartedRequest") + s.logger.Debug(fmt.Sprintf("Mock Received RecordActivityTaskStartedRequest, taskID: %v", taskRequest.TaskID)) if checkDuplicate { if _, ok := startedTasks[taskRequest.TaskID]; ok { return nil, &types.EventAlreadyStartedError{Message: "already started"} @@ -1262,7 +1231,7 @@ func (s *matchingEngineSuite) setupRecordTaskStartedMock(taskType int, param *te } else { s.mockHistoryClient.EXPECT().RecordDecisionTaskStarted(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, taskRequest *types.RecordDecisionTaskStartedRequest, option ...yarpc.CallOption) (*types.RecordDecisionTaskStartedResponse, error) { - s.logger.Debug("Mock Received RecordDecisionTaskStartedRequest") + s.logger.Debug(fmt.Sprintf("Mock Received RecordDecisionTaskStartedRequest, taskID: %v", taskRequest.TaskID)) if checkDuplicate { if _, ok := startedTasks[taskRequest.TaskID]; ok { return nil, &types.EventAlreadyStartedError{Message: "already started"} @@ -1376,6 +1345,7 @@ func defaultTestConfig() *config.Config { config.AllIsolationGroups = []string{"datacenterA", "datacenterB"} config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10) config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) + config.MaxTimeBetweenTaskDeletes = time.Duration(0) return config } diff --git a/service/matching/tasklist/task_gc.go b/service/matching/tasklist/task_gc.go index 7e26733b63c..5b7ab0272cd 100644 --- a/service/matching/tasklist/task_gc.go +++ b/service/matching/tasklist/task_gc.go @@ -35,8 +35,6 @@ type taskGC struct { config *config.TaskListConfig } -var maxTimeBetweenTaskDeletes = time.Second - // newTaskGC returns an instance of a task garbage collector object // taskGC internally maintains a delete cursor and attempts to delete // a batch of tasks everytime Run() method is called. @@ -91,7 +89,7 @@ func (tgc *taskGC) checkPrecond(ackLevel int64, batchSize int, ignoreTimeCond bo if backlog >= int64(batchSize) { return true } - return backlog > 0 && (ignoreTimeCond || time.Since(tgc.lastDeleteTime) > maxTimeBetweenTaskDeletes) + return backlog > 0 && (ignoreTimeCond || time.Since(tgc.lastDeleteTime) > tgc.config.MaxTimeBetweenTaskDeletes) } func (tgc *taskGC) tryLock() bool { diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go index 0eab2df36b7..5acda7e78cc 100644 --- a/service/matching/tasklist/task_list_manager.go +++ b/service/matching/tasklist/task_list_manager.go @@ -759,9 +759,10 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainCache cache.Dom return common.MaxInt(1, cfg.ForwarderMaxChildrenPerNode(domainName, taskListName, taskType)) }, }, - HostName: cfg.HostName, - TaskDispatchRPS: cfg.TaskDispatchRPS, - TaskDispatchRPSTTL: cfg.TaskDispatchRPSTTL, + HostName: cfg.HostName, + TaskDispatchRPS: cfg.TaskDispatchRPS, + TaskDispatchRPSTTL: cfg.TaskDispatchRPSTTL, + MaxTimeBetweenTaskDeletes: cfg.MaxTimeBetweenTaskDeletes, }, nil } diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go index 9ca71facb0b..0795fd36a6f 100644 --- a/service/matching/tasklist/task_list_manager_test.go +++ b/service/matching/tasklist/task_list_manager_test.go @@ -714,7 +714,6 @@ func TestTaskExpiryAndCompletion(t *testing.T) { for _, tc := range testCases { t.Run("", func(t *testing.T) { - maxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes controller := gomock.NewController(t) mockPartitioner := partition.NewMockPartitioner(controller) mockPartitioner.EXPECT().GetIsolationGroupByDomainID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", nil).AnyTimes() @@ -728,6 +727,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) { cfg := defaultTestConfig() cfg.RangeSize = rangeSize cfg.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(tc.batchSize) + cfg.MaxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes // set idle timer check to a really small value to assert that we don't accidentally drop tasks while blocking // on enqueuing a task to task buffer cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) diff --git a/service/matching/tasklist/testing.go b/service/matching/tasklist/testing.go index 2e8dc1578d4..742e889876f 100644 --- a/service/matching/tasklist/testing.go +++ b/service/matching/tasklist/testing.go @@ -83,7 +83,7 @@ func (m *TestTaskManager) LeaseTaskList( tlm.Lock() defer tlm.Unlock() tlm.rangeID++ - m.logger.Debug(fmt.Sprintf("LeaseTaskList rangeID=%v", tlm.rangeID)) + m.logger.Debug(fmt.Sprintf("testTaskManager.LeaseTaskList rangeID=%v", tlm.rangeID)) return &persistence.LeaseTaskListResponse{ TaskListInfo: &persistence.TaskListInfo{ @@ -102,7 +102,7 @@ func (m *TestTaskManager) UpdateTaskList( _ context.Context, request *persistence.UpdateTaskListRequest, ) (*persistence.UpdateTaskListResponse, error) { - m.logger.Debug(fmt.Sprintf("UpdateTaskList taskListInfo=%v, ackLevel=%v", request.TaskListInfo, request.TaskListInfo.AckLevel)) + m.logger.Debug(fmt.Sprintf("testTaskManager.UpdateTaskList taskListInfo=%v, ackLevel=%v", request.TaskListInfo, request.TaskListInfo.AckLevel)) tli := request.TaskListInfo tlm := m.getTaskListManager(NewTestTaskListID(m.t, tli.DomainID, tli.Name, tli.TaskType)) @@ -111,7 +111,7 @@ func (m *TestTaskManager) UpdateTaskList( defer tlm.Unlock() if tlm.rangeID != tli.RangeID { return nil, &persistence.ConditionFailedError{ - Msg: fmt.Sprintf("Failed to update task list: name=%v, type=%v", tli.Name, tli.TaskType), + Msg: fmt.Sprintf("Failed to update task list: name=%v, type=%v, expected rangeID=%v, input rangeID=%v", tli.Name, tli.TaskType, tlm.rangeID, tli.RangeID), } } tlm.ackLevel = tli.AckLevel @@ -123,7 +123,7 @@ func (m *TestTaskManager) CompleteTask( _ context.Context, request *persistence.CompleteTaskRequest, ) error { - m.logger.Debug(fmt.Sprintf("CompleteTask taskID=%v, ackLevel=%v", request.TaskID, request.TaskList.AckLevel)) + m.logger.Debug(fmt.Sprintf("testTaskManager.CompleteTask taskID=%v, ackLevel=%v", request.TaskID, request.TaskList.AckLevel)) if request.TaskID <= 0 { panic(fmt.Errorf("Invalid taskID=%v", request.TaskID)) } @@ -143,7 +143,7 @@ func (m *TestTaskManager) CompleteTasksLessThan( _ context.Context, request *persistence.CompleteTasksLessThanRequest, ) (*persistence.CompleteTasksLessThanResponse, error) { - m.logger.Debug(fmt.Sprintf("CompleteTasksLessThan taskID=%v", request.TaskID)) + m.logger.Debug(fmt.Sprintf("testTaskManager.CompleteTasksLessThan taskID=%v", request.TaskID)) tlm := m.getTaskListManager(NewTestTaskListID(m.t, request.DomainID, request.TaskListName, request.TaskType)) tlm.Lock() defer tlm.Unlock()