diff --git a/service/history/task/cross_cluster_source_task_executor.go b/service/history/task/cross_cluster_source_task_executor.go index b1c2dba7505..059a9725411 100644 --- a/service/history/task/cross_cluster_source_task_executor.go +++ b/service/history/task/cross_cluster_source_task_executor.go @@ -257,6 +257,10 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask( if failedCause != nil { // remaining errors are non-retryable + cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution + if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted { + cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted + } return requestCancelExternalExecutionFailed( ctx, taskInfo, @@ -265,6 +269,7 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask( taskInfo.TargetWorkflowID, taskInfo.TargetRunID, now, + cause, ) } return requestCancelExternalExecutionCompleted( @@ -479,6 +484,10 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask( if failedCause != nil { // remaining errors are non-retryable + cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution + if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted { + cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted + } return signalExternalExecutionFailed( ctx, taskInfo, @@ -488,6 +497,7 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask( taskInfo.TargetRunID, signalInfo.Control, now, + cause, ) } diff --git a/service/history/task/cross_cluster_source_task_executor_test.go b/service/history/task/cross_cluster_source_task_executor_test.go index 916e962d9ee..78b7afa01b9 100644 --- a/service/history/task/cross_cluster_source_task_executor_test.go +++ b/service/history/task/cross_cluster_source_task_executor_test.go @@ -664,7 +664,7 @@ func (s *crossClusterSourceTaskExecutorSuite) TestExecuteCancelExecution_Failure &types.CrossClusterTaskResponse{ TaskType: types.CrossClusterTaskTypeCancelExecution.Ptr(), TaskState: int16(processingStateInitialized), - FailedCause: types.CrossClusterTaskFailedCauseWorkflowNotExists.Ptr(), + FailedCause: types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted.Ptr(), }, func( mutableState execution.MutableState, diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 802bbd646c6..f5babe3a49f 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -630,6 +630,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution( task.TargetWorkflowID, task.TargetRunID, t.shard.GetTimeSource().Now(), + types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) return err } @@ -656,6 +657,11 @@ func (t *transferActiveTaskExecutor) processCancelExecution( // for retryable error just return return err } + cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution + var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError + if errors.As(err, &alreadyCompletedErr) { + cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted + } return requestCancelExternalExecutionFailed( ctx, task, @@ -664,6 +670,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution( task.TargetWorkflowID, task.TargetRunID, t.shard.GetTimeSource().Now(), + cause, ) } @@ -750,6 +757,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution( task.TargetRunID, signalInfo.Control, t.shard.GetTimeSource().Now(), + types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) } @@ -769,12 +777,17 @@ func (t *transferActiveTaskExecutor) processSignalExecution( tag.TargetWorkflowRunID(task.TargetRunID), tag.Error(err)) - // Check to see if the error is non-transient, in which case add SignalFailed + // Check to see if the error is non-transient, in which case add RequestCancelFailed // event and complete transfer task by setting the err = nil if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) { // for retryable error just return return err } + var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError + cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution + if errors.As(err, &alreadyCompletedErr) { + cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted + } return signalExternalExecutionFailed( ctx, task, @@ -784,6 +797,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution( task.TargetRunID, signalInfo.Control, t.shard.GetTimeSource().Now(), + cause, ) } @@ -1419,6 +1433,7 @@ func requestCancelExternalExecutionFailed( targetWorkflowID string, targetRunID string, now time.Time, + cause types.CancelExternalWorkflowExecutionFailedCause, ) error { err := updateWorkflowExecution(ctx, wfContext, true, @@ -1439,7 +1454,7 @@ func requestCancelExternalExecutionFailed( targetDomain, targetWorkflowID, targetRunID, - types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, + cause, ) return err }, @@ -1464,6 +1479,7 @@ func signalExternalExecutionFailed( targetRunID string, control []byte, now time.Time, + cause types.SignalExternalWorkflowExecutionFailedCause, ) error { err := updateWorkflowExecution(ctx, wfContext, true, @@ -1485,7 +1501,7 @@ func signalExternalExecutionFailed( targetWorkflowID, targetRunID, control, - types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, + cause, ) return err }, diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 3d8e54a3fb6..c4f81196eb1 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -1046,7 +1046,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Success() { ) } -func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() { +func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_EntityNotExistsError() { s.testProcessCancelExecution( s.targetDomainID, func( @@ -1067,6 +1067,27 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() { ) } +func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_WorkflowAlreadyCompleted() { + s.testProcessCancelExecution( + s.targetDomainID, + func( + mutableState execution.MutableState, + workflowExecution, targetExecution types.WorkflowExecution, + event *types.HistoryEvent, + transferTask Task, + requestCancelInfo *persistence.RequestCancelInfo, + ) { + persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version) + s.NoError(err) + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + cancelRequest := createTestRequestCancelWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), requestCancelInfo.CancelRequestID) + s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), cancelRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1) + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once() + }, + ) +} + func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Duplication() { s.testProcessCancelExecution( s.targetDomainID, @@ -1202,7 +1223,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Success() { ) } -func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() { +func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_EntityNotExistsError() { s.testProcessSignalExecution( s.targetDomainID, func( @@ -1223,6 +1244,26 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() { ) } +func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_WorkflowAlreadyCompletedError() { + s.testProcessSignalExecution( + s.targetDomainID, + func( + mutableState execution.MutableState, + workflowExecution, targetExecution types.WorkflowExecution, + event *types.HistoryEvent, + transferTask Task, + signalInfo *persistence.SignalInfo, + ) { + persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version) + s.NoError(err) + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + signalRequest := createTestSignalWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), signalInfo) + s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), signalRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1) + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once() + }, + ) +} func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Duplication() { s.testProcessSignalExecution( s.targetDomainID,