Skip to content

Commit

Permalink
feat: enhance watch in apply api
Browse files Browse the repository at this point in the history
  • Loading branch information
ffforest committed Dec 17, 2024
1 parent b6198a6 commit e86766e
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 225 deletions.
214 changes: 53 additions & 161 deletions pkg/engine/api/apply.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/engine/api/preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type APIOptions struct {
DryRun bool
MaxConcurrent int
Watch bool
WatchTimeout int
UI *terminal.UI
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/engine/release/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewApplyRelease(storage Storage, project, stack, workspace string) (*v1.Rel
if err != nil {
return nil, err
}
if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed && lastRelease.Phase != v1.ReleasePhaseApplying {
if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed {
return nil, fmt.Errorf("cannot create a new release of project: %s, workspace: %s. There is a release:%v in progress",
project, workspace, lastRelease.Revision)
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func CreateDestroyRelease(storage Storage, project, stack, workspace string) (*v
if err != nil {
return nil, err
}
if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed && lastRelease.Phase != v1.ReleasePhaseApplying {
if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed {
return nil, fmt.Errorf("cannot create release of project %s, workspace %s cause there is release in progress", project, workspace)
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/engine/runtime/kubernetes/kubernetes_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
amount := 1
// Try to get dependent resource by owner reference
dependentGVK := getDependentGVK(reqObj.GroupVersionKind())
log.Debug("dependentGVK", dependentGVK)
log.Debug("watching dependent GVK: ", dependentGVK)
if !dependentGVK.Empty() {
owner := reqObj
for !dependentGVK.Empty() {
Expand Down Expand Up @@ -720,12 +720,12 @@ func getDependentGVK(gvk schema.GroupVersionKind) schema.GroupVersionKind {
switch gvk.Kind {
// Deployment generates ReplicaSet
case convertor.Deployment:
return schema.GroupVersionKind{}
// return schema.GroupVersionKind{
// Group: appsv1.SchemeGroupVersion.Group,
// Version: appsv1.SchemeGroupVersion.Version,
// Kind: convertor.ReplicaSet,
// }
// return schema.GroupVersionKind{}
return schema.GroupVersionKind{
Group: appsv1.SchemeGroupVersion.Group,
Version: appsv1.SchemeGroupVersion.Version,
Kind: convertor.ReplicaSet,
}
// DaemonSet and StatefulSet generate ControllerRevision
case convertor.DaemonSet, convertor.StatefulSet:
return schema.GroupVersionKind{
Expand Down
21 changes: 14 additions & 7 deletions pkg/server/handler/stack/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func requestHelper(r *http.Request) (context.Context, *httplog.Logger, *stackman
forceParam, _ := strconv.ParseBool(r.URL.Query().Get("force"))
noCacheParam, _ := strconv.ParseBool(r.URL.Query().Get("noCache"))
unlockParam, _ := strconv.ParseBool(r.URL.Query().Get("unlock"))
watchParam, _ := strconv.ParseBool(r.URL.Query().Get("watch"))
watchTimeoutParam, err := strconv.Atoi(r.URL.Query().Get("watchTimeout"))
if err != nil {
return nil, nil, nil, stackmanager.ErrInvalidWatchTimeout
}
importResourcesParam, _ := strconv.ParseBool(r.URL.Query().Get("importResources"))
specIDParam := r.URL.Query().Get("specID")
// TODO: Should match automatically eventually???
Expand All @@ -118,13 +123,15 @@ func requestHelper(r *http.Request) (context.Context, *httplog.Logger, *stackman
workspaceParam = constant.DefaultWorkspace
}
executeParams := stackmanager.StackExecuteParams{
Detail: detailParam,
Dryrun: dryrunParam,
Force: forceParam,
SpecID: specIDParam,
ImportResources: importResourcesParam,
NoCache: noCacheParam,
Unlock: unlockParam,
Detail: detailParam,
Dryrun: dryrunParam,
Force: forceParam,
SpecID: specIDParam,
ImportResources: importResourcesParam,
NoCache: noCacheParam,
Unlock: unlockParam,
Watch: watchParam,
WatchTimeoutSeconds: watchTimeoutParam,
}
params := stackmanager.StackRequestParams{
StackID: uint(id),
Expand Down
41 changes: 21 additions & 20 deletions pkg/server/manager/stack/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func (m *StackManager) GenerateSpec(ctx context.Context, params *StackRequestParams) (string, *apiv1.Spec, error) {
logger := logutil.GetLogger(ctx)
runLogger := logutil.GetRunLogger(ctx)
logToAll(logger, runLogger, "Info", "Starting generating spec in StackManager...")
logutil.LogToAll(logger, runLogger, "Info", "Starting generating spec in StackManager...")

// Get the stack entity and return error if stack ID is not found
stackEntity, err := m.stackRepo.Get(ctx, params.StackID)
Expand Down Expand Up @@ -101,7 +101,7 @@ func (m *StackManager) GenerateSpec(ctx context.Context, params *StackRequestPar
func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestParams, requestPayload request.StackImportRequest) (*models.Changes, error) {
logger := logutil.GetLogger(ctx)
runLogger := logutil.GetRunLogger(ctx)
logToAll(logger, runLogger, "Info", "Starting previewing stack in StackManager...")
logutil.LogToAll(logger, runLogger, "Info", "Starting previewing stack in StackManager...")

// Get the stack entity by id
stackEntity, err := m.stackRepo.Get(ctx, params.StackID)
Expand All @@ -114,7 +114,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar

defer func() {
if err != nil {
logToAll(logger, runLogger, "Info", "Error occurred during previewing stack. Setting stack sync state to preview failed")
logutil.LogToAll(logger, runLogger, "Info", "Error occurred during previewing stack. Setting stack sync state to preview failed")
stackEntity.SyncState = constant.StackStatePreviewFailed
m.stackRepo.Update(ctx, stackEntity)
} else {
Expand Down Expand Up @@ -164,7 +164,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
if err != nil {
return nil, err
}
logToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
logutil.LogToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)

directory, workDir, err := m.GetWorkdirAndDirectory(ctx, params, stackEntity)
if err != nil {
Expand All @@ -188,7 +188,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
// return immediately if no resource found in stack
// todo: if there is no resource, should still do diff job; for now, if output is json format, there is no hint
if sp == nil || len(sp.Resources) == 0 {
logToAll(logger, runLogger, "Info", "No resource change found in this stack...")
logutil.LogToAll(logger, runLogger, "Info", "No resource change found in this stack...")
return nil, nil
}

Expand All @@ -211,7 +211,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
if params.ExecuteParams.ImportResources && len(requestPayload.ImportedResources) > 0 {
m.ImportTerraformResourceID(ctx, sp, requestPayload.ImportedResources)
}
logToAll(logger, runLogger, "Info", "Final Spec is: ", "spec", sp)
logutil.LogToAll(logger, runLogger, "Info", "Final Spec is: ", "spec", sp)

changes, err := engineapi.Preview(executeOptions, releaseStorage, sp, state, project, stack)
return changes, err
Expand All @@ -220,7 +220,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParams, requestPayload request.StackImportRequest) error {
logger := logutil.GetLogger(ctx)
runLogger := logutil.GetRunLogger(ctx)
logToAll(logger, runLogger, "Info", "Starting applying stack in StackManager ...")
logutil.LogToAll(logger, runLogger, "Info", "Starting applying stack in StackManager ...")
_, stackBackend, project, _, ws, err := m.metaHelper(ctx, params.StackID, params.Workspace)
if err != nil {
return err
Expand All @@ -239,10 +239,10 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
// If specID is explicitly specified by the caller, use the spec with the specID
if params.ExecuteParams.SpecID != "" {
specID = params.ExecuteParams.SpecID
logToAll(logger, runLogger, "Info", "SpecID explicitly set. Using the specified version", "SpecID", specID)
logutil.LogToAll(logger, runLogger, "Info", "SpecID explicitly set. Using the specified version", "SpecID", specID)
} else {
specID = stackEntity.LastPreviewedRevision
logToAll(logger, runLogger, "Info", "SpecID not explicitly set. Using last previewed version", "SpecID", stackEntity.LastPreviewedRevision)
logutil.LogToAll(logger, runLogger, "Info", "SpecID not explicitly set. Using last previewed version", "SpecID", stackEntity.LastPreviewedRevision)
}

var storage release.Storage
Expand Down Expand Up @@ -300,7 +300,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
if err != nil {
return err
}
logToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
logutil.LogToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
if err != nil {
return err
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
}
executeOptions := BuildOptions(params.ExecuteParams.Dryrun, m.maxConcurrent)

logToAll(logger, runLogger, "Info", "Previewing using the default generator ...")
logutil.LogToAll(logger, runLogger, "Info", "Previewing using the default generator ...")

directory, workDir, err := m.GetWorkdirAndDirectory(ctx, params, stackEntity)
if err != nil {
Expand All @@ -364,7 +364,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
// return immediately if no resource found in stack
// todo: if there is no resource, should still do diff job; for now, if output is json format, there is no hint
if sp == nil || len(sp.Resources) == 0 {
logToAll(logger, runLogger, "Info", "No resource change found in this stack...")
logutil.LogToAll(logger, runLogger, "Info", "No resource change found in this stack...")
return nil
}

Expand All @@ -377,12 +377,12 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam

// if dry run, print the hint
if params.ExecuteParams.Dryrun {
logToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be applied if dryrun is set to false")
logutil.LogToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be applied if dryrun is set to false")
err = ErrDryrunApply
return err
}

logToAll(logger, runLogger, "Info", "State backend found", "stateBackend", stateBackend)
logutil.LogToAll(logger, runLogger, "Info", "State backend found", "stateBackend", stateBackend)
stack.Path = tempPath(stackEntity.Path)

// Set context from workspace to spec
Expand All @@ -404,14 +404,15 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
return err
}

logToAll(logger, runLogger, "Info", "Start applying diffs ...")
logutil.LogToAll(logger, runLogger, "Info", "Start applying diffs ...")
release.UpdateReleasePhase(rel, apiv1.ReleasePhaseApplying, relLock)
if err = release.UpdateApplyRelease(storage, rel, params.ExecuteParams.Dryrun, relLock); err != nil {
return err
}

executeOptions = BuildOptions(params.ExecuteParams.Dryrun, m.maxConcurrent)
executeOptions.Watch = true
executeOptions.Watch = params.ExecuteParams.Watch
executeOptions.WatchTimeout = params.ExecuteParams.WatchTimeoutSeconds

// Get graph storage directory, create if not exist
graphStorage, err := stackBackend.GraphStorage(project.Name, ws.Name)
Expand Down Expand Up @@ -465,7 +466,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestParams, w http.ResponseWriter) (err error) {
logger := logutil.GetLogger(ctx)
runLogger := logutil.GetRunLogger(ctx)
logToAll(logger, runLogger, "Info", "Starting destroying stack in StackManager ...")
logutil.LogToAll(logger, runLogger, "Info", "Starting destroying stack in StackManager ...")

// Get the stack entity by id
stackEntity, err := m.stackRepo.Get(ctx, params.StackID)
Expand Down Expand Up @@ -525,7 +526,7 @@ func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestPar
if err != nil {
return err
}
logToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
logutil.LogToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
if err != nil {
return err
}
Expand Down Expand Up @@ -564,7 +565,7 @@ func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestPar

// if dryrun, print the hint
if params.ExecuteParams.Dryrun {
logToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be destroyed if dryrun is set to false")
logutil.LogToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be destroyed if dryrun is set to false")
return ErrDryrunDestroy
}

Expand All @@ -574,7 +575,7 @@ func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestPar
return
}
// Destroy
logToAll(logger, runLogger, "Info", "Start destroying resources......")
logutil.LogToAll(logger, runLogger, "Info", "Start destroying resources......")
var upRel *apiv1.Release

upRel, err = engineapi.Destroy(executeOptions, rel, changes, storage)
Expand Down
17 changes: 10 additions & 7 deletions pkg/server/manager/stack/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
ErrStackInOperation = errors.New("the stack is being operated by another request. Please wait until it is completed")
ErrStackNotPreviewedYet = errors.New("the stack has not been previewed yet. Please generate and preview the stack first")
ErrInvalidRunID = errors.New("the run ID should be a uuid")
ErrInvalidWatchTimeout = errors.New("watchTimeout should be a number")
)

type StackManager struct {
Expand Down Expand Up @@ -53,13 +54,15 @@ type StackRequestParams struct {
}

type StackExecuteParams struct {
Detail bool
Dryrun bool
SpecID string
Force bool
ImportResources bool
NoCache bool
Unlock bool
Detail bool
Dryrun bool
SpecID string
Force bool
ImportResources bool
NoCache bool
Unlock bool
Watch bool
WatchTimeoutSeconds int
}

type RunRequestParams struct {
Expand Down
23 changes: 2 additions & 21 deletions pkg/server/manager/stack/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strconv"
"strings"

"github.com/go-chi/httplog/v2"
"gorm.io/gorm"
v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
"kusionstack.io/kusion/pkg/backend"
Expand All @@ -38,7 +37,8 @@ func BuildOptions(dryrun bool, maxConcurrent int) *engineapi.APIOptions {
DryRun: dryrun,
MaxConcurrent: maxConcurrent,
// Watch: false,
UI: terminal.DefaultUI(),
WatchTimeout: 120,
UI: terminal.DefaultUI(),
}
return executeOptions
}
Expand Down Expand Up @@ -439,25 +439,6 @@ func isInRelease(release *v1.Release, id string) bool {
return false
}

func logToAll(sysLogger *httplog.Logger, runLogger *httplog.Logger, level string, message string, args ...any) {
switch strings.ToLower(level) {
case "info":
sysLogger.Info(message, args...)
runLogger.Info(message, args...)
case "error":
sysLogger.Error(message, args...)
runLogger.Error(message, args...)
case "warn":
sysLogger.Warn(message, args...)
runLogger.Warn(message, args...)
case "debug":
sysLogger.Debug(message, args...)
runLogger.Debug(message, args...)
default:
sysLogger.Error("unknown log level", "level", level)
}
}

func unlockRelease(ctx context.Context, storage release.Storage) error {
logger := logutil.GetLogger(ctx)
logger.Info("Getting workdir from stack source...")
Expand Down
26 changes: 26 additions & 0 deletions pkg/server/util/logging/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package util

import (
"strings"

"github.com/go-chi/httplog/v2"
)

func LogToAll(sysLogger *httplog.Logger, runLogger *httplog.Logger, level string, message string, args ...any) {
switch strings.ToLower(level) {
case "info":
sysLogger.Info(message, args...)
runLogger.Info(message, args...)
case "error":
sysLogger.Error(message, args...)
runLogger.Error(message, args...)
case "warn":
sysLogger.Warn(message, args...)
runLogger.Warn(message, args...)
case "debug":
sysLogger.Debug(message, args...)
runLogger.Debug(message, args...)
default:
sysLogger.Error("unknown log level", "level", level)
}
}

0 comments on commit e86766e

Please sign in to comment.