Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for replication_task #6335

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 292 additions & 0 deletions service/history/ndc/replication_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -264,3 +265,294 @@ func TestNewReplicationTask(t *testing.T) {
})
}
}

func Test_getWorkflowResetMetadata(t *testing.T) {
tests := map[string]struct {
mockTaskAffordance func(mockTask *replicationTaskImpl)
expectBaseRunID string
expectNewRunID string
expectBaseEventVersion int64
expectIsReset bool
}{
"Case1: DecisionTaskFailed with reset workflow": {
mockTaskAffordance: func(mockTask *replicationTaskImpl) {
decisionTaskFailedEvent := &types.HistoryEvent{
EventType: types.EventTypeDecisionTaskFailed.Ptr(),
DecisionTaskFailedEventAttributes: &types.DecisionTaskFailedEventAttributes{
BaseRunID: "base-run-id",
ForkEventVersion: 1,
NewRunID: "new-run-id",
Cause: types.DecisionTaskFailedCauseResetWorkflow.Ptr(),
},
}
mockTask.firstEvent = decisionTaskFailedEvent
},
expectBaseRunID: "base-run-id",
expectNewRunID: "new-run-id",
expectBaseEventVersion: 1,
expectIsReset: true,
},
"Case2: DecisionTaskTimedOut with reset workflow": {
mockTaskAffordance: func(mockTask *replicationTaskImpl) {
decisionTaskTimedOutEvent := &types.HistoryEvent{
EventType: types.EventTypeDecisionTaskTimedOut.Ptr(),
DecisionTaskTimedOutEventAttributes: &types.DecisionTaskTimedOutEventAttributes{
BaseRunID: "base-run-id-timedout",
ForkEventVersion: 2,
NewRunID: "new-run-id-timedout",
Cause: types.DecisionTaskTimedOutCauseReset.Ptr(),
},
}
mockTask.firstEvent = decisionTaskTimedOutEvent
},
expectBaseRunID: "base-run-id-timedout",
expectNewRunID: "new-run-id-timedout",
expectBaseEventVersion: 2,
expectIsReset: true,
},
"Case3: DecisionTaskFailed without reset workflow": {
mockTaskAffordance: func(mockTask *replicationTaskImpl) {
decisionTaskFailedEvent := &types.HistoryEvent{
EventType: types.EventTypeDecisionTaskFailed.Ptr(),
DecisionTaskFailedEventAttributes: &types.DecisionTaskFailedEventAttributes{
BaseRunID: "base-run-id",
ForkEventVersion: 1,
NewRunID: "new-run-id",
Cause: types.DecisionTaskFailedCause.Ptr(0),
},
}
mockTask.firstEvent = decisionTaskFailedEvent
},
expectBaseRunID: "base-run-id",
expectNewRunID: "new-run-id",
expectBaseEventVersion: 1,
expectIsReset: false,
},
"Case4: DecisionTaskTimedOut without reset workflow": {
mockTaskAffordance: func(mockTask *replicationTaskImpl) {
decisionTaskTimedOutEvent := &types.HistoryEvent{
EventType: types.EventTypeDecisionTaskTimedOut.Ptr(),
DecisionTaskTimedOutEventAttributes: &types.DecisionTaskTimedOutEventAttributes{
BaseRunID: "base-run-id-timedout",
ForkEventVersion: 2,
NewRunID: "new-run-id-timedout",
Cause: types.DecisionTaskTimedOutCause.Ptr(0),
},
}
mockTask.firstEvent = decisionTaskTimedOutEvent
},
expectBaseRunID: "base-run-id-timedout",
expectNewRunID: "new-run-id-timedout",
expectBaseEventVersion: 2,
expectIsReset: false,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
// Create a mock task
mockTask := &replicationTaskImpl{}

// Apply test case mock behavior
test.mockTaskAffordance(mockTask)

// Call the method under test
baseRunID, newRunID, baseEventVersion, isReset := mockTask.getWorkflowResetMetadata()

// Assertions
assert.Equal(t, test.expectBaseRunID, baseRunID)
assert.Equal(t, test.expectNewRunID, newRunID)
assert.Equal(t, test.expectBaseEventVersion, baseEventVersion)
assert.Equal(t, test.expectIsReset, isReset)
})
}
}

func Test_splitTask(t *testing.T) {
tests := map[string]struct {
mockTaskAffordance func(mockTask *replicationTaskImpl)
taskStartTime time.Time
expectedNewRunTask bool
expectedError error
}{
"Case1: success case with valid newEvents and continuedAsNew event": {
mockTaskAffordance: func(mockTask *replicationTaskImpl) {
timestamp1 := time.Now().UnixNano()
timestamp2 := time.Now().Add(1 * time.Second).UnixNano()

mockTask.newEvents = []*types.HistoryEvent{
{ID: 1, Version: 1, Timestamp: &timestamp1},
{ID: 2, Version: 1, Timestamp: &timestamp2},
}
mockTask.firstEvent = &types.HistoryEvent{
EventType: types.EventTypeWorkflowExecutionContinuedAsNew.Ptr(),
WorkflowExecutionContinuedAsNewEventAttributes: &types.WorkflowExecutionContinuedAsNewEventAttributes{
NewExecutionRunID: "new-run-id",
},
}
mockTask.lastEvent = &types.HistoryEvent{
EventType: types.EventTypeWorkflowExecutionContinuedAsNew.Ptr(),
ID: 2,
Version: 1,
WorkflowExecutionContinuedAsNewEventAttributes: &types.WorkflowExecutionContinuedAsNewEventAttributes{
NewExecutionRunID: "new-run-id",
},
}
// Initialize execution to avoid nil pointer error
mockTask.execution = &types.WorkflowExecution{
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
}
// Assign a logger
mockTask.logger = log.NewNoop()
},
taskStartTime: time.Now(),
expectedNewRunTask: true,
expectedError: nil,
},
"Case2: error when no newEvents": {
mockTaskAffordance: func(mockTask *replicationTaskImpl) {
mockTask.newEvents = nil
// Initialize execution to avoid nil pointer error
mockTask.execution = &types.WorkflowExecution{
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
}
mockTask.logger = log.NewNoop() // Assign a logger
},
taskStartTime: time.Now(),
expectedNewRunTask: false,
expectedError: ErrNoNewRunHistory,
},
"Case3: error when lastEvent is not continuedAsNew": {
mockTaskAffordance: func(mockTask *replicationTaskImpl) {
timestamp1 := time.Now().UnixNano()

mockTask.newEvents = []*types.HistoryEvent{
{ID: 1, Version: 1, Timestamp: &timestamp1},
}
mockTask.lastEvent = &types.HistoryEvent{
EventType: types.EventTypeWorkflowExecutionCompleted.Ptr(), // Not continuedAsNew
ID: 1,
Version: 1,
}
// Initialize execution to avoid nil pointer error
mockTask.execution = &types.WorkflowExecution{
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
}
mockTask.logger = log.NewNoop() // Assign a logger
},
taskStartTime: time.Now(),
expectedNewRunTask: false,
expectedError: ErrLastEventIsNotContinueAsNew,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
// Create a mock task
mockTask := &replicationTaskImpl{}

// Apply the mock behavior based on the test case
test.mockTaskAffordance(mockTask)

// Call the method under test
_, newRunTask, err := mockTask.splitTask(test.taskStartTime)

// Assertions
if test.expectedError != nil {
assert.Equal(t, test.expectedError, err)
assert.Nil(t, newRunTask)
} else {
assert.NoError(t, err)
assert.NotNil(t, newRunTask)
assert.Equal(t, "new-run-id", newRunTask.getRunID())
}
})
}
}

func Test_replicationTaskImpl_Getters(t *testing.T) {
// Setup some common values for testing
workflowExecution := &types.WorkflowExecution{
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
}
historyEvents := []*types.HistoryEvent{
{ID: 1, Version: 1},
{ID: 2, Version: 1},
}
newHistoryEvents := []*types.HistoryEvent{
{ID: 3, Version: 1},
{ID: 4, Version: 1},
}
logger := log.NewNoop()
versionHistory := persistence.NewVersionHistory(nil, []*persistence.VersionHistoryItem{
{EventID: 1, Version: 1},
})

task := &replicationTaskImpl{
domainID: "test-domain-id",
execution: workflowExecution,
version: 123,
sourceCluster: "test-cluster",
eventTime: time.Now(),
events: historyEvents,
newEvents: newHistoryEvents,
logger: logger,
versionHistory: versionHistory,
}

tests := map[string]struct {
testFunc func() interface{}
expectedResult interface{}
}{
"getDomainID": {
testFunc: func() interface{} { return task.getDomainID() },
expectedResult: "test-domain-id",
},
"getWorkflowID": {
testFunc: func() interface{} { return task.getWorkflowID() },
expectedResult: "test-workflow-id",
},
"getRunID": {
testFunc: func() interface{} { return task.getRunID() },
expectedResult: "test-run-id",
},
"getEventTime": {
testFunc: func() interface{} { return task.getEventTime() },
expectedResult: task.eventTime,
},
"getVersion": {
testFunc: func() interface{} { return task.getVersion() },
expectedResult: int64(123),
},
"getSourceCluster": {
testFunc: func() interface{} { return task.getSourceCluster() },
expectedResult: "test-cluster",
},
"getEvents": {
testFunc: func() interface{} { return task.getEvents() },
expectedResult: historyEvents,
},
"getNewEvents": {
testFunc: func() interface{} { return task.getNewEvents() },
expectedResult: newHistoryEvents,
},
"getLogger": {
testFunc: func() interface{} { return task.getLogger() },
expectedResult: logger,
},
"getVersionHistory": {
testFunc: func() interface{} { return task.getVersionHistory() },
expectedResult: versionHistory,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, test.expectedResult, test.testFunc())
})
}
}
Loading