Skip to content

Commit

Permalink
[Wf-Diagnostics] Set query handler for diagnostics workflow to provid…
Browse files Browse the repository at this point in the history
…e result (#6273)

* [Wf-Diagnostics] Set query handler for diagnostics workflow to provide result

* Update workflow.go
  • Loading branch information
sankari165 authored Sep 10, 2024
1 parent ed68c03 commit 1771349
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
33 changes: 21 additions & 12 deletions service/worker/diagnostics/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
)

const (
diagnosticsWorkflow = "diagnostics-workflow"
tasklist = "wf-diagnostics"
diagnosticsWorkflow = "diagnostics-workflow"
tasklist = "diagnostics-wf-tasklist"
queryDiagnosticsReport = "query-diagnostics-report"

retrieveWfExecutionHistoryActivity = "retrieveWfExecutionHistory"
identifyTimeoutsActivity = "identifyTimeouts"
Expand All @@ -53,7 +54,15 @@ type DiagnosticsWorkflowResult struct {
Runbooks []string
}

func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (DiagnosticsWorkflowResult, error) {
func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflowInput) (*DiagnosticsWorkflowResult, error) {
var result DiagnosticsWorkflowResult
err := workflow.SetQueryHandler(ctx, queryDiagnosticsReport, func() (DiagnosticsWorkflowResult, error) {
return result, nil
})
if err != nil {
return nil, err
}

activityOptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: time.Second * 10,
ScheduleToStartTimeout: time.Second * 5,
Expand All @@ -62,24 +71,27 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo
activityCtx := workflow.WithActivityOptions(ctx, activityOptions)

var wfExecutionHistory *types.GetWorkflowExecutionHistoryResponse
err := workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{
err = workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{
Domain: params.Domain,
Execution: &types.WorkflowExecution{
WorkflowID: params.WorkflowID,
RunID: params.RunID,
}}).Get(ctx, &wfExecutionHistory)
if err != nil {
return DiagnosticsWorkflowResult{}, fmt.Errorf("RetrieveExecutionHistory: %w", err)
return nil, fmt.Errorf("RetrieveExecutionHistory: %w", err)
}

result.Runbooks = []string{linkToTimeoutsRunbook}

var checkResult []invariants.InvariantCheckResult
err = workflow.ExecuteActivity(activityCtx, w.identifyTimeouts, identifyTimeoutsInputParams{
History: wfExecutionHistory,
Domain: params.Domain,
}).Get(ctx, &checkResult)
if err != nil {
return DiagnosticsWorkflowResult{}, fmt.Errorf("IdentifyTimeouts: %w", err)
return nil, fmt.Errorf("IdentifyTimeouts: %w", err)
}
result.Issues = checkResult

var rootCauseResult []invariants.InvariantRootCauseResult
err = workflow.ExecuteActivity(activityCtx, w.rootCauseTimeouts, rootCauseTimeoutsParams{
Expand All @@ -88,12 +100,9 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo
Issues: checkResult,
}).Get(ctx, &rootCauseResult)
if err != nil {
return DiagnosticsWorkflowResult{}, fmt.Errorf("RootCauseTimeouts: %w", err)
return nil, fmt.Errorf("RootCauseTimeouts: %w", err)
}
result.RootCause = rootCauseResult

return DiagnosticsWorkflowResult{
Issues: checkResult,
RootCause: rootCauseResult,
Runbooks: []string{linkToTimeoutsRunbook},
}, nil
return &result, nil
}
14 changes: 14 additions & 0 deletions service/worker/diagnostics/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow() {
s.NoError(s.workflowEnv.GetWorkflowResult(&result))
s.ElementsMatch(issues, result.Issues)
s.ElementsMatch(rootCause, result.RootCause)

queriedResult := s.queryDiagnostics()
s.ElementsMatch(queriedResult.Issues, result.Issues)
s.ElementsMatch(queriedResult.RootCause, result.RootCause)
}

func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() {
Expand All @@ -139,3 +143,13 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() {
s.Error(s.workflowEnv.GetWorkflowError())
s.EqualError(s.workflowEnv.GetWorkflowError(), errExpected.Error())
}

func (s *diagnosticsWorkflowTestSuite) queryDiagnostics() DiagnosticsWorkflowResult {
queryFuture, err := s.workflowEnv.QueryWorkflow(queryDiagnosticsReport)
s.NoError(err)

var result DiagnosticsWorkflowResult
err = queryFuture.Get(&result)
s.NoError(err)
return result
}

0 comments on commit 1771349

Please sign in to comment.