From b1631eeee227f88617ad12c34202c9178df866da Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 9 Sep 2024 14:55:40 +0200 Subject: [PATCH 1/2] [Wf-Diagnostics] Set query handler for diagnostics workflow to provide result --- service/worker/diagnostics/workflow.go | 31 +++++++++++++-------- service/worker/diagnostics/workflow_test.go | 14 ++++++++++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/service/worker/diagnostics/workflow.go b/service/worker/diagnostics/workflow.go index 096403c2c54..dbd0ab5a90d 100644 --- a/service/worker/diagnostics/workflow.go +++ b/service/worker/diagnostics/workflow.go @@ -33,8 +33,9 @@ import ( ) const ( - diagnosticsWorkflow = "diagnostics-workflow" - tasklist = "wf-diagnostics" + diagnosticsWorkflow = "diagnostics-workflow" + tasklist = "wf-diagnostics" + queryDiagnosticsReport = "query-diagnostics-report" retrieveWfExecutionHistoryActivity = "retrieveWfExecutionHistory" identifyTimeoutsActivity = "identifyTimeouts" @@ -54,6 +55,14 @@ type DiagnosticsWorkflowResult struct { } func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (DiagnosticsWorkflowResult, error) { + var result DiagnosticsWorkflowResult + err := workflow.SetQueryHandler(ctx, queryDiagnosticsReport, func() (DiagnosticsWorkflowResult, error) { + return result, nil + }) + if err != nil { + return result, err + } + activityOptions := workflow.ActivityOptions{ ScheduleToCloseTimeout: time.Second * 10, ScheduleToStartTimeout: time.Second * 5, @@ -62,24 +71,27 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo activityCtx := workflow.WithActivityOptions(ctx, activityOptions) var wfExecutionHistory *types.GetWorkflowExecutionHistoryResponse - err := workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{ + err = workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{ Domain: params.Domain, Execution: &types.WorkflowExecution{ WorkflowID: params.WorkflowID, RunID: params.RunID, }}).Get(ctx, &wfExecutionHistory) if err != nil { - return DiagnosticsWorkflowResult{}, fmt.Errorf("RetrieveExecutionHistory: %w", err) + return result, fmt.Errorf("RetrieveExecutionHistory: %w", err) } + result.Runbooks = []string{linkToTimeoutsRunbook} + var checkResult []invariants.InvariantCheckResult err = workflow.ExecuteActivity(activityCtx, w.identifyTimeouts, identifyTimeoutsInputParams{ History: wfExecutionHistory, Domain: params.Domain, }).Get(ctx, &checkResult) if err != nil { - return DiagnosticsWorkflowResult{}, fmt.Errorf("IdentifyTimeouts: %w", err) + return result, fmt.Errorf("IdentifyTimeouts: %w", err) } + result.Issues = checkResult var rootCauseResult []invariants.InvariantRootCauseResult err = workflow.ExecuteActivity(activityCtx, w.rootCauseTimeouts, rootCauseTimeoutsParams{ @@ -88,12 +100,9 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo Issues: checkResult, }).Get(ctx, &rootCauseResult) if err != nil { - return DiagnosticsWorkflowResult{}, fmt.Errorf("RootCauseTimeouts: %w", err) + return result, fmt.Errorf("RootCauseTimeouts: %w", err) } + result.RootCause = rootCauseResult - return DiagnosticsWorkflowResult{ - Issues: checkResult, - RootCause: rootCauseResult, - Runbooks: []string{linkToTimeoutsRunbook}, - }, nil + return result, nil } diff --git a/service/worker/diagnostics/workflow_test.go b/service/worker/diagnostics/workflow_test.go index 8d0975370bd..af482ba0ec9 100644 --- a/service/worker/diagnostics/workflow_test.go +++ b/service/worker/diagnostics/workflow_test.go @@ -122,6 +122,10 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow() { s.NoError(s.workflowEnv.GetWorkflowResult(&result)) s.ElementsMatch(issues, result.Issues) s.ElementsMatch(rootCause, result.RootCause) + + queriedResult := s.queryDiagnostics() + s.ElementsMatch(queriedResult.Issues, result.Issues) + s.ElementsMatch(queriedResult.RootCause, result.RootCause) } func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() { @@ -139,3 +143,13 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() { s.Error(s.workflowEnv.GetWorkflowError()) s.EqualError(s.workflowEnv.GetWorkflowError(), errExpected.Error()) } + +func (s *diagnosticsWorkflowTestSuite) queryDiagnostics() DiagnosticsWorkflowResult { + queryFuture, err := s.workflowEnv.QueryWorkflow(queryDiagnosticsReport) + s.NoError(err) + + var result DiagnosticsWorkflowResult + err = queryFuture.Get(&result) + s.NoError(err) + return result +} From 5a5277f5356746b6c684cb00b8c34b4df4f0495f Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Tue, 10 Sep 2024 08:18:05 +0200 Subject: [PATCH 2/2] Update workflow.go --- service/worker/diagnostics/workflow.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/service/worker/diagnostics/workflow.go b/service/worker/diagnostics/workflow.go index dbd0ab5a90d..ebadf6ff964 100644 --- a/service/worker/diagnostics/workflow.go +++ b/service/worker/diagnostics/workflow.go @@ -34,7 +34,7 @@ import ( const ( diagnosticsWorkflow = "diagnostics-workflow" - tasklist = "wf-diagnostics" + tasklist = "diagnostics-wf-tasklist" queryDiagnosticsReport = "query-diagnostics-report" retrieveWfExecutionHistoryActivity = "retrieveWfExecutionHistory" @@ -54,13 +54,13 @@ type DiagnosticsWorkflowResult struct { Runbooks []string } -func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (DiagnosticsWorkflowResult, error) { +func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (*DiagnosticsWorkflowResult, error) { var result DiagnosticsWorkflowResult err := workflow.SetQueryHandler(ctx, queryDiagnosticsReport, func() (DiagnosticsWorkflowResult, error) { return result, nil }) if err != nil { - return result, err + return nil, err } activityOptions := workflow.ActivityOptions{ @@ -78,7 +78,7 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo RunID: params.RunID, }}).Get(ctx, &wfExecutionHistory) if err != nil { - return result, fmt.Errorf("RetrieveExecutionHistory: %w", err) + return nil, fmt.Errorf("RetrieveExecutionHistory: %w", err) } result.Runbooks = []string{linkToTimeoutsRunbook} @@ -89,7 +89,7 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo Domain: params.Domain, }).Get(ctx, &checkResult) if err != nil { - return result, fmt.Errorf("IdentifyTimeouts: %w", err) + return nil, fmt.Errorf("IdentifyTimeouts: %w", err) } result.Issues = checkResult @@ -100,9 +100,9 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo Issues: checkResult, }).Get(ctx, &rootCauseResult) if err != nil { - return result, fmt.Errorf("RootCauseTimeouts: %w", err) + return nil, fmt.Errorf("RootCauseTimeouts: %w", err) } result.RootCause = rootCauseResult - return result, nil + return &result, nil }