From e86766e17bced24fb627490ef29cc5b7f9616e44 Mon Sep 17 00:00:00 2001
From: ffforest
Date: Tue, 17 Dec 2024 18:59:13 +0800
Subject: [PATCH] feat: enhance watch in apply API

---
 pkg/engine/api/apply.go                       | 214 +++++-------------
 pkg/engine/api/preview.go                     |   1 +
 pkg/engine/release/util.go                    |   4 +-
 .../runtime/kubernetes/kubernetes_runtime.go  |  14 +-
 pkg/server/handler/stack/utils.go             |  21 +-
 pkg/server/manager/stack/execute.go           |  41 ++--
 pkg/server/manager/stack/types.go             |  17 +-
 pkg/server/manager/stack/util.go              |  23 +-
 pkg/server/util/logging/logger.go             |  26 +++
 9 files changed, 136 insertions(+), 225 deletions(-)
 create mode 100644 pkg/server/util/logging/logger.go

diff --git a/pkg/engine/api/apply.go b/pkg/engine/api/apply.go
index 35db7b9e..b01b19f8 100644
--- a/pkg/engine/api/apply.go
+++ b/pkg/engine/api/apply.go
@@ -1,7 +1,6 @@
 package api
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -11,7 +10,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/liu-hm19/pterm"
 	"gopkg.in/yaml.v2"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/watch"
@@ -58,7 +56,8 @@ func Apply(
 	changes *models.Changes,
 	out io.Writer,
 ) (*apiv1.Release, error) {
-	logger := logutil.GetLogger(ctx)
+	sysLogger := logutil.GetLogger(ctx)
+	runLogger := logutil.GetRunLogger(ctx)
 	var err error
 
 	// construct the apply operation
@@ -79,59 +78,23 @@ func Apply(
 	// line summary
 	var ls lineSummary
 
-	// Get the multi printer from UI option.
-	multi := o.UI.MultiPrinter
-	// Max length of resource ID for progressbar width.
-	maxLen := 0
-
-	// Prepare the writer to print the operation progress and results.
-	changesWriterMap := make(map[string]*pterm.SpinnerPrinter)
+	// Log the pending status of each changed resource.
 	for _, key := range changes.Values() {
-		// Get the maximum length of the resource ID.
-		if len(key.ID) > maxLen {
-			maxLen = len(key.ID)
-		}
-		// Init a spinner printer for the resource to print the apply status.
-		changesWriterMap[key.ID], err = o.UI.SpinnerPrinter.
-			WithWriter(multi.NewWriter()).
-			Start(fmt.Sprintf("Pending %s", pterm.Bold.Sprint(key.ID)))
-		if err != nil {
-			return nil, fmt.Errorf("failed to init change step spinner printer: %v", err)
-		}
+		logutil.LogToAll(sysLogger, runLogger, "Info", fmt.Sprintf("Pending %s", key.ID))
 	}
 
-	// progress bar, print dag walk detail
-	progressbar, err := pterm.DefaultProgressbar.
-		WithMaxWidth(0). // Set to 0, the terminal width will be used
-		WithTotal(len(changes.StepKeys)).
-		WithWriter(out).
-		WithRemoveWhenDone().
-		Start()
-	if err != nil {
-		return nil, err
-	}
-
-	// The writer below is for operation error printing.
-	errWriter := multi.NewWriter()
-
-	multi.WithUpdateDelay(time.Millisecond * 100)
-	multi.Start()
-	defer multi.Stop()
-
 	// wait msgCh close
 	var wg sync.WaitGroup
-	// receive msg and print detail
-	go PrintApplyDetails(
+	// receive msg and process detail
+	go ProcessApplyDetails(
+		ctx,
 		ac,
 		&err,
-		&errWriter,
 		&wg,
 		changes,
-		changesWriterMap,
-		progressbar,
 		&ls,
 		o.DryRun,
-		o.Watch,
 		gph.Resources,
 		gph,
 		rel,
 	)
 
 	watchErrCh := make(chan error)
 	// Apply while watching the resources.
 	if o.Watch && !o.DryRun {
-		logger.Info("Start watching resources ...")
+		logutil.LogToAll(sysLogger, runLogger, "Info", fmt.Sprintf("Start watching resources with timeout %d seconds ...", o.WatchTimeout))
 		Watch(
+			ctx,
 			ac,
 			changes,
 			&err,
 			o.DryRun,
 			watchErrCh,
-			multi,
-			changesWriterMap,
+			o.WatchTimeout,
 			gph,
 			rel,
 		)
-		logger.Info("Watch completed ...")
+		logutil.LogToAll(sysLogger, runLogger, "Info", "Watch started ...")
 	}
 
 	var upRel *apiv1.Release
@@ -200,7 +163,7 @@ func Apply(
 	}
 
 	// print summary
-	logger.Info(fmt.Sprintf("Apply complete! Resources: %d created, %d updated, %d deleted.", ls.created, ls.updated, ls.deleted))
+	logutil.LogToAll(sysLogger, runLogger, "Info", fmt.Sprintf("Apply complete! Resources: %d created, %d updated, %d deleted.", ls.created, ls.updated, ls.deleted))
 	return upRel, nil
 }
 
@@ -223,18 +186,17 @@ func Apply(
 // Watch function will watch the changed Kubernetes and Terraform resources.
 // Fixme: abstract the input variables into a struct.
 func Watch(
+	ctx context.Context,
 	ac *operation.ApplyOperation,
 	changes *models.Changes,
 	err *error,
 	dryRun bool,
 	watchErrCh chan error,
-	multi *pterm.MultiPrinter,
-	changesWriterMap map[string]*pterm.SpinnerPrinter,
+	watchTimeout int,
 	gph *apiv1.Graph,
 	rel *apiv1.Release,
 ) {
 	resourceMap := make(map[string]apiv1.Resource)
-	ioWriterMap := make(map[string]io.Writer)
 	toBeWatched := apiv1.Resources{}
 
 	// Get the resources to be watched.
@@ -247,7 +209,6 @@ func Watch(
 
 	go func() {
 		defer func() {
-			log.Debug("entering defer func() for watch()")
 			if p := recover(); p != nil {
 				cmdutil.RecoverErr(err)
 				log.Error(*err)
@@ -261,6 +222,9 @@ func Watch(
 			}
 			watchErrCh <- *err
 		}()
+		// Get the system and run loggers from the context.
+		sysLogger := logutil.GetLogger(ctx)
+		runLogger := logutil.GetRunLogger(ctx)
 		// Init the runtimes according to the resource types.
 		runtimes, s := runtimeinit.Runtimes(*rel.Spec, *rel.State)
 		if v1.IsErr(s) {
 		}
 
-		// Prepare the tables for printing the details of the resources.
-		tables := make(map[string]*printers.Table, len(toBeWatched))
-		ticker := time.NewTicker(time.Millisecond * 100)
+		// Tick periodically while refreshing the status of the watched resources.
+		ticker := time.NewTicker(time.Second)
 		defer ticker.Stop()
 
 		// Record the watched and finished resources.
 		watchedIDs := []string{}
+		watching := make(map[string]bool)
 		finished := make(map[string]bool)
 		for !(len(finished) == len(toBeWatched)) {
 			select {
 			// Get the resource ID to be watched.
 			case id := <-ac.WatchCh:
-				log.Debug("entering case id := <-ac.WatchCh")
-				log.Debug("id", id)
 				res := resourceMap[id]
-				log.Debug("res.Type", res.Type)
-				// Set the timeout duration for watch context, here we set an experiential value of 60 minutes.
-				ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(120))
+				// Set the timeout duration for the watch context from the configured watch timeout (in seconds).
+				ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(watchTimeout))
 				defer cancel()
 
 				// Get the event channel for watching the resource.
 				rsp := runtimes[res.Type].Watch(ctx, &runtime.WatchRequest{Resource: &res})
-				log.Debug("rsp", rsp)
+				logutil.LogToAll(sysLogger, runLogger, "Info", fmt.Sprintf("Watching resource rsp: %v", rsp))
 				if rsp == nil {
 					log.Debug("unsupported resource type: %s", res.Type)
 					continue
 				}
 
 				w := rsp.Watchers
-				table := printers.NewTable(w.IDs)
-				tables[id] = table
+				watching[id] = false
 
 				// Setup a go-routine to concurrently watch K8s and TF resources.
 				if res.Type == apiv1.Kubernetes {
 					healthPolicy, kind := getHealthPolicy(&res)
-					log.Debug("healthPolicyhealthPolicyhealthPolicyhealthPolicyhealthPolicy", healthPolicy)
-					go watchK8sResources(id, kind, w.Watchers, table, tables, gph, dryRun, healthPolicy, rel)
+					log.Debug("healthPolicy found: ", healthPolicy)
+					go watchK8sResources(id, kind, w.Watchers, watching, gph, dryRun, healthPolicy, rel)
 				} else if res.Type == apiv1.Terraform {
-					go watchTFResources(id, w.TFWatcher, table, dryRun, rel)
+					go watchTFResources(id, w.TFWatcher, watching, dryRun, rel)
 				} else {
 					log.Debug("unsupported resource type to watch: %s", string(res.Type))
 					continue
 				}
 
-				// Record the io writer related to the resource ID.
-				ioWriterMap[id] = multi.NewWriter()
 				watchedIDs = append(watchedIDs, id)
 
-			// Refresh the tables printing details of the resources to be watched.
+			// Periodically log the status of the resources being watched.
 			default:
 				for _, id := range watchedIDs {
-					w, ok := ioWriterMap[id]
-					if !ok {
-						panic(fmt.Errorf("failed to get io writer while watching %s", id))
-					}
-					printTable(&w, id, tables)
+					logutil.LogToAll(sysLogger, runLogger, "Info", "watching resource...", "id", id, "time", time.Now().Format(time.RFC3339))
 				}
-				for id, table := range tables {
-					if finished[id] {
-						continue
-					}
-
-					if table.AllCompleted() {
+				for id := range watching {
+					// Skip resources that have already been marked as finished,
+					// so they are not re-logged and re-updated on every tick.
+					if finished[id] {
+						continue
+					}
+					if watching[id] {
 						finished[id] = true
-						changesWriterMap[id].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(id)))
-
-						// Update resource status to reconciled.
+						logutil.LogToAll(sysLogger, runLogger, "Info", "resource is reconciled...", "id", id, "time", time.Now().Format(time.RFC3339))
+						// Update resource status to reconciled.
 						resource := graph.FindGraphResourceByID(gph.Resources, id)
 						if resource != nil {
 							resource.Status = apiv1.Reconciled
 						}
 					}
 				}
@@ -365,19 +314,16 @@ func (ls *lineSummary) Count(op models.ActionType) {
 	}
 }
 
-// PrintApplyDetails function will receive the messages of the apply operation and print the details.
+// ProcessApplyDetails function will receive the messages of the apply operation and process the details.
 // Fixme: abstract the input variables into a struct.
-func PrintApplyDetails(
+func ProcessApplyDetails(
+	ctx context.Context,
 	ac *operation.ApplyOperation,
 	err *error,
-	errWriter *io.Writer,
 	wg *sync.WaitGroup,
 	changes *models.Changes,
-	changesWriterMap map[string]*pterm.SpinnerPrinter,
-	progressbar *pterm.ProgressbarPrinter,
 	ls *lineSummary,
 	dryRun bool,
-	watch bool,
 	gphResources *apiv1.GraphResources,
 	gph *apiv1.Graph,
 	rel *apiv1.Release,
 ) {
@@ -394,8 +340,9 @@
 			release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock)
 			*err = errors.Join([]error{*err, release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock)}...)
 		}
-		(*errWriter).(*bytes.Buffer).Reset()
 	}()
+	sysLogger := logutil.GetLogger(ctx)
+	runLogger := logutil.GetRunLogger(ctx)
 
 	wg.Add(1)
 	for {
@@ -413,18 +360,10 @@
 			case models.Success, models.Skip:
 				var title string
 				if changeStep.Action == models.UnChanged {
-					title = fmt.Sprintf("Skipped %s", pterm.Bold.Sprint(changeStep.ID))
-					changesWriterMap[msg.ResourceID].Success(title)
+					title = fmt.Sprintf("Skipped %s", changeStep.ID)
+					logutil.LogToAll(sysLogger, runLogger, "Info", title)
 				} else {
-					if watch && !dryRun {
-						title = fmt.Sprintf("%s %s",
-							changeStep.Action.Ing(),
-							pterm.Bold.Sprint(changeStep.ID),
-						)
-						changesWriterMap[msg.ResourceID].UpdateText(title)
-					} else {
-						changesWriterMap[msg.ResourceID].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(msg.ResourceID)))
-					}
+					logutil.LogToAll(sysLogger, runLogger, "Info", fmt.Sprintf("Succeeded %s", msg.ResourceID))
 				}
 
 				// Update resource status
@@ -440,13 +379,10 @@
 					}
 				}
 
-				progressbar.Increment()
 				ls.Count(changeStep.Action)
 			case models.Failed:
-				title := fmt.Sprintf("Failed %s", pterm.Bold.Sprint(changeStep.ID))
-				changesWriterMap[msg.ResourceID].Fail(title)
 				errStr := pretty.ErrorT.Sprintf("apply %s failed as: %s\n", msg.ResourceID, msg.OpErr.Error())
-				pterm.Fprintln(*errWriter, errStr)
+				logutil.LogToAll(sysLogger, runLogger, "Error", errStr)
 				if !dryRun {
 					// Update resource status, in case anything like update fail happened
 					gphResource := graph.FindGraphResourceByID(gphResources, msg.ResourceID)
@@ -457,9 +393,9 @@
 			default:
 				title := fmt.Sprintf("%s %s",
 					changeStep.Action.Ing(),
-					pterm.Bold.Sprint(changeStep.ID),
+					changeStep.ID,
 				)
-				changesWriterMap[msg.ResourceID].UpdateText(title)
+				logutil.LogToAll(sysLogger, runLogger, "Info", title)
 			}
 		}
 	}
@@ -483,8 +419,7 @@ func getHealthPolicy(res *apiv1.Resource) (healthPolicy interface{}, kind string
 func watchK8sResources(
 	id, kind string,
 	chs []<-chan watch.Event,
-	table *printers.Table,
-	tables map[string]*printers.Table,
+	watching map[string]bool,
 	gph *apiv1.Graph,
 	dryRun bool,
 	healthPolicy interface{},
@@ -528,51 +463,37 @@ func watchK8sResources(
 		if recvOK {
 			e := recv.Interface().(watch.Event)
 			o := e.Object.(*unstructured.Unstructured)
-			var detail string
 			var ready bool
 			if e.Type == watch.Deleted {
-				detail = fmt.Sprintf("%s has beed deleted", o.GetName())
 				ready = true
 			} else {
 				// Restore to actual type
 				target := printers.Convert(o)
 				// Check reconcile status with customized health policy for specific resource
 				if healthPolicy != nil && kind == o.GetObjectKind().GroupVersionKind().Kind {
-					log.Debug("healthPolicy", healthPolicy)
 					if code, ok := kcl.ConvertKCLCode(healthPolicy); ok {
 						resByte, err := yaml.Marshal(o.Object)
-						log.Debug("kcl health policy code", code)
 						if err != nil {
 							log.Error(err)
 							return
 						}
-						detail, ready = printers.PrintCustomizedHealthCheck(code, resByte)
+						_, ready = printers.PrintCustomizedHealthCheck(code, resByte)
 					} else {
-						detail, ready = printers.Generate(target)
+						_, ready = printers.Generate(target)
 					}
 				} else {
 					// Check reconcile status with default setup
-					detail, ready = printers.Generate(target)
+					_, ready = printers.Generate(target)
 				}
 			}
 
 			// Mark ready for breaking loop
 			if ready {
-				e.Type = printers.READY
+				watching[id] = true
+				break
 			}
-
-			// Save watched msg
-			table.Update(
-				engine.BuildIDForKubernetes(o),
-				printers.NewRow(e.Type, o.GetKind(), o.GetName(), detail))
-
-			// Write back
-			tables[id] = table
table - } - - // Break when completed - if table.AllCompleted() { - break } } } @@ -580,7 +501,7 @@ func watchK8sResources( func watchTFResources( id string, ch <-chan runtime.TFEvent, - table *printers.Table, + watching map[string]bool, dryRun bool, rel *apiv1.Release, ) { @@ -599,6 +520,7 @@ func watchTFResources( } }() + var ready bool for { parts := strings.Split(id, engine.Separator) // A valid Terraform resource ID should consist of 4 parts, including the information of the provider type @@ -609,24 +531,14 @@ func watchTFResources( tfEvent := <-ch if tfEvent == runtime.TFApplying { - table.Update( - id, - printers.NewRow(watch.EventType("Applying"), - strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Applying...")) - } else if tfEvent == runtime.TFSucceeded { - table.Update( - id, - printers.NewRow(printers.READY, - strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply succeeded")) + continue } else { - table.Update( - id, - printers.NewRow(watch.EventType("Failed"), - strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply failed")) + ready = true } - // Break when all completed. - if table.AllCompleted() { + // Mark ready for breaking loop + if ready { + watching[id] = true break } } @@ -642,23 +554,3 @@ func createSelectCases(chs []<-chan watch.Event) []reflect.SelectCase { } return cases } -func printTable(w *io.Writer, id string, tables map[string]*printers.Table) { - // Reset the buffer for live flushing. - (*w).(*bytes.Buffer).Reset() - - // Print resource Key as heading text - _, _ = fmt.Fprintln(*w, pretty.LightCyanBold("[%s]", id)) - - table, ok := tables[id] - if !ok { - // Unsupported resource, leave a hint - _, _ = fmt.Fprintln(*w, "Skip monitoring unsupported resources") - } else { - // Print table - data := table.Print() - _ = pterm.DefaultTable. - WithStyle(pterm.NewStyle(pterm.FgDefault)). - WithHeaderStyle(pterm.NewStyle(pterm.FgDefault)). - WithHasHeader().WithSeparator(" ").WithData(data).WithWriter(*w).Render() - } -} diff --git a/pkg/engine/api/preview.go b/pkg/engine/api/preview.go index 61f790c7..91e68672 100644 --- a/pkg/engine/api/preview.go +++ b/pkg/engine/api/preview.go @@ -22,6 +22,7 @@ type APIOptions struct { DryRun bool MaxConcurrent int Watch bool + WatchTimeout int UI *terminal.UI } diff --git a/pkg/engine/release/util.go b/pkg/engine/release/util.go index 24ea8d33..d0d077fe 100644 --- a/pkg/engine/release/util.go +++ b/pkg/engine/release/util.go @@ -59,7 +59,7 @@ func NewApplyRelease(storage Storage, project, stack, workspace string) (*v1.Rel if err != nil { return nil, err } - if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed && lastRelease.Phase != v1.ReleasePhaseApplying { + if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed { return nil, fmt.Errorf("cannot create a new release of project: %s, workspace: %s. 
There is a release:%v in progress", project, workspace, lastRelease.Revision) } @@ -107,7 +107,7 @@ func CreateDestroyRelease(storage Storage, project, stack, workspace string) (*v if err != nil { return nil, err } - if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed && lastRelease.Phase != v1.ReleasePhaseApplying { + if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed { return nil, fmt.Errorf("cannot create release of project %s, workspace %s cause there is release in progress", project, workspace) } diff --git a/pkg/engine/runtime/kubernetes/kubernetes_runtime.go b/pkg/engine/runtime/kubernetes/kubernetes_runtime.go index 958f300e..4d382394 100644 --- a/pkg/engine/runtime/kubernetes/kubernetes_runtime.go +++ b/pkg/engine/runtime/kubernetes/kubernetes_runtime.go @@ -393,7 +393,7 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq amount := 1 // Try to get dependent resource by owner reference dependentGVK := getDependentGVK(reqObj.GroupVersionKind()) - log.Debug("dependentGVK", dependentGVK) + log.Debug("watching dependent GVK: ", dependentGVK) if !dependentGVK.Empty() { owner := reqObj for !dependentGVK.Empty() { @@ -720,12 +720,12 @@ func getDependentGVK(gvk schema.GroupVersionKind) schema.GroupVersionKind { switch gvk.Kind { // Deployment generates ReplicaSet case convertor.Deployment: - return schema.GroupVersionKind{} - // return schema.GroupVersionKind{ - // Group: appsv1.SchemeGroupVersion.Group, - // Version: appsv1.SchemeGroupVersion.Version, - // Kind: convertor.ReplicaSet, - // } + // return schema.GroupVersionKind{} + return schema.GroupVersionKind{ + Group: appsv1.SchemeGroupVersion.Group, + Version: appsv1.SchemeGroupVersion.Version, + Kind: convertor.ReplicaSet, + } // DaemonSet and StatefulSet generate ControllerRevision case convertor.DaemonSet, convertor.StatefulSet: return schema.GroupVersionKind{ diff --git a/pkg/server/handler/stack/utils.go b/pkg/server/handler/stack/utils.go index 2fc05975..eb6d4818 100644 --- a/pkg/server/handler/stack/utils.go +++ b/pkg/server/handler/stack/utils.go @@ -102,6 +102,11 @@ func requestHelper(r *http.Request) (context.Context, *httplog.Logger, *stackman forceParam, _ := strconv.ParseBool(r.URL.Query().Get("force")) noCacheParam, _ := strconv.ParseBool(r.URL.Query().Get("noCache")) unlockParam, _ := strconv.ParseBool(r.URL.Query().Get("unlock")) + watchParam, _ := strconv.ParseBool(r.URL.Query().Get("watch")) + watchTimeoutParam, err := strconv.Atoi(r.URL.Query().Get("watchTimeout")) + if err != nil { + return nil, nil, nil, stackmanager.ErrInvalidWatchTimeout + } importResourcesParam, _ := strconv.ParseBool(r.URL.Query().Get("importResources")) specIDParam := r.URL.Query().Get("specID") // TODO: Should match automatically eventually??? 
@@ -118,13 +123,15 @@ func requestHelper(r *http.Request) (context.Context, *httplog.Logger, *stackman
 		workspaceParam = constant.DefaultWorkspace
 	}
 	executeParams := stackmanager.StackExecuteParams{
-		Detail:          detailParam,
-		Dryrun:          dryrunParam,
-		Force:           forceParam,
-		SpecID:          specIDParam,
-		ImportResources: importResourcesParam,
-		NoCache:         noCacheParam,
-		Unlock:          unlockParam,
+		Detail:              detailParam,
+		Dryrun:              dryrunParam,
+		Force:               forceParam,
+		SpecID:              specIDParam,
+		ImportResources:     importResourcesParam,
+		NoCache:             noCacheParam,
+		Unlock:              unlockParam,
+		Watch:               watchParam,
+		WatchTimeoutSeconds: watchTimeoutParam,
 	}
 	params := stackmanager.StackRequestParams{
 		StackID: uint(id),
diff --git a/pkg/server/manager/stack/execute.go b/pkg/server/manager/stack/execute.go
index 8b002646..5e0c81ad 100644
--- a/pkg/server/manager/stack/execute.go
+++ b/pkg/server/manager/stack/execute.go
@@ -27,7 +27,7 @@ import (
 func (m *StackManager) GenerateSpec(ctx context.Context, params *StackRequestParams) (string, *apiv1.Spec, error) {
 	logger := logutil.GetLogger(ctx)
 	runLogger := logutil.GetRunLogger(ctx)
-	logToAll(logger, runLogger, "Info", "Starting generating spec in StackManager...")
+	logutil.LogToAll(logger, runLogger, "Info", "Starting to generate spec in StackManager...")
 
 	// Get the stack entity and return error if stack ID is not found
 	stackEntity, err := m.stackRepo.Get(ctx, params.StackID)
@@ -101,7 +101,7 @@ func (m *StackManager) GenerateSpec(ctx context.Context, params *StackRequestPar
 func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestParams, requestPayload request.StackImportRequest) (*models.Changes, error) {
 	logger := logutil.GetLogger(ctx)
 	runLogger := logutil.GetRunLogger(ctx)
-	logToAll(logger, runLogger, "Info", "Starting previewing stack in StackManager...")
+	logutil.LogToAll(logger, runLogger, "Info", "Starting to preview stack in StackManager...")
 
 	// Get the stack entity by id
 	stackEntity, err := m.stackRepo.Get(ctx, params.StackID)
@@ -114,7 +114,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
 
 	defer func() {
 		if err != nil {
-			logToAll(logger, runLogger, "Info", "Error occurred during previewing stack. Setting stack sync state to preview failed")
+			logutil.LogToAll(logger, runLogger, "Info", "Error occurred during previewing stack. Setting stack sync state to preview failed")
 			stackEntity.SyncState = constant.StackStatePreviewFailed
 			m.stackRepo.Update(ctx, stackEntity)
 		} else {
@@ -164,7 +164,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
 	if err != nil {
 		return nil, err
 	}
-	logToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
+	logutil.LogToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
 
 	directory, workDir, err := m.GetWorkdirAndDirectory(ctx, params, stackEntity)
 	if err != nil {
@@ -188,7 +188,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
 	// return immediately if no resource found in stack
 	// todo: if there is no resource, should still do diff job; for now, if output is json format, there is no hint
 	if sp == nil || len(sp.Resources) == 0 {
-		logToAll(logger, runLogger, "Info", "No resource change found in this stack...")
+		logutil.LogToAll(logger, runLogger, "Info", "No resource change found in this stack...")
 		return nil, nil
 	}
 
@@ -211,7 +211,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
 	if params.ExecuteParams.ImportResources && len(requestPayload.ImportedResources) > 0 {
 		m.ImportTerraformResourceID(ctx, sp, requestPayload.ImportedResources)
 	}
-	logToAll(logger, runLogger, "Info", "Final Spec is: ", "spec", sp)
+	logutil.LogToAll(logger, runLogger, "Info", "Final Spec is: ", "spec", sp)
 
 	changes, err := engineapi.Preview(executeOptions, releaseStorage, sp, state, project, stack)
 	return changes, err
@@ -220,7 +220,7 @@ func (m *StackManager) PreviewStack(ctx context.Context, params *StackRequestPar
 func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParams, requestPayload request.StackImportRequest) error {
 	logger := logutil.GetLogger(ctx)
 	runLogger := logutil.GetRunLogger(ctx)
-	logToAll(logger, runLogger, "Info", "Starting applying stack in StackManager ...")
+	logutil.LogToAll(logger, runLogger, "Info", "Starting to apply stack in StackManager ...")
 	_, stackBackend, project, _, ws, err := m.metaHelper(ctx, params.StackID, params.Workspace)
 	if err != nil {
 		return err
@@ -239,10 +239,10 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
 	// If specID is explicitly specified by the caller, use the spec with the specID
 	if params.ExecuteParams.SpecID != "" {
 		specID = params.ExecuteParams.SpecID
-		logToAll(logger, runLogger, "Info", "SpecID explicitly set. Using the specified version", "SpecID", specID)
+		logutil.LogToAll(logger, runLogger, "Info", "SpecID explicitly set. Using the specified version", "SpecID", specID)
 	} else {
 		specID = stackEntity.LastPreviewedRevision
-		logToAll(logger, runLogger, "Info", "SpecID not explicitly set. Using last previewed version", "SpecID", stackEntity.LastPreviewedRevision)
+		logutil.LogToAll(logger, runLogger, "Info", "SpecID not explicitly set. Using last previewed version", "SpecID", stackEntity.LastPreviewedRevision)
 	}
 
 	var storage release.Storage
@@ -300,7 +300,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
 	if err != nil {
 		return err
 	}
-	logToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
+	logutil.LogToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
 	if err != nil {
 		return err
 	}
@@ -340,7 +340,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
 	}
 
 	executeOptions := BuildOptions(params.ExecuteParams.Dryrun, m.maxConcurrent)
-	logToAll(logger, runLogger, "Info", "Previewing using the default generator ...")
+	logutil.LogToAll(logger, runLogger, "Info", "Previewing using the default generator ...")
 
 	directory, workDir, err := m.GetWorkdirAndDirectory(ctx, params, stackEntity)
 	if err != nil {
@@ -364,7 +364,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
 	// return immediately if no resource found in stack
 	// todo: if there is no resource, should still do diff job; for now, if output is json format, there is no hint
 	if sp == nil || len(sp.Resources) == 0 {
-		logToAll(logger, runLogger, "Info", "No resource change found in this stack...")
+		logutil.LogToAll(logger, runLogger, "Info", "No resource change found in this stack...")
 		return nil
 	}
 
@@ -377,12 +377,12 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
 
 	// if dry run, print the hint
 	if params.ExecuteParams.Dryrun {
-		logToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be applied if dryrun is set to false")
+		logutil.LogToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be applied if dryrun is set to false")
 		err = ErrDryrunApply
 		return err
 	}
 
-	logToAll(logger, runLogger, "Info", "State backend found", "stateBackend", stateBackend)
+	logutil.LogToAll(logger, runLogger, "Info", "State backend found", "stateBackend", stateBackend)
 	stack.Path = tempPath(stackEntity.Path)
 
 	// Set context from workspace to spec
@@ -404,14 +404,15 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
 		return err
 	}
 
-	logToAll(logger, runLogger, "Info", "Start applying diffs ...")
+	logutil.LogToAll(logger, runLogger, "Info", "Start applying diffs ...")
 	release.UpdateReleasePhase(rel, apiv1.ReleasePhaseApplying, relLock)
 	if err = release.UpdateApplyRelease(storage, rel, params.ExecuteParams.Dryrun, relLock); err != nil {
 		return err
 	}
 
 	executeOptions = BuildOptions(params.ExecuteParams.Dryrun, m.maxConcurrent)
-	executeOptions.Watch = true
+	executeOptions.Watch = params.ExecuteParams.Watch
+	executeOptions.WatchTimeout = params.ExecuteParams.WatchTimeoutSeconds
 
 	// Get graph storage directory, create if not exist
 	graphStorage, err := stackBackend.GraphStorage(project.Name, ws.Name)
@@ -465,7 +466,7 @@ func (m *StackManager) ApplyStack(ctx context.Context, params *StackRequestParam
 func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestParams, w http.ResponseWriter) (err error) {
 	logger := logutil.GetLogger(ctx)
 	runLogger := logutil.GetRunLogger(ctx)
-	logToAll(logger, runLogger, "Info", "Starting destroying stack in StackManager ...")
+	logutil.LogToAll(logger, runLogger, "Info", "Starting to destroy stack in StackManager ...")
 
 	// Get the stack entity by id
 	stackEntity, err := m.stackRepo.Get(ctx, params.StackID)
@@ -525,7 +526,7 @@ func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestPar
 	if err != nil {
 		return err
 	}
-	logToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
+	logutil.LogToAll(logger, runLogger, "Info", "State storage found with path", "releasePath", releasePath)
 	if err != nil {
 		return err
 	}
@@ -564,7 +565,7 @@ func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestPar
 
 	// if dryrun, print the hint
 	if params.ExecuteParams.Dryrun {
-		logToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be destroyed if dryrun is set to false")
+		logutil.LogToAll(logger, runLogger, "Info", "Dry-run mode enabled, the above resources will be destroyed if dryrun is set to false")
 		return ErrDryrunDestroy
 	}
@@ -574,7 +575,7 @@ func (m *StackManager) DestroyStack(ctx context.Context, params *StackRequestPar
 		return
 	}
 	// Destroy
-	logToAll(logger, runLogger, "Info", "Start destroying resources......")
+	logutil.LogToAll(logger, runLogger, "Info", "Start destroying resources ...")
 	var upRel *apiv1.Release
 
 	upRel, err = engineapi.Destroy(executeOptions, rel, changes, storage)
diff --git a/pkg/server/manager/stack/types.go b/pkg/server/manager/stack/types.go
index 17b97222..a2aceacd 100644
--- a/pkg/server/manager/stack/types.go
+++ b/pkg/server/manager/stack/types.go
@@ -26,6 +26,7 @@ var (
 	ErrStackInOperation     = errors.New("the stack is being operated by another request. Please wait until it is completed")
 	ErrStackNotPreviewedYet = errors.New("the stack has not been previewed yet. Please generate and preview the stack first")
 	ErrInvalidRunID         = errors.New("the run ID should be a uuid")
+	ErrInvalidWatchTimeout  = errors.New("watchTimeout should be an integer (in seconds)")
 )
 
 type StackManager struct {
@@ -53,13 +54,15 @@ type StackRequestParams struct {
 }
 
 type StackExecuteParams struct {
-	Detail          bool
-	Dryrun          bool
-	SpecID          string
-	Force           bool
-	ImportResources bool
-	NoCache         bool
-	Unlock          bool
+	Detail              bool
+	Dryrun              bool
+	SpecID              string
+	Force               bool
+	ImportResources     bool
+	NoCache             bool
+	Unlock              bool
+	Watch               bool
+	WatchTimeoutSeconds int
 }
 
 type RunRequestParams struct {
diff --git a/pkg/server/manager/stack/util.go b/pkg/server/manager/stack/util.go
index e02481e1..f238b987 100644
--- a/pkg/server/manager/stack/util.go
+++ b/pkg/server/manager/stack/util.go
@@ -12,7 +12,6 @@ import (
 	"strconv"
 	"strings"
 
-	"github.com/go-chi/httplog/v2"
 	"gorm.io/gorm"
 	v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
 	"kusionstack.io/kusion/pkg/backend"
@@ -38,7 +37,8 @@ func BuildOptions(dryrun bool, maxConcurrent int) *engineapi.APIOptions {
 		DryRun:        dryrun,
 		MaxConcurrent: maxConcurrent,
-		// Watch:         false,
-		UI:            terminal.DefaultUI(),
+		WatchTimeout:  120,
+		UI:            terminal.DefaultUI(),
 	}
 	return executeOptions
 }
@@ -439,25 +439,6 @@ func isInRelease(release *v1.Release, id string) bool {
 	return false
 }
 
-func logToAll(sysLogger *httplog.Logger, runLogger *httplog.Logger, level string, message string, args ...any) {
-	switch strings.ToLower(level) {
-	case "info":
-		sysLogger.Info(message, args...)
-		runLogger.Info(message, args...)
-	case "error":
-		sysLogger.Error(message, args...)
-		runLogger.Error(message, args...)
-	case "warn":
-		sysLogger.Warn(message, args...)
-		runLogger.Warn(message, args...)
-	case "debug":
-		sysLogger.Debug(message, args...)
-		runLogger.Debug(message, args...)
-	default:
-		sysLogger.Error("unknown log level", "level", level)
-	}
-}
-
 func unlockRelease(ctx context.Context, storage release.Storage) error {
 	logger := logutil.GetLogger(ctx)
 	logger.Info("Getting workdir from stack source...")
diff --git a/pkg/server/util/logging/logger.go b/pkg/server/util/logging/logger.go
new file mode 100644
index 00000000..c8c57e17
--- /dev/null
+++ b/pkg/server/util/logging/logger.go
@@ -0,0 +1,26 @@
+package util
+
+import (
+	"strings"
+
+	"github.com/go-chi/httplog/v2"
+)
+
+// LogToAll writes the same message, at the given level, to both the system
+// logger and the run logger.
+func LogToAll(sysLogger *httplog.Logger, runLogger *httplog.Logger, level string, message string, args ...any) {
+	switch strings.ToLower(level) {
+	case "info":
+		sysLogger.Info(message, args...)
+		runLogger.Info(message, args...)
+	case "error":
+		sysLogger.Error(message, args...)
+		runLogger.Error(message, args...)
+	case "warn":
+		sysLogger.Warn(message, args...)
+		runLogger.Warn(message, args...)
+	case "debug":
+		sysLogger.Debug(message, args...)
+		runLogger.Debug(message, args...)
+	default:
+		sysLogger.Error("unknown log level", "level", level)
+	}
+}
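-- 
Usage sketch: with this patch, watching is driven by the request instead of
being always on. Below is a minimal sketch of a client call; the host, port,
and route are illustrative assumptions, and only the "watch" and
"watchTimeout" query parameters (plus the 120-second default used when
watchTimeout is omitted) come from this patch.

	package main

	import (
		"fmt"
		"net/http"
		"net/url"
	)

	func main() {
		// Hypothetical apply endpoint of a locally running Kusion server.
		applyURL, _ := url.Parse("http://localhost:80/api/v1/stacks/1/apply")

		q := applyURL.Query()
		q.Set("watch", "true")       // opt in to resource watching
		q.Set("watchTimeout", "300") // per-resource watch timeout in seconds
		applyURL.RawQuery = q.Encode()

		// Omitting watchTimeout falls back to the 120-second default
		// instead of failing the request.
		resp, err := http.Post(applyURL.String(), "application/json", nil)
		if err != nil {
			fmt.Println("apply request failed:", err)
			return
		}
		defer resp.Body.Close()
		fmt.Println("apply response status:", resp.Status)
	}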