diff --git a/.gitignore b/.gitignore
index 2644e0156..e5b2cc3f7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,4 +20,4 @@ tmp/*
.idea
# Directory for local development
-local/
+.local/
diff --git a/Makefile b/Makefile
index 7de68d337..ed746c50a 100644
--- a/Makefile
+++ b/Makefile
@@ -14,7 +14,7 @@ VERSION=
SCRIPT_DIR=$(abspath $(dir $(lastword $(MAKEFILE_LIST))))
# Directories for miscellaneous files for the local environment
-LOCAL_DIR=$(SCRIPT_DIR)/local
+LOCAL_DIR=$(SCRIPT_DIR)/.local
LOCAL_BIN_DIR=$(LOCAL_DIR)/bin
# Configuration directory
@@ -111,6 +111,7 @@ run-server-https: ${SERVER_CERT_FILE} ${SERVER_KEY_FILE}
test:
@echo "${COLOR_GREEN}Running tests...${COLOR_RESET}"
@GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum}
+ @go clean -testcache
@${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} ./...
# test-coverage runs all tests with coverage.
@@ -119,13 +120,6 @@ test-coverage:
@GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum}
@${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} -coverprofile="coverage.txt" -covermode=atomic ./...
-# test-clean cleans the test cache and run all tests.
-test-clean: build-bin
- @echo "${COLOR_GREEN}Running tests...${COLOR_RESET}"
- @GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum}
- @go clean -testcache
- @${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} ./...
-
# lint runs the linter.
lint: golangci-lint
diff --git a/cmd/dry.go b/cmd/dry.go
index 7bbdc5a59..7b71d24ce 100644
--- a/cmd/dry.go
+++ b/cmd/dry.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"path/filepath"
"github.com/daguflow/dagu/internal/agent"
@@ -45,32 +44,35 @@ func dryCmd() *cobra.Command {
params, err := cmd.Flags().GetString("params")
if err != nil {
- initLogger.Error("Parameter retrieval failed", "error", err)
- os.Exit(1)
+ initLogger.Fatal("Parameter retrieval failed", "error", err)
}
workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params))
if err != nil {
- initLogger.Error("Workflow load failed", "error", err, "file", args[0])
- os.Exit(1)
+ initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}
requestID, err := generateRequestID()
if err != nil {
- initLogger.Error("Request ID generation failed", "error", err)
- os.Exit(1)
+ initLogger.Fatal("Request ID generation failed", "error", err)
}
- logFile, err := openLogFile("dry_", cfg.LogDir, workflow, requestID)
+ logFile, err := logger.OpenLogFile(logger.LogFileConfig{
+ Prefix: "dry_",
+ LogDir: cfg.LogDir,
+ DAGLogDir: workflow.LogDir,
+ DAGName: workflow.Name,
+ RequestID: requestID,
+ })
+
if err != nil {
- initLogger.Error(
+ initLogger.Fatal(
"Log file creation failed",
"error",
err,
"workflow",
workflow.Name,
)
- os.Exit(1)
}
defer logFile.Close()
@@ -98,11 +100,10 @@ func dryCmd() *cobra.Command {
listenSignals(ctx, agt)
if err := agt.Run(ctx); err != nil {
- agentLogger.Error("Workflow execution failed",
+ agentLogger.Fatal("Workflow execution failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
- os.Exit(1)
}
},
}
diff --git a/cmd/logging.go b/cmd/logging.go
deleted file mode 100644
index 517e0303a..000000000
--- a/cmd/logging.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (C) 2024 The Daguflow/Dagu Authors
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-package cmd
-
-import (
- "fmt"
- "os"
- "path/filepath"
- "time"
-
- "github.com/daguflow/dagu/internal/dag"
- "github.com/daguflow/dagu/internal/util"
-)
-
-// openLogFile opens a log file for the workflow.
-func openLogFile(
- prefix string,
- logDir string,
- workflow *dag.DAG,
- requestID string,
-) (*os.File, error) {
- name := util.ValidFilename(workflow.Name)
- if workflow.LogDir != "" {
- logDir = filepath.Join(workflow.LogDir, name)
- }
- // Check if the log directory exists
- if _, err := os.Stat(logDir); os.IsNotExist(err) {
- // Create the log directory
- if err := os.MkdirAll(logDir, 0755); err != nil {
- return nil, err
- }
- }
- file := filepath.Join(logDir, fmt.Sprintf("%s%s.%s.%s.log",
- prefix,
- name,
- time.Now().Format("20060102.15:04:05.000"),
- util.TruncString(requestID, 8),
- ))
- // Open or create the log file
- return os.OpenFile(
- file, os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0644,
- )
-}
diff --git a/cmd/restart.go b/cmd/restart.go
index 640f215be..ad06a3917 100644
--- a/cmd/restart.go
+++ b/cmd/restart.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"path/filepath"
"time"
@@ -57,18 +56,16 @@ func restartCmd() *cobra.Command {
specFilePath := args[0]
workflow, err := dag.Load(cfg.BaseConfig, specFilePath, "")
if err != nil {
- initLogger.Error("Workflow load failed", "error", err, "file", args[0])
- os.Exit(1)
+ initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}
dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, initLogger)
if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil {
- initLogger.Error("Workflow stop operation failed",
+ initLogger.Fatal("Workflow stop operation failed",
"error", err,
"workflow", workflow.Name)
- os.Exit(1)
}
// Wait for the specified amount of time before restarting.
@@ -77,35 +74,37 @@ func restartCmd() *cobra.Command {
// Retrieve the parameter of the previous execution.
params, err := getPreviousExecutionParams(cli, workflow)
if err != nil {
- initLogger.Error("Previous execution parameter retrieval failed",
+ initLogger.Fatal("Previous execution parameter retrieval failed",
"error", err,
"workflow", workflow.Name)
- os.Exit(1)
}
// Start the DAG with the same parameter.
// Need to reload the DAG file with the parameter.
workflow, err = dag.Load(cfg.BaseConfig, specFilePath, params)
if err != nil {
- initLogger.Error("Workflow reload failed",
+ initLogger.Fatal("Workflow reload failed",
"error", err,
"file", specFilePath,
"params", params)
- os.Exit(1)
}
requestID, err := generateRequestID()
if err != nil {
- initLogger.Error("Request ID generation failed", "error", err)
- os.Exit(1)
+ initLogger.Fatal("Request ID generation failed", "error", err)
}
- logFile, err := openLogFile("restart_", cfg.LogDir, workflow, requestID)
+ logFile, err := logger.OpenLogFile(logger.LogFileConfig{
+ Prefix: "restart_",
+ LogDir: cfg.LogDir,
+ DAGLogDir: workflow.LogDir,
+ DAGName: workflow.Name,
+ RequestID: requestID,
+ })
if err != nil {
- initLogger.Error("Log file creation failed",
+ initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
- os.Exit(1)
}
defer logFile.Close()
@@ -133,11 +132,10 @@ func restartCmd() *cobra.Command {
listenSignals(cmd.Context(), agt)
if err := agt.Run(cmd.Context()); err != nil {
- agentLogger.Error("Workflow restart failed",
+ agentLogger.Fatal("Workflow restart failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
- os.Exit(1)
}
},
}
diff --git a/cmd/retry.go b/cmd/retry.go
index 99646c963..583cc00fa 100644
--- a/cmd/retry.go
+++ b/cmd/retry.go
@@ -56,44 +56,45 @@ func retryCmd() *cobra.Command {
specFilePath := args[0]
absoluteFilePath, err := filepath.Abs(specFilePath)
if err != nil {
- initLogger.Error("Absolute path resolution failed",
+ initLogger.Fatal("Absolute path resolution failed",
"error", err,
"file", specFilePath)
- os.Exit(1)
}
status, err := historyStore.FindByRequestID(absoluteFilePath, requestID)
if err != nil {
- initLogger.Error("Historical execution retrieval failed",
+ initLogger.Fatal("Historical execution retrieval failed",
"error", err,
"requestID", requestID,
"file", absoluteFilePath)
- os.Exit(1)
}
// Start the DAG with the same parameters with the execution that
// is being retried.
workflow, err := dag.Load(cfg.BaseConfig, absoluteFilePath, status.Status.Params)
if err != nil {
- initLogger.Error("Workflow specification load failed",
+ initLogger.Fatal("Workflow specification load failed",
"error", err,
"file", specFilePath,
"params", status.Status.Params)
- os.Exit(1)
}
newRequestID, err := generateRequestID()
if err != nil {
- initLogger.Error("Request ID generation failed", "error", err)
- os.Exit(1)
+ initLogger.Fatal("Request ID generation failed", "error", err)
}
- logFile, err := openLogFile("dry_", cfg.LogDir, workflow, newRequestID)
+ logFile, err := logger.OpenLogFile(logger.LogFileConfig{
+ Prefix: "retry_",
+ LogDir: cfg.LogDir,
+ DAGLogDir: workflow.LogDir,
+ DAGName: workflow.Name,
+ RequestID: newRequestID,
+ })
if err != nil {
- initLogger.Error("Log file creation failed",
+ initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
- os.Exit(1)
}
defer logFile.Close()
@@ -126,8 +127,7 @@ func retryCmd() *cobra.Command {
listenSignals(ctx, agt)
if err := agt.Run(ctx); err != nil {
- agentLogger.Error("Failed to start workflow", "error", err)
- os.Exit(1)
+ agentLogger.Fatal("Failed to start workflow", "error", err)
}
},
}
diff --git a/cmd/scheduler.go b/cmd/scheduler.go
index e5ef45049..b25bbb102 100644
--- a/cmd/scheduler.go
+++ b/cmd/scheduler.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/logger"
@@ -54,14 +53,13 @@ func schedulerCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
sc := scheduler.New(cfg, logger, cli)
if err := sc.Start(ctx); err != nil {
- logger.Error(
+ logger.Fatal(
"Scheduler initialization failed",
"error",
err,
"specsDirectory",
cfg.DAGs,
)
- os.Exit(1)
}
},
}
diff --git a/cmd/server.go b/cmd/server.go
index 2889bee30..231f89ab9 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/frontend"
@@ -52,8 +51,7 @@ func serverCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
server := frontend.New(cfg, logger, cli)
if err := server.Serve(cmd.Context()); err != nil {
- logger.Error("Server initialization failed", "error", err)
- os.Exit(1)
+ logger.Fatal("Server initialization failed", "error", err)
}
},
}
diff --git a/cmd/start.go b/cmd/start.go
index f38002aba..069f384f2 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"path/filepath"
"github.com/daguflow/dagu/internal/agent"
@@ -52,32 +51,34 @@ func startCmd() *cobra.Command {
params, err := cmd.Flags().GetString("params")
if err != nil {
- initLogger.Error("Parameter retrieval failed", "error", err)
- os.Exit(1)
+ initLogger.Fatal("Parameter retrieval failed", "error", err)
}
workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params))
if err != nil {
- initLogger.Error("Workflow load failed", "error", err, "file", args[0])
- os.Exit(1)
+ initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}
requestID, err := generateRequestID()
if err != nil {
- initLogger.Error("Request ID generation failed", "error", err)
- os.Exit(1)
+ initLogger.Fatal("Request ID generation failed", "error", err)
}
- logFile, err := openLogFile("start_", cfg.LogDir, workflow, requestID)
+ logFile, err := logger.OpenLogFile(logger.LogFileConfig{
+ Prefix: "start_",
+ LogDir: cfg.LogDir,
+ DAGLogDir: workflow.LogDir,
+ DAGName: workflow.Name,
+ RequestID: requestID,
+ })
if err != nil {
- initLogger.Error(
+ initLogger.Fatal(
"Log file creation failed",
"error",
err,
"workflow",
workflow.Name,
)
- os.Exit(1)
}
defer logFile.Close()
@@ -111,11 +112,10 @@ func startCmd() *cobra.Command {
listenSignals(ctx, agt)
if err := agt.Run(ctx); err != nil {
- agentLogger.Error("Workflow execution failed",
+ agentLogger.Fatal("Workflow execution failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
- os.Exit(1)
}
},
}
diff --git a/cmd/start_all.go b/cmd/start_all.go
index a49c3f0f3..ad8119d0d 100644
--- a/cmd/start_all.go
+++ b/cmd/start_all.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/frontend"
@@ -60,8 +59,7 @@ func startAllCmd() *cobra.Command {
sc := scheduler.New(cfg, logger, cli)
if err := sc.Start(ctx); err != nil {
- logger.Error("Scheduler initialization failed", "error", err, "dags", cfg.DAGs)
- os.Exit(1)
+ logger.Fatal("Scheduler initialization failed", "error", err, "dags", cfg.DAGs)
}
}()
@@ -69,8 +67,7 @@ func startAllCmd() *cobra.Command {
server := frontend.New(cfg, logger, cli)
if err := server.Serve(ctx); err != nil {
- logger.Error("Server initialization failed", "error", err)
- os.Exit(1)
+ logger.Fatal("Server initialization failed", "error", err)
}
},
}
diff --git a/cmd/status.go b/cmd/status.go
index 9c823f008..6266670f5 100644
--- a/cmd/status.go
+++ b/cmd/status.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/dag"
@@ -44,8 +43,7 @@ func statusCmd() *cobra.Command {
// Load the DAG file and get the current running status.
workflow, err := dag.Load(cfg.BaseConfig, args[0], "")
if err != nil {
- logger.Error("Workflow load failed", "error", err, "file", args[0])
- os.Exit(1)
+ logger.Fatal("Workflow load failed", "error", err, "file", args[0])
}
dataStore := newDataStores(cfg)
@@ -54,8 +52,7 @@ func statusCmd() *cobra.Command {
curStatus, err := cli.GetCurrentStatus(workflow)
if err != nil {
- logger.Error("Current status retrieval failed", "error", err)
- os.Exit(1)
+ logger.Fatal("Current status retrieval failed", "error", err)
}
logger.Info("Current status", "pid", curStatus.PID, "status", curStatus.Status)
diff --git a/cmd/stop.go b/cmd/stop.go
index 4ea594fa9..a3ddea739 100644
--- a/cmd/stop.go
+++ b/cmd/stop.go
@@ -17,7 +17,6 @@ package cmd
import (
"log"
- "os"
"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/dag"
@@ -50,8 +49,7 @@ func stopCmd() *cobra.Command {
workflow, err := dag.Load(cfg.BaseConfig, args[0], "")
if err != nil {
- logger.Error("Workflow load failed", "error", err, "file", args[0])
- os.Exit(1)
+ logger.Fatal("Workflow load failed", "error", err, "file", args[0])
}
logger.Info("Workflow stop initiated", "workflow", workflow.Name)
@@ -60,14 +58,13 @@ func stopCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
if err := cli.Stop(workflow); err != nil {
- logger.Error(
+ logger.Fatal(
"Workflow stop operation failed",
"error",
err,
"workflow",
workflow.Name,
)
- os.Exit(1)
}
},
}
diff --git a/docs/source/yaml_format.rst b/docs/source/yaml_format.rst
index 7237eb83e..44590964b 100644
--- a/docs/source/yaml_format.rst
+++ b/docs/source/yaml_format.rst
@@ -317,7 +317,7 @@ This section provides a comprehensive list of available fields that can be used
- ``logDir``: The directory where the standard output is written. The default value is ``${HOME}/.local/share/logs``.
- ``restartWaitSec``: The number of seconds to wait after the DAG process stops before restarting it.
- ``histRetentionDays``: The number of days to retain execution history (not for log files).
-- ``timeout``: The timeout of the DAG, which is optional. Unit is seconds.
+- ``timeoutSec``: The timeout of the DAG in seconds (optional).
- ``delaySec``: The interval time in seconds between steps.
- ``maxActiveRuns``: The maximum number of parallel running steps.
- ``params``: The default parameters that can be referred to by ``$1``, ``$2``, and so on.
@@ -344,7 +344,7 @@ Example:
logDir: ${LOG_DIR}
restartWaitSec: 60
histRetentionDays: 3
- timeout: 3600
+ timeoutSec: 3600
delaySec: 1
maxActiveRuns: 1
params: param1 param2
diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go
index 37b5b587f..af864c588 100644
--- a/internal/agent/agent_test.go
+++ b/internal/agent/agent_test.go
@@ -248,7 +248,7 @@ func TestAgent_Retry(t *testing.T) {
// Modify the DAG to make it successful
for _, node := range status.Nodes {
- node.CmdWithArgs = "true"
+ node.Step.CmdWithArgs = "true"
}
// Retry the DAG and check if it is successful
diff --git a/internal/agent/reporter.go b/internal/agent/reporter.go
index d8d2425f6..59e20f18a 100644
--- a/internal/agent/reporter.go
+++ b/internal/agent/reporter.go
@@ -55,7 +55,7 @@ func (r *reporter) reportStep(
nodeStatus := node.State().Status
if nodeStatus != scheduler.NodeStatusNone {
r.logger.Info("Step execution finished",
- "step", node.Data().Name,
+ "step", node.Data().Step.Name,
"status", nodeStatus,
)
}
@@ -160,15 +160,15 @@ func renderTable(nodes []*model.Node) string {
},
)
for i, n := range nodes {
- var command = n.Command
- if n.Args != nil {
+ var command = n.Step.Command
+ if n.Step.Args != nil {
command = strings.Join(
- []string{n.Command, strings.Join(n.Args, " ")}, " ",
+ []string{n.Step.Command, strings.Join(n.Step.Args, " ")}, " ",
)
}
t.AppendRow(table.Row{
fmt.Sprintf("%d", i+1),
- n.Name,
+ n.Step.Name,
n.StartedAt,
n.FinishedAt,
n.StatusText,
@@ -215,7 +215,7 @@ func renderHTML(nodes []*model.Node) string {
}
for _, n := range nodes {
_, _ = buffer.WriteString("<tr>")
- addValFunc(n.Name)
+ addValFunc(n.Step.Name)
addValFunc(n.StartedAt)
addValFunc(n.FinishedAt)
addStatusFunc(n.Status)
diff --git a/internal/agent/reporter_test.go b/internal/agent/reporter_test.go
index 79499306f..202cba8c8 100644
--- a/internal/agent/reporter_test.go
+++ b/internal/agent/reporter_test.go
@@ -181,8 +181,8 @@ func testRenderSummary(t *testing.T, _ *reporter, workflow *dag.DAG, nodes []*mo
func testRenderTable(t *testing.T, _ *reporter, _ *dag.DAG, nodes []*model.Node) {
summary := renderTable(nodes)
- require.Contains(t, summary, nodes[0].Name)
- require.Contains(t, summary, nodes[0].Args[0])
+ require.Contains(t, summary, nodes[0].Step.Name)
+ require.Contains(t, summary, nodes[0].Step.Args[0])
}
type mockSender struct {
diff --git a/internal/agent/testdata/timeout.yaml b/internal/agent/testdata/timeout.yaml
index 13ff24edf..d5e01f2c7 100644
--- a/internal/agent/testdata/timeout.yaml
+++ b/internal/agent/testdata/timeout.yaml
@@ -1,4 +1,4 @@
-timeout: 2
+timeoutSec: 2
steps:
- name: "1"
command: "sleep 1"
diff --git a/internal/client/client.go b/internal/client/client.go
index fa2176a01..531e5fc04 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -302,7 +302,12 @@ func (e *client) GetAllStatusPagination(params dags.ListDagsParams) ([]*DAGStatu
currentStatus *DAGStatus
)
- if dagListPaginationResult, err = dagStore.ListPagination(params); err != nil {
+ if dagListPaginationResult, err = dagStore.ListPagination(persistence.DAGListPaginationArgs{
+ Page: int(params.Page),
+ Limit: int(params.Limit),
+ Name: params.SearchName,
+ Tag: params.SearchTag,
+ }); err != nil {
return dagStatusList, &DagListPaginationSummaryResult{PageCount: 1}, err
}
@@ -314,7 +319,7 @@ func (e *client) GetAllStatusPagination(params dags.ListDagsParams) ([]*DAGStatu
}
return dagStatusList, &DagListPaginationSummaryResult{
- PageCount: e.getPageCount(dagListPaginationResult.Count, params.Limit),
+ PageCount: e.getPageCount(int64(dagListPaginationResult.Count), params.Limit),
ErrorList: dagListPaginationResult.ErrorList,
}, nil
}
diff --git a/internal/client/client_test.go b/internal/client/client_test.go
index 1ce70385e..fb59ca4a4 100644
--- a/internal/client/client_test.go
+++ b/internal/client/client_test.go
@@ -855,7 +855,7 @@ func testNewStatus(workflow *dag.DAG, requestID string, status scheduler.Status,
workflow,
[]scheduler.NodeData{
{
- NodeState: scheduler.NodeState{Status: nodeStatus},
+ State: scheduler.NodeState{Status: nodeStatus},
},
},
status,
diff --git a/internal/dag/builder.go b/internal/dag/builder.go
index 75d11cc49..4103d509b 100644
--- a/internal/dag/builder.go
+++ b/internal/dag/builder.go
@@ -126,7 +126,7 @@ func (b *builder) build(def *definition, envs []string) (*DAG, error) {
Name: def.Name,
Group: def.Group,
Description: def.Description,
- Timeout: time.Second * time.Duration(def.Timeout),
+ Timeout: time.Second * time.Duration(def.TimeoutSec),
Delay: time.Second * time.Duration(def.DelaySec),
RestartWait: time.Second * time.Duration(def.RestartWaitSec),
Tags: parseTags(def.Tags),
diff --git a/internal/dag/definition.go b/internal/dag/definition.go
index b43020fc0..9921a0be6 100644
--- a/internal/dag/definition.go
+++ b/internal/dag/definition.go
@@ -32,7 +32,7 @@ type definition struct {
MailOn *mailOnDef
ErrorMail mailConfigDef
InfoMail mailConfigDef
- Timeout int
+ TimeoutSec int
DelaySec int
RestartWaitSec int
HistRetentionDays *int
diff --git a/internal/dag/scheduler/graph.go b/internal/dag/scheduler/graph.go
index 8a775424f..2591aa1e0 100644
--- a/internal/dag/scheduler/graph.go
+++ b/internal/dag/scheduler/graph.go
@@ -194,7 +194,7 @@ func (g *ExecutionGraph) setupRetry() error {
dict := map[int]NodeStatus{}
retry := map[int]bool{}
for _, node := range g.nodes {
- dict[node.id] = node.data.Status
+ dict[node.id] = node.data.State.Status
retry[node.id] = false
}
var frontier []int
diff --git a/internal/dag/scheduler/graph_test.go b/internal/dag/scheduler/graph_test.go
index bf91ddca3..c84173fbd 100644
--- a/internal/dag/scheduler/graph_test.go
+++ b/internal/dag/scheduler/graph_test.go
@@ -44,7 +44,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "1", Command: "true"},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusSuccess,
},
},
@@ -52,7 +52,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "2", Command: "true", Depends: []string{"1"}},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusError,
},
},
@@ -60,7 +60,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "3", Command: "true", Depends: []string{"2"}},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusCancel,
},
},
@@ -68,7 +68,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "4", Command: "true", Depends: []string{}},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusSkipped,
},
},
@@ -76,7 +76,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "5", Command: "true", Depends: []string{"4"}},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusError,
},
},
@@ -84,7 +84,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "6", Command: "true", Depends: []string{"5"}},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusSuccess,
},
},
@@ -92,7 +92,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "7", Command: "true", Depends: []string{"6"}},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusSkipped,
},
},
@@ -100,7 +100,7 @@ func TestRetryExecution(t *testing.T) {
{
data: NodeData{
Step: dag.Step{Name: "8", Command: "true", Depends: []string{}},
- NodeState: NodeState{
+ State: NodeState{
Status: NodeStatusSkipped,
},
},
diff --git a/internal/dag/scheduler/node.go b/internal/dag/scheduler/node.go
index ef2af54e6..7436d6b41 100644
--- a/internal/dag/scheduler/node.go
+++ b/internal/dag/scheduler/node.go
@@ -34,6 +34,45 @@ import (
"golang.org/x/sys/unix"
)
+// Node is a node in a DAG. It executes a command.
+type Node struct {
+ data NodeData
+
+ id int
+ mu sync.RWMutex
+ logLock sync.Mutex
+ cmd executor.Executor
+ cancelFunc func()
+ logFile *os.File
+ logWriter *bufio.Writer
+ stdoutFile *os.File
+ stdoutWriter *bufio.Writer
+ stderrFile *os.File
+ stderrWriter *bufio.Writer
+ outputWriter *os.File
+ outputReader *os.File
+ scriptFile *os.File
+ done bool
+}
+
+type NodeData struct {
+ Step dag.Step
+ State NodeState
+}
+
+// NodeState contains the state of a node.
+type NodeState struct {
+ Status NodeStatus
+ Log string
+ StartedAt time.Time
+ FinishedAt time.Time
+ RetryCount int
+ RetriedAt time.Time
+ DoneCount int
+ Error error
+}
+
+// NodeStatus represents the status of a node.
type NodeStatus int
const (
@@ -66,64 +105,26 @@ func (s NodeStatus) String() string {
func NewNode(step dag.Step, state NodeState) *Node {
return &Node{
- data: NodeData{Step: step, NodeState: state},
+ data: NodeData{Step: step, State: state},
}
}
-type NodeData struct {
- dag.Step
- NodeState
-}
-
-// Node is a node in a DAG. It executes a command.
-type Node struct {
- data NodeData
-
- id int
- mu sync.RWMutex
- logLock sync.Mutex
- cmd executor.Executor
- cancelFunc func()
- logFile *os.File
- logWriter *bufio.Writer
- stdoutFile *os.File
- stdoutWriter *bufio.Writer
- stderrFile *os.File
- stderrWriter *bufio.Writer
- outputWriter *os.File
- outputReader *os.File
- scriptFile *os.File
- done bool
-}
-
-// NodeState is the state of a node.
-type NodeState struct {
- Status NodeStatus
- Log string
- StartedAt time.Time
- FinishedAt time.Time
- RetryCount int
- RetriedAt time.Time
- DoneCount int
- Error error
-}
-
-func (n *Node) finish() {
- n.mu.Lock()
- defer n.mu.Unlock()
- n.data.FinishedAt = time.Now()
+func (n *Node) Data() NodeData {
+ n.mu.RLock()
+ defer n.mu.RUnlock()
+ return n.data
}
func (n *Node) SetError(err error) {
n.mu.Lock()
defer n.mu.Unlock()
- n.data.Error = err
+ n.data.State.Error = err
}
func (n *Node) State() NodeState {
n.mu.RLock()
defer n.mu.RUnlock()
- return n.data.NodeState
+ return n.data.State
}
// Execute runs the command synchronously and returns error if any.
@@ -146,7 +147,13 @@ func (n *Node) Execute(ctx context.Context) error {
)
}
- return n.data.Error
+ return n.data.State.Error
+}
+
+func (n *Node) finish() {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+ n.data.State.FinishedAt = time.Now()
}
func (n *Node) setupExec(ctx context.Context) (executor.Executor, error) {
@@ -203,51 +210,45 @@ func (n *Node) setupExec(ctx context.Context) (executor.Executor, error) {
return cmd, nil
}
-func (n *Node) Data() NodeData {
- n.mu.RLock()
- defer n.mu.RUnlock()
- return n.data
-}
-
func (n *Node) getRetryCount() int {
n.mu.RLock()
defer n.mu.RUnlock()
- return n.data.RetryCount
+ return n.data.State.RetryCount
}
func (n *Node) setRetriedAt(retriedAt time.Time) {
n.mu.Lock()
defer n.mu.Unlock()
- n.data.RetriedAt = retriedAt
+ n.data.State.RetriedAt = retriedAt
}
func (n *Node) getDoneCount() int {
n.mu.RLock()
defer n.mu.RUnlock()
- return n.data.DoneCount
+ return n.data.State.DoneCount
}
func (n *Node) clearState() {
- n.data.NodeState = NodeState{}
+ n.data.State = NodeState{}
}
func (n *Node) setStatus(status NodeStatus) {
n.mu.Lock()
defer n.mu.Unlock()
- n.data.Status = status
+ n.data.State.Status = status
}
func (n *Node) setErr(err error) {
n.mu.Lock()
defer n.mu.Unlock()
- n.data.Error = err
- n.data.Status = NodeStatusError
+ n.data.State.Error = err
+ n.data.State.Status = NodeStatusError
}
func (n *Node) signal(sig os.Signal, allowOverride bool) {
n.mu.Lock()
defer n.mu.Unlock()
- status := n.data.Status
+ status := n.data.State.Status
if status == NodeStatusRunning && n.cmd != nil {
sigsig := sig
if allowOverride && n.data.Step.SignalOnStop != "" {
@@ -257,16 +258,16 @@ func (n *Node) signal(sig os.Signal, allowOverride bool) {
util.LogErr("sending signal", n.cmd.Kill(sigsig))
}
if status == NodeStatusRunning {
- n.data.Status = NodeStatusCancel
+ n.data.State.Status = NodeStatusCancel
}
}
func (n *Node) cancel() {
n.mu.Lock()
defer n.mu.Unlock()
- status := n.data.Status
+ status := n.data.State.Status
if status == NodeStatusRunning {
- n.data.Status = NodeStatusCancel
+ n.data.State.Status = NodeStatusCancel
}
if n.cancelFunc != nil {
log.Printf("canceling node: %s", n.data.Step.Name)
@@ -278,24 +279,22 @@ func (n *Node) setup(logDir string, requestID string) error {
n.mu.Lock()
defer n.mu.Unlock()
- n.data.StartedAt = time.Now()
- n.data.Log = filepath.Join(logDir, fmt.Sprintf("%s.%s.%s.log",
+ n.data.State.StartedAt = time.Now()
+ n.data.State.Log = filepath.Join(logDir, fmt.Sprintf("%s.%s.%s.log",
util.ValidFilename(n.data.Step.Name),
- n.data.StartedAt.Format("20060102.15:04:05.000"),
+ n.data.State.StartedAt.Format("20060102.15:04:05.000"),
util.TruncString(requestID, 8),
))
- for _, fn := range []func() error{
- n.setupLog,
- n.setupStdout,
- n.setupStderr,
- n.setupScript,
- } {
- if err := fn(); err != nil {
- n.data.Error = err
- return err
- }
+ if err := n.setupLog(); err != nil {
+ return err
}
- return nil
+ if err := n.setupStdout(); err != nil {
+ return err
+ }
+ if err := n.setupStderr(); err != nil {
+ return err
+ }
+ return n.setupScript()
}
var (
@@ -328,7 +327,7 @@ func (n *Node) setupStdout() error {
var err error
n.stdoutFile, err = util.OpenOrCreateFile(f)
if err != nil {
- n.data.Error = err
+ n.data.State.Error = err
return err
}
n.stdoutWriter = bufio.NewWriter(n.stdoutFile)
@@ -345,7 +344,7 @@ func (n *Node) setupStderr() error {
var err error
n.stderrFile, err = util.OpenOrCreateFile(f)
if err != nil {
- n.data.Error = err
+ n.data.State.Error = err
return err
}
n.stderrWriter = bufio.NewWriter(n.stderrFile)
@@ -354,15 +353,15 @@ func (n *Node) setupStderr() error {
}
func (n *Node) setupLog() error {
- if n.data.Log == "" {
+ if n.data.State.Log == "" {
return nil
}
n.logLock.Lock()
defer n.logLock.Unlock()
var err error
- n.logFile, err = util.OpenOrCreateFile(n.data.Log)
+ n.logFile, err = util.OpenOrCreateFile(n.data.State.Log)
if err != nil {
- n.data.Error = err
+ n.data.State.Error = err
return err
}
n.logWriter = bufio.NewWriter(n.logFile)
@@ -397,7 +396,7 @@ func (n *Node) teardown() error {
_ = os.Remove(n.scriptFile.Name())
}
if lastErr != nil {
- n.data.Error = lastErr
+ n.data.State.Error = lastErr
}
return lastErr
}
@@ -405,13 +404,13 @@ func (n *Node) teardown() error {
func (n *Node) incRetryCount() {
n.mu.Lock()
defer n.mu.Unlock()
- n.data.RetryCount++
+ n.data.State.RetryCount++
}
func (n *Node) incDoneCount() {
n.mu.Lock()
defer n.mu.Unlock()
- n.data.DoneCount++
+ n.data.State.DoneCount++
}
var (
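Note on the hunks above: NodeData now holds dag.Step and NodeState as the named fields Step and State instead of embedding them, which is why every n.data.Status-style access in this diff becomes n.data.State.Status. A minimal sketch of the resulting read pattern, assuming only the types declared above (the helper name is illustrative and not part of the diff):

// Sketch, not part of the diff: reading node data through the refactored API.
package scheduler

import "fmt"

func describeNode(n *Node) string {
	data := n.Data()   // copy of NodeData taken under the node's read lock
	state := n.State() // returns data.State instead of an embedded NodeState
	return fmt.Sprintf("%s: %s (retries=%d)",
		data.Step.Name, state.Status, state.RetryCount)
}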
diff --git a/internal/dag/scheduler/node_test.go b/internal/dag/scheduler/node_test.go
index 5fd6acd14..981badc2d 100644
--- a/internal/dag/scheduler/node_test.go
+++ b/internal/dag/scheduler/node_test.go
@@ -36,7 +36,7 @@ func TestExecute(t *testing.T) {
OutputVariables: &dag.SyncMap{},
}}}
require.NoError(t, n.Execute(context.Background()))
- require.Nil(t, n.data.Error)
+ require.Nil(t, n.data.State.Error)
}
func TestError(t *testing.T) {
@@ -47,7 +47,7 @@ func TestError(t *testing.T) {
}}}
err := n.Execute(context.Background())
require.True(t, err != nil)
- require.Equal(t, n.data.Error, err)
+ require.Equal(t, n.data.State.Error, err)
}
func TestSignal(t *testing.T) {
@@ -369,13 +369,13 @@ func TestTeardown(t *testing.T) {
// no error since done flag is true
err := n.teardown()
require.NoError(t, err)
- require.NoError(t, n.data.Error)
+ require.NoError(t, n.data.State.Error)
// error
n.done = false
err = n.teardown()
require.Error(t, err)
- require.Error(t, n.data.Error)
+ require.Error(t, n.data.State.Error)
}
func runTestNode(t *testing.T, n *Node) {
diff --git a/internal/dag/scheduler/scheduler.go b/internal/dag/scheduler/scheduler.go
index d12b24c12..6fd144d2d 100644
--- a/internal/dag/scheduler/scheduler.go
+++ b/internal/dag/scheduler/scheduler.go
@@ -404,7 +404,7 @@ func isReady(g *ExecutionGraph, node *Node) bool {
func (sc *Scheduler) runHandlerNode(ctx context.Context, node *Node) error {
defer func() {
- node.data.FinishedAt = time.Now()
+ node.data.State.FinishedAt = time.Now()
}()
node.setStatus(NodeStatusRunning)
diff --git a/internal/dag/scheduler/scheduler_test.go b/internal/dag/scheduler/scheduler_test.go
index 1ffacf9e5..b50d95596 100644
--- a/internal/dag/scheduler/scheduler_test.go
+++ b/internal/dag/scheduler/scheduler_test.go
@@ -544,7 +544,7 @@ func TestRepeat(t *testing.T) {
require.Equal(t, sc.Status(g), StatusCancel)
require.Equal(t, NodeStatusCancel, nodes[0].State().Status)
- require.Equal(t, nodes[0].data.DoneCount, 2)
+ require.Equal(t, nodes[0].data.State.DoneCount, 2)
}
func TestRepeatFail(t *testing.T) {
@@ -566,7 +566,7 @@ func TestRepeatFail(t *testing.T) {
nodes := g.Nodes()
require.Equal(t, sc.Status(g), StatusError)
require.Equal(t, NodeStatusError, nodes[0].State().Status)
- require.Equal(t, nodes[0].data.DoneCount, 1)
+ require.Equal(t, nodes[0].data.State.DoneCount, 1)
}
func TestStopRepetitiveTaskGracefully(t *testing.T) {
@@ -600,7 +600,7 @@ func TestStopRepetitiveTaskGracefully(t *testing.T) {
require.Equal(t, sc.Status(g), StatusSuccess)
require.Equal(t, NodeStatusSuccess, nodes[0].State().Status)
- require.Equal(t, nodes[0].data.DoneCount, 1)
+ require.Equal(t, nodes[0].data.State.DoneCount, 1)
}
func TestSchedulerStatusText(t *testing.T) {
@@ -643,7 +643,7 @@ func TestNodeSetupFailure(t *testing.T) {
nodes := g.Nodes()
require.Equal(t, NodeStatusError, nodes[0].State().Status)
- require.Equal(t, nodes[0].data.DoneCount, 0)
+ require.Equal(t, nodes[0].data.State.DoneCount, 0)
}
func TestNodeTeardownFailure(t *testing.T) {
diff --git a/internal/frontend/dag/handler.go b/internal/frontend/dag/handler.go
index 260b46446..f1c9fcf20 100644
--- a/internal/frontend/dag/handler.go
+++ b/internal/frontend/dag/handler.go
@@ -426,16 +426,16 @@ func (h *Handler) processStepLogRequest(
}
if node == nil {
- if status.OnSuccess != nil && status.OnSuccess.Name == *params.Step {
+ if status.OnSuccess != nil && status.OnSuccess.Step.Name == *params.Step {
node = status.OnSuccess
}
- if status.OnFailure != nil && status.OnFailure.Name == *params.Step {
+ if status.OnFailure != nil && status.OnFailure.Step.Name == *params.Step {
node = status.OnFailure
}
- if status.OnCancel != nil && status.OnCancel.Name == *params.Step {
+ if status.OnCancel != nil && status.OnCancel.Step.Name == *params.Step {
node = status.OnCancel
}
- if status.OnExit != nil && status.OnExit.Name == *params.Step {
+ if status.OnExit != nil && status.OnExit.Step.Name == *params.Step {
node = status.OnExit
}
}
@@ -489,7 +489,7 @@ func (h *Handler) processLogRequest(
nodeNameToStatusList := map[string][]scheduler.NodeStatus{}
for idx, log := range logs {
for _, node := range log.Status.Nodes {
- addNodeStatus(nodeNameToStatusList, len(logs), idx, node.Name, node.Status)
+ addNodeStatus(nodeNameToStatusList, len(logs), idx, node.Step.Name, node.Status)
}
}
@@ -515,23 +515,23 @@ func (h *Handler) processLogRequest(
for idx, log := range logs {
if n := log.Status.OnSuccess; n != nil {
addNodeStatus(
- handlerToStatusList, len(logs), idx, n.Name, n.Status,
+ handlerToStatusList, len(logs), idx, n.Step.Name, n.Status,
)
}
if n := log.Status.OnFailure; n != nil {
addNodeStatus(
- handlerToStatusList, len(logs), idx, n.Name, n.Status,
+ handlerToStatusList, len(logs), idx, n.Step.Name, n.Status,
)
}
if n := log.Status.OnCancel; n != nil {
n := log.Status.OnCancel
addNodeStatus(
- handlerToStatusList, len(logs), idx, n.Name, n.Status,
+ handlerToStatusList, len(logs), idx, n.Step.Name, n.Status,
)
}
if n := log.Status.OnExit; n != nil {
addNodeStatus(
- handlerToStatusList, len(logs), idx, n.Name, n.Status,
+ handlerToStatusList, len(logs), idx, n.Step.Name, n.Status,
)
}
}
diff --git a/internal/logger/file.go b/internal/logger/file.go
new file mode 100644
index 000000000..a718ec2a9
--- /dev/null
+++ b/internal/logger/file.go
@@ -0,0 +1,75 @@
+// Copyright (C) 2024 The Daguflow/Dagu Authors
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package logger
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/daguflow/dagu/internal/util"
+)
+
+// LogFileConfig holds the configuration for opening a log file
+type LogFileConfig struct {
+ Prefix string
+ LogDir string
+ DAGLogDir string
+ DAGName string
+ RequestID string
+}
+
+// OpenLogFile opens a log file for the workflow.
+func OpenLogFile(config LogFileConfig) (*os.File, error) {
+ logDir, err := prepareLogDirectory(config)
+ if err != nil {
+ return nil, fmt.Errorf("failed to prepare log directory: %w", err)
+ }
+
+ filename := generateLogFilename(config)
+ return openFile(filepath.Join(logDir, filename))
+}
+
+func prepareLogDirectory(config LogFileConfig) (string, error) {
+ validName := util.ValidFilename(config.DAGName)
+ logDir := filepath.Join(config.LogDir, validName)
+ if config.DAGLogDir != "" {
+ logDir = filepath.Join(config.DAGLogDir, validName)
+ }
+ if err := os.MkdirAll(logDir, 0755); err != nil {
+ return "", fmt.Errorf("failed to create log directory: %w", err)
+ }
+
+ return logDir, nil
+}
+
+func generateLogFilename(config LogFileConfig) string {
+ return fmt.Sprintf("%s%s.%s.%s.log",
+ config.Prefix,
+ util.ValidFilename(config.DAGName),
+ time.Now().Format("20060102.15:04:05.000"),
+ util.TruncString(config.RequestID, 8),
+ )
+}
+
+func openFile(filepath string) (*os.File, error) {
+ return os.OpenFile(
+ filepath,
+ os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC,
+ 0644,
+ )
+}
diff --git a/internal/logger/file_test.go b/internal/logger/file_test.go
new file mode 100644
index 000000000..e44f008cd
--- /dev/null
+++ b/internal/logger/file_test.go
@@ -0,0 +1,123 @@
+// Copyright (C) 2024 The Daguflow/Dagu Authors
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package logger
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestOpenLogFile(t *testing.T) {
+ tempDir, err := os.MkdirTemp("", "test_log_dir")
+ require.NoError(t, err)
+ defer os.RemoveAll(tempDir)
+
+ config := LogFileConfig{
+ Prefix: "test_",
+ LogDir: tempDir,
+ DAGName: "test_dag",
+ RequestID: "12345678",
+ }
+
+ file, err := OpenLogFile(config)
+ require.NoError(t, err)
+ defer file.Close()
+
+ assert.NotNil(t, file)
+ assert.True(t, filepath.IsAbs(file.Name()))
+ assert.Contains(t, file.Name(), "test_dag")
+ assert.Contains(t, file.Name(), "test_")
+ assert.Contains(t, file.Name(), "12345678")
+}
+
+func TestPrepareLogDirectory(t *testing.T) {
+ tempDir, err := os.MkdirTemp("", "test_log_dir")
+ require.NoError(t, err)
+ defer os.RemoveAll(tempDir)
+
+ tests := []struct {
+ name string
+ config LogFileConfig
+ expected string
+ }{
+ {
+ name: "Default LogDir",
+ config: LogFileConfig{
+ LogDir: tempDir,
+ DAGName: "test_dag",
+ },
+ expected: filepath.Join(tempDir, "test_dag"),
+ },
+ {
+ name: "Custom DAGLogDir",
+ config: LogFileConfig{
+ LogDir: tempDir,
+ DAGLogDir: filepath.Join(tempDir, "custom"),
+ DAGName: "test_dag",
+ },
+ expected: filepath.Join(tempDir, "custom", "test_dag"),
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result, err := prepareLogDirectory(tt.config)
+ require.NoError(t, err)
+ assert.Equal(t, tt.expected, result)
+ assert.DirExists(t, result)
+ })
+ }
+}
+
+func TestGenerateLogFilename(t *testing.T) {
+ config := LogFileConfig{
+ Prefix: "test_",
+ DAGName: "test dag",
+ RequestID: "12345678",
+ }
+
+ filename := generateLogFilename(config)
+
+ assert.Contains(t, filename, "test_")
+ assert.Contains(t, filename, "test_dag")
+ assert.Contains(t, filename, time.Now().Format("20060102"))
+ assert.Contains(t, filename, "12345678")
+ assert.Contains(t, filename, ".log")
+}
+
+func TestOpenFile(t *testing.T) {
+ tempDir, err := os.MkdirTemp("", "test_log_dir")
+ require.NoError(t, err)
+ defer os.RemoveAll(tempDir)
+
+ filePath := filepath.Join(tempDir, "test.log")
+
+ file, err := openFile(filePath)
+ require.NoError(t, err)
+ defer file.Close()
+
+ assert.NotNil(t, file)
+ assert.Equal(t, filePath, file.Name())
+
+ info, err := file.Stat()
+ require.NoError(t, err)
+ assert.Equal(t, os.FileMode(0644), info.Mode().Perm())
+}
diff --git a/internal/logger/logger.go b/internal/logger/logger.go
index 73fa37022..7c7ffcf20 100644
--- a/internal/logger/logger.go
+++ b/internal/logger/logger.go
@@ -31,11 +31,13 @@ type (
Info(msg string, tags ...any)
Warn(msg string, tags ...any)
Error(msg string, tags ...any)
+ Fatal(msg string, tags ...any)
Debugf(format string, v ...any)
Infof(format string, v ...any)
Warnf(format string, v ...any)
Errorf(format string, v ...any)
+ Fatalf(format string, v ...any)
With(attrs ...any) Logger
WithGroup(name string) Logger
@@ -52,7 +54,6 @@ var _ Logger = (*appLogger)(nil)
type appLogger struct {
logger *slog.Logger
guardedHandler *guardedHandler
- prefix string
quiet bool
}
@@ -174,42 +175,54 @@ func newHandler(f *os.File, format string, opts *slog.HandlerOptions) slog.Handl
// Debugf implements logger.Logger.
func (a *appLogger) Debugf(format string, v ...any) {
- a.logger.Debug(fmt.Sprintf(a.prefix+format, v...))
+ a.logger.Debug(fmt.Sprintf(format, v...))
}
// Errorf implements logger.Logger.
func (a *appLogger) Errorf(format string, v ...any) {
- a.logger.Error(fmt.Sprintf(a.prefix+format, v...))
+ a.logger.Error(fmt.Sprintf(format, v...))
+}
+
+// Fatalf implements logger.Logger.
+func (a *appLogger) Fatalf(format string, v ...any) {
+ a.logger.Error(fmt.Sprintf(format, v...))
+ os.Exit(1)
}
// Infof implements logger.Logger.
func (a *appLogger) Infof(format string, v ...any) {
- a.logger.Info(fmt.Sprintf(a.prefix+format, v...))
+ a.logger.Info(fmt.Sprintf(format, v...))
}
// Warnf implements logger.Logger.
func (a *appLogger) Warnf(format string, v ...any) {
- a.logger.Warn(fmt.Sprintf(a.prefix+format, v...))
+ a.logger.Warn(fmt.Sprintf(format, v...))
}
// Debug implements logger.Logger.
func (a *appLogger) Debug(msg string, tags ...any) {
- a.logger.Debug(a.prefix+msg, tags...)
+ a.logger.Debug(msg, tags...)
}
// Error implements logger.Logger.
func (a *appLogger) Error(msg string, tags ...any) {
- a.logger.Error(a.prefix+msg, tags...)
+ a.logger.Error(msg, tags...)
+}
+
+// Fatal implements logger.Logger.
+func (a *appLogger) Fatal(msg string, tags ...any) {
+ a.logger.Error(msg, tags...)
+ os.Exit(1)
}
// Info implements logger.Logger.
func (a *appLogger) Info(msg string, tags ...any) {
- a.logger.Info(a.prefix+msg, tags...)
+ a.logger.Info(msg, tags...)
}
// Warn implements logger.Logger.
func (a *appLogger) Warn(msg string, tags ...any) {
- a.logger.Warn(a.prefix+msg, tags...)
+ a.logger.Warn(msg, tags...)
}
// With implements logger.Logger.
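The new Fatal and Fatalf log at error level and then call os.Exit(1), which is what lets the cmd files in this diff drop their explicit os.Exit calls. One consequence the call sites above already respect: os.Exit bypasses deferred functions, so Fatal should run before any defer that must execute. A sketch of that interaction, not part of the diff (the run helper and temp file are illustrative only):

// Sketch, not part of the diff: why the commands call Fatal before acquiring
// resources whose cleanup is deferred — os.Exit (and therefore Fatal) skips defers.
func run(lg logger.Logger) {
	f, err := os.CreateTemp("", "example")
	if err != nil {
		lg.Fatal("temp file creation failed", "error", err) // exits; nothing deferred yet
	}
	defer f.Close() // runs on every path that reaches this line
	// ... work with f ...
}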
diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go
index b52b49b2f..2ba5debbb 100644
--- a/internal/persistence/interface.go
+++ b/internal/persistence/interface.go
@@ -20,7 +20,6 @@ import (
"time"
"github.com/daguflow/dagu/internal/dag"
- "github.com/daguflow/dagu/internal/frontend/gen/restapi/operations/dags"
"github.com/daguflow/dagu/internal/persistence/grep"
"github.com/daguflow/dagu/internal/persistence/model"
)
@@ -54,11 +53,10 @@ type DAGStore interface {
Create(name string, spec []byte) (string, error)
Delete(name string) error
List() (ret []*dag.DAG, errs []string, err error)
- ListPagination(params dags.ListDagsParams) (*DagListPaginationResult, error)
+ ListPagination(params DAGListPaginationArgs) (*DagListPaginationResult, error)
GetMetadata(name string) (*dag.DAG, error)
GetDetails(name string) (*dag.DAG, error)
Grep(pattern string) (ret []*GrepResult, errs []string, err error)
- Load(name string) (*dag.DAG, error)
Rename(oldID, newID string) error
GetSpec(name string) (string, error)
UpdateSpec(name string, spec []byte) error
@@ -66,18 +64,25 @@ type DAGStore interface {
TagList() ([]string, []string, error)
}
-type GrepResult struct {
- Name string
- DAG *dag.DAG
- Matches []*grep.Match
+type DAGListPaginationArgs struct {
+ Page int
+ Limit int
+ Name *string
+ Tag *string
}
type DagListPaginationResult struct {
DagList []*dag.DAG
- Count int64
+ Count int
ErrorList []string
}
+type GrepResult struct {
+ Name string
+ DAG *dag.DAG
+ Matches []*grep.Match
+}
+
type FlagStore interface {
ToggleSuspend(id string, suspend bool) error
IsSuspended(id string) bool
diff --git a/internal/persistence/local/dag_store.go b/internal/persistence/local/dag_store.go
index e74a72a27..4aa1e5aa7 100644
--- a/internal/persistence/local/dag_store.go
+++ b/internal/persistence/local/dag_store.go
@@ -26,7 +26,6 @@ import (
"time"
"github.com/daguflow/dagu/internal/dag"
- "github.com/daguflow/dagu/internal/frontend/gen/restapi/operations/dags"
"github.com/daguflow/dagu/internal/persistence"
"github.com/daguflow/dagu/internal/persistence/filecache"
"github.com/daguflow/dagu/internal/persistence/grep"
@@ -163,10 +162,6 @@ func (d *dagStoreImpl) ensureDirExist() error {
return nil
}
-func (d *dagStoreImpl) getFileNameDagMeta() {
-
-}
-
func (d *dagStoreImpl) searchName(fileName string, searchText *string) bool {
if searchText == nil {
return true
@@ -199,15 +194,15 @@ func (d *dagStoreImpl) getTagList(tagSet map[string]struct{}) []string {
return tagList
}
-func (d *dagStoreImpl) ListPagination(params dags.ListDagsParams) (*persistence.DagListPaginationResult, error) {
+func (d *dagStoreImpl) ListPagination(params persistence.DAGListPaginationArgs) (*persistence.DagListPaginationResult, error) {
var (
dagList = make([]*dag.DAG, 0)
errList = make([]string, 0)
- count int64
+ count int
currentDag *dag.DAG
)
- if err := filepath.WalkDir(d.dir, func(path string, dir fs.DirEntry, err error) error {
+ if err := filepath.WalkDir(d.dir, func(_ string, dir fs.DirEntry, err error) error {
if err != nil {
return err
}
@@ -220,12 +215,12 @@ func (d *dagStoreImpl) ListPagination(params dags.ListDagsParams) (*persistence.
errList = append(errList, fmt.Sprintf("reading %s failed: %s", dir.Name(), err))
}
- if !d.searchName(dir.Name(), params.SearchName) || currentDag == nil || !d.searchTags(currentDag.Tags, params.SearchTag) {
+ if !d.searchName(dir.Name(), params.Name) || currentDag == nil || !d.searchTags(currentDag.Tags, params.Tag) {
return nil
}
count++
- if count > (params.Page-1)*params.Limit && int64(len(dagList)) < params.Limit {
+ if count > (params.Page-1)*params.Limit && len(dagList) < params.Limit {
dagList = append(dagList, currentDag)
}
@@ -332,11 +327,6 @@ func (d *dagStoreImpl) Grep(
return ret, errs, nil
}
-func (d *dagStoreImpl) Load(name string) (*dag.DAG, error) {
- // TODO implement me
- panic("implement me")
-}
-
func (d *dagStoreImpl) Rename(oldID, newID string) error {
oldLoc, err := d.fileLocation(oldID)
if err != nil {
@@ -413,7 +403,7 @@ func (d *dagStoreImpl) TagList() ([]string, []string, error) {
err error
)
- if err = filepath.WalkDir(d.dir, func(path string, dir fs.DirEntry, err error) error {
+ if err = filepath.WalkDir(d.dir, func(_ string, dir fs.DirEntry, err error) error {
if err != nil {
return err
}
diff --git a/internal/persistence/model/node.go b/internal/persistence/model/node.go
index a6be152f0..f884bcf07 100644
--- a/internal/persistence/model/node.go
+++ b/internal/persistence/model/node.go
@@ -43,19 +43,19 @@ func FromNodes(nodes []scheduler.NodeData) []*Node {
func FromNode(node scheduler.NodeData) *Node {
return &Node{
Step: node.Step,
- Log: node.Log,
- StartedAt: util.FormatTime(node.StartedAt),
- FinishedAt: util.FormatTime(node.FinishedAt),
- Status: node.Status,
- StatusText: node.Status.String(),
- RetryCount: node.RetryCount,
- DoneCount: node.DoneCount,
- Error: errText(node.Error),
+ Log: node.State.Log,
+ StartedAt: util.FormatTime(node.State.StartedAt),
+ FinishedAt: util.FormatTime(node.State.FinishedAt),
+ Status: node.State.Status,
+ StatusText: node.State.Status.String(),
+ RetryCount: node.State.RetryCount,
+ DoneCount: node.State.DoneCount,
+ Error: errText(node.State.Error),
}
}
type Node struct {
- dag.Step `json:"Step"`
+ Step dag.Step `json:"Step"`
Log string `json:"Log"`
StartedAt string `json:"StartedAt"`
FinishedAt string `json:"FinishedAt"`
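The Node struct change from an embedded dag.Step to a named Step field is what drives the n.Name to n.Step.Name updates elsewhere in this diff; because the embedded field already carried a json:"Step" struct tag, encoding/json was treating it as a named nested object, so the stored JSON layout should be unchanged. A self-contained sketch of that equivalence (types simplified, not part of the diff):

// Sketch, not part of the diff: both shapes marshal to {"Step":{"Name":"build"}}.
package main

import (
	"encoding/json"
	"fmt"
)

type Step struct{ Name string }

// Old shape: an anonymous field with a JSON tag is encoded under that name.
type nodeEmbedded struct {
	Step `json:"Step"`
}

// New shape: an explicit field with the same tag.
type nodeNamed struct {
	Step Step `json:"Step"`
}

func main() {
	a, _ := json.Marshal(nodeEmbedded{Step{Name: "build"}})
	b, _ := json.Marshal(nodeNamed{Step{Name: "build"}})
	fmt.Println(string(a), string(b)) // identical output
}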
diff --git a/internal/persistence/model/status_test.go b/internal/persistence/model/status_test.go
index 09baae2d6..e05efc40f 100644
--- a/internal/persistence/model/status_test.go
+++ b/internal/persistence/model/status_test.go
@@ -65,7 +65,7 @@ func TestStatusSerialization(t *testing.T) {
require.Equal(t, status.Name, unmarshalled.Name)
require.Equal(t, 1, len(unmarshalled.Nodes))
- require.Equal(t, workflow.Steps[0].Name, unmarshalled.Nodes[0].Name)
+ require.Equal(t, workflow.Steps[0].Name, unmarshalled.Nodes[0].Step.Name)
}
func TestCorrectRunningStatus(t *testing.T) {