Skip to content

Commit

Permalink
feat: workflow status listener
Browse files Browse the repository at this point in the history
Signed-off-by: Gosha <[email protected]>
  • Loading branch information
gosharo committed Jul 6, 2023
1 parent 8993328 commit 7260d96
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 6 deletions.
40 changes: 34 additions & 6 deletions cmd/piper/piper.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package main

import (
"context"
"fmt"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"k8s.io/apimachinery/pkg/watch"
"log"
"strconv"

rookout "github.com/Rookout/GoSDK"
"github.com/rookout/piper/pkg/clients"
"github.com/rookout/piper/pkg/conf"
"github.com/rookout/piper/pkg/git_provider"
"github.com/rookout/piper/pkg/server"
"github.com/rookout/piper/pkg/utils"
workflowHandler "github.com/rookout/piper/pkg/workflow_handler"

rookout "github.com/Rookout/GoSDK"
)

func main() {
Expand Down Expand Up @@ -51,10 +55,34 @@ func main() {
panic(err)
}

//err = common.Git.UnsetWebhook()
//if err != nil {
// panic(err)
//}
ctx := context.Background()
watcher, err := globalClients.Workflows.Watch(&ctx)
if err != nil {
log.Panicf("Failed to watch workflow error:%s", err)
}
defer watcher.Stop()

go func() {
workflowEventHandler(watcher.ResultChan())
}()

server.Start(cfg, globalClients)
}

func workflowEventHandler(workflowChan <-chan watch.Event) {
for event := range workflowChan {
wf, ok := event.Object.(*v1alpha1.Workflow)
if !ok {
log.Printf("Workflow object is not a v1alpha1.Workflow")
return
}
fmt.Printf(
"evnet are: %s, %s phase: %s completed: %s, message: %s\n",
event.Type,
wf.GetName(),
wf.Status.Phase,
strconv.FormatBool(wf.Status.Phase.Completed()),
wf.Status.Message,
)
}
}
2 changes: 2 additions & 0 deletions pkg/workflow_handler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/rookout/piper/pkg/common"
"k8s.io/apimachinery/pkg/watch"
)

type WorkflowsClient interface {
Expand All @@ -14,4 +15,5 @@ type WorkflowsClient interface {
Lint(wf *v1alpha1.Workflow) error
Submit(ctx *context.Context, wf *v1alpha1.Workflow) error
HandleWorkflowBatch(ctx *context.Context, workflowsBatch *common.WorkflowsBatch) error
Watch(ctx *context.Context) (watch.Interface, error)
}
18 changes: 18 additions & 0 deletions pkg/workflow_handler/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfClientSet "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"log"

"github.com/rookout/piper/pkg/common"
Expand Down Expand Up @@ -93,6 +95,7 @@ func (wfc *WorkflowsClientImpl) CreateWorkflow(spec *v1alpha1.WorkflowSpec, work
GenerateName: ConvertToValidString(workflowsBatch.Payload.Repo + "-" + workflowsBatch.Payload.Branch + "-"),
Namespace: wfc.cfg.Namespace,
Labels: map[string]string{
"piper": "true",
"repo": ConvertToValidString(workflowsBatch.Payload.Repo),
"branch": ConvertToValidString(workflowsBatch.Payload.Branch),
"user": ConvertToValidString(workflowsBatch.Payload.User),
Expand Down Expand Up @@ -144,6 +147,7 @@ func (wfc *WorkflowsClientImpl) Submit(ctx *context.Context, wf *v1alpha1.Workfl
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -196,3 +200,17 @@ func (wfc *WorkflowsClientImpl) HandleWorkflowBatch(ctx *context.Context, workfl
log.Printf("submit workflow for branch %s repo %s commit %s", workflowsBatch.Payload.Branch, workflowsBatch.Payload.Repo, workflowsBatch.Payload.Commit)
return nil
}

func (wfc *WorkflowsClientImpl) Watch(ctx *context.Context) (watch.Interface, error) {
workflowsClient := wfc.clientSet.ArgoprojV1alpha1().Workflows(wfc.cfg.Namespace)
opts := v1.ListOptions{
Watch: true,
LabelSelector: "piper=true",
}
watcher, err := workflowsClient.Watch(*ctx, opts)
if err != nil {
return nil, err
}

return watcher, nil
}
1 change: 1 addition & 0 deletions pkg/workflow_handler/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func TestCreateWorkflow(t *testing.T) {
assert.Equal("my-repo-my-branch-", workflow.ObjectMeta.GenerateName)
assert.Equal(wfcImpl.cfg.Namespace, workflow.ObjectMeta.Namespace)
assert.Equal(map[string]string{
"piper": "true",
"repo": "my-repo",
"branch": "my-branch",
"user": "my-user",
Expand Down

0 comments on commit 7260d96

Please sign in to comment.