Skip to content

Commit

Permalink
Adding a sample call to TaskValidator in update workflow cycle (#5634)
Browse files Browse the repository at this point in the history
* Adding a sample call to TaskValidator in update workflow cycle

* Made the calls non-functional

* fmt update
  • Loading branch information
agautam478 authored Feb 1, 2024
1 parent efa13df commit ebbc603
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 40 deletions.
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,13 @@ const (

EnableTimerDebugLogByDomainID

// EnableTaskVal is which allows the taskvalidation to be enabled.
// KeyName: system.enableTaskVal
// Value type: Bool
// Default value: false
// Allowed filters: DomainID
EnableTaskVal

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4165,6 +4172,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enable log for debugging timer task issue by domain",
DefaultValue: false,
},
EnableTaskVal: DynamicBool{
KeyName: "system.enableTaskVal",
Description: "Enable TaskValidation",
DefaultValue: false,
},
WorkflowIDCacheEnabled: DynamicBool{
KeyName: "history.workflowIDCacheEnabled",
Filters: []Filter{DomainName},
Expand Down
15 changes: 13 additions & 2 deletions common/taskvalidator/validateworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ package taskvalidator
import (
"context"
"errors"
"time"

"go.uber.org/zap"

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -59,14 +61,23 @@ type checkerImpl struct {
// NewWfChecker creates a new instance of a workflow validation checker.
// It requires a logger, metrics client, domain cache, persistence retryer,
// and a stale checker implementation to function.
func NewWfChecker(logger *zap.Logger, metrics metrics.Client, domainCache cache.DomainCache, pr persistence.Retryer, staleCheck staleChecker) Checker {
func NewWfChecker(logger *zap.Logger, metrics metrics.Client, domainCache cache.DomainCache, executionManager persistence.ExecutionManager, historymanager persistence.HistoryManager) (Checker, error) {
// Create the persistence retryer
retryPolicy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond) // Adjust as needed
pr := persistence.NewPersistenceRetryer(executionManager, historymanager, retryPolicy)

// Create the stale check instance
staleCheckInstance := invariant.NewStaleWorkflow(pr, domainCache, logger)
staleCheck, _ := staleCheckInstance.(staleChecker)

// Return the checker implementation
return &checkerImpl{
logger: logger,
metricsClient: metrics,
dc: domainCache,
pr: pr,
staleCheck: staleCheck,
}
}, nil
}

// WorkflowCheckforValidation performs workflow validation.
Expand Down
62 changes: 24 additions & 38 deletions common/taskvalidator/validateworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,50 +68,36 @@ func TestWorkflowCheckforValidation(t *testing.T) {
mockLogger := zap.NewNop()
mockMetricsClient := metrics.NewNoopMetricsClient()
mockDomainCache := cache.NewMockDomainCache(mockCtrl)
mockPersistenceRetryer := persistence.NewMockRetryer(mockCtrl)
mockStaleChecker := &mockStaleChecker{
CheckAgeFunc: func(response *persistence.GetWorkflowExecutionResponse) (bool, error) {
return tc.isStale, nil
},
}
checker := NewWfChecker(mockLogger, mockMetricsClient, mockDomainCache, mockPersistenceRetryer, mockStaleChecker)
mockDomainCache.EXPECT().
GetDomainByID(tc.domainID).
Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
// In each test case
mockDomainCache.EXPECT().
GetDomainName(gomock.Any()). // You can use gomock.Any() if the exact argument is not important
Return("test-domain-name", nil).AnyTimes()

// For test cases where deletion is expected
mockExecutionManager := persistence.NewMockExecutionManager(mockCtrl)
mockHistoryManager := persistence.NewMockHistoryManager(mockCtrl)

checker, err := NewWfChecker(mockLogger, mockMetricsClient, mockDomainCache, mockExecutionManager, mockHistoryManager)
assert.NoError(t, err, "Failed to create checker")

mockDomainCache.EXPECT().GetDomainByID(tc.domainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
mockDomainCache.EXPECT().GetDomainName(tc.domainID).Return(tc.domainName, nil).AnyTimes()

if tc.isStale {
mockPersistenceRetryer.EXPECT().
DeleteWorkflowExecution(gomock.Any(), gomock.Any()).
Return(nil).Times(1)
mockPersistenceRetryer.EXPECT().
DeleteCurrentWorkflowExecution(gomock.Any(), gomock.Any()).
Return(nil).Times(1)
mockExecutionManager.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mockExecutionManager.EXPECT().DeleteCurrentWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
}

mockPersistenceRetryer.EXPECT().
GetWorkflowExecution(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) {
if tc.simulateError {
return nil, errors.New("database error")
}
// Return a valid response object to trigger the deletion calls
return &persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DomainID: constants.TestDomainID,
WorkflowID: constants.TestWorkflowID,
},
mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) {
if tc.simulateError {
return nil, errors.New("database error")
}
return &persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DomainID: constants.TestDomainID,
WorkflowID: constants.TestWorkflowID,
},
}, nil
}).AnyTimes()
},
}, nil
}).AnyTimes()

ctx := context.Background()
err := checker.WorkflowCheckforValidation(ctx, tc.workflowID, tc.domainID, tc.domainName, tc.runID)
err = checker.WorkflowCheckforValidation(ctx, tc.workflowID, tc.domainID, tc.domainName, tc.runID)

if tc.simulateError {
assert.Error(t, err, "Expected error when GetWorkflowExecution fails")
Expand Down
18 changes: 18 additions & 0 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,13 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
resp, err = c.shard.UpdateWorkflowExecution(ctx, request)
return err
}
//Preparation for the task Validation.
//metricsClient := c.shard.GetMetricsClient()
//domainCache := c.shard.GetDomainCache()
//executionManager := c.shard.GetExecutionManager()
//historymanager := c.shard.GetHistoryManager()
//zapLogger, _ := zap.NewProduction()
//checker, _ := taskvalidator.NewWfChecker(zapLogger, metricsClient, domainCache, executionManager, historymanager)

isRetryable := func(err error) bool {
if _, ok := err.(*persistence.TimeoutError); ok {
Expand Down Expand Up @@ -1247,6 +1254,17 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
tag.Error(err),
tag.Number(c.updateCondition),
)
//TODO: Call the Task Validation here so that it happens whenever an error happen during Update.
//err1 := checker.WorkflowCheckforValidation(
// ctx,
// c.workflowExecution.GetWorkflowID(),
// c.domainID,
// c.GetDomainName(),
// c.workflowExecution.GetRunID(),
//)
//if err1 != nil {
// return nil, err1
//}
return nil, err
}
}
Expand Down

0 comments on commit ebbc603

Please sign in to comment.