diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 95de4eeec..d2a4f21f1 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -339,6 +339,7 @@ func (a *Agent) newScheduler() *scheduler.Scheduler { LogDir: a.logDir, Logger: a.logger, MaxActiveRuns: a.dag.MaxActiveRuns, + Timeout: a.dag.Timeout, Delay: a.dag.Delay, Dry: a.dry, ReqID: a.requestID, diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index cdb39a9c6..37b5b587f 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -139,6 +139,20 @@ func TestAgent_Run(t *testing.T) { // Check if the status is saved correctly require.Equal(t, scheduler.StatusError, agt.Status().Status) }) + t.Run("FinishWithTimeout", func(t *testing.T) { + setup := test.SetupTest(t) + defer setup.Cleanup() + + // Run a DAG that timeout + timeoutDAG := testLoadDAG(t, "timeout.yaml") + agt := newAgent(setup, genRequestID(), timeoutDAG, &agent.Options{}) + ctx := context.Background() + err := agt.Run(ctx) + require.Error(t, err) + + // Check if the status is saved correctly + require.Equal(t, scheduler.StatusError, agt.Status().Status) + }) t.Run("ReceiveSignal", func(t *testing.T) { setup := test.SetupTest(t) defer setup.Cleanup() diff --git a/internal/agent/testdata/timeout.yaml b/internal/agent/testdata/timeout.yaml new file mode 100644 index 000000000..13ff24edf --- /dev/null +++ b/internal/agent/testdata/timeout.yaml @@ -0,0 +1,6 @@ +timeout: 2 +steps: + - name: "1" + command: "sleep 1" + - name: "2" + command: "sleep 2" diff --git a/internal/dag/builder.go b/internal/dag/builder.go index f08403120..75d11cc49 100644 --- a/internal/dag/builder.go +++ b/internal/dag/builder.go @@ -126,6 +126,7 @@ func (b *builder) build(def *definition, envs []string) (*DAG, error) { Name: def.Name, Group: def.Group, Description: def.Description, + Timeout: time.Second * time.Duration(def.Timeout), Delay: time.Second * time.Duration(def.DelaySec), RestartWait: time.Second * time.Duration(def.RestartWaitSec), Tags: parseTags(def.Tags), diff --git a/internal/dag/dag.go b/internal/dag/dag.go index 9b497b0b7..997bbbb6d 100644 --- a/internal/dag/dag.go +++ b/internal/dag/dag.go @@ -86,6 +86,8 @@ type DAG struct { // MailOn contains the conditions to send mail. MailOn *MailOn `json:"MailOn"` + // Timeout is a field to specify the maximum execution time of the DAG task + Timeout time.Duration `json:"Timeout"` // Misc configuration for DAG execution. // Delay is the delay before starting the DAG. Delay time.Duration `json:"Delay"` diff --git a/internal/dag/definition.go b/internal/dag/definition.go index 04c0a4a56..b43020fc0 100644 --- a/internal/dag/definition.go +++ b/internal/dag/definition.go @@ -32,6 +32,7 @@ type definition struct { MailOn *mailOnDef ErrorMail mailConfigDef InfoMail mailConfigDef + Timeout int DelaySec int RestartWaitSec int HistRetentionDays *int diff --git a/internal/dag/scheduler/scheduler.go b/internal/dag/scheduler/scheduler.go index 9d0588bc7..d12b24c12 100644 --- a/internal/dag/scheduler/scheduler.go +++ b/internal/dag/scheduler/scheduler.go @@ -58,6 +58,7 @@ type Scheduler struct { logDir string logger logger.Logger maxActiveRuns int + timeout time.Duration delay time.Duration dry bool onExit *dag.Step @@ -82,6 +83,7 @@ func New(cfg *Config) *Scheduler { logDir: cfg.LogDir, logger: lg, maxActiveRuns: cfg.MaxActiveRuns, + timeout: cfg.Timeout, delay: cfg.Delay, dry: cfg.Dry, onExit: cfg.OnExit, @@ -96,6 +98,7 @@ type Config struct { LogDir string Logger logger.Logger MaxActiveRuns int + Timeout time.Duration Delay time.Duration Dry bool OnExit *dag.Step @@ -115,6 +118,12 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan var wg = sync.WaitGroup{} + var cancel context.CancelFunc + if sc.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, sc.timeout) + defer cancel() + } + for !sc.isFinished(g) { if sc.isCanceled() { break @@ -168,6 +177,14 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan switch { case status == NodeStatusSuccess || status == NodeStatusCancel: // do nothing + case sc.isTimeout(g.startedAt): + sc.logger.Info( + "Step execution deadline exceeded", + "step", node.data.Step.Name, + "error", execErr, + ) + node.setStatus(NodeStatusCancel) + sc.setLastError(execErr) case sc.isCanceled(): sc.setLastError(execErr) case node.data.Step.RetryPolicy != nil && node.data.Step.RetryPolicy.Limit > node.getRetryCount(): @@ -480,6 +497,10 @@ func (sc *Scheduler) isSucceed(g *ExecutionGraph) bool { return true } +func (sc *Scheduler) isTimeout(startedAt time.Time) bool { + return sc.timeout > 0 && time.Since(startedAt) > sc.timeout +} + var ( errUpstreamFailed = fmt.Errorf("upstream failed") errUpstreamSkipped = fmt.Errorf("upstream skipped") diff --git a/internal/dag/scheduler/scheduler_test.go b/internal/dag/scheduler/scheduler_test.go index 379d8947a..1ffacf9e5 100644 --- a/internal/dag/scheduler/scheduler_test.go +++ b/internal/dag/scheduler/scheduler_test.go @@ -189,6 +189,31 @@ func TestSchedulerCancel(t *testing.T) { require.Equal(t, NodeStatusNone, nodes[2].State().Status) } +func TestSchedulerTimeout(t *testing.T) { + g, _ := NewExecutionGraph( + logger.Default, + step("1", "sleep 1"), + step("2", "sleep 1"), + step("3", "sleep 3"), + step("4", "sleep 10"), + step("5", "sleep 1", "2"), + step("6", "sleep 1", "5"), + ) + sc := New(&Config{Timeout: time.Second * 2, LogDir: testHomeDir}) + + err := sc.Schedule(context.Background(), g, nil) + require.Error(t, err) + require.Equal(t, sc.Status(g), StatusError) + + nodes := g.Nodes() + require.Equal(t, NodeStatusSuccess, nodes[0].State().Status) + require.Equal(t, NodeStatusSuccess, nodes[1].State().Status) + require.Equal(t, NodeStatusCancel, nodes[2].State().Status) + require.Equal(t, NodeStatusCancel, nodes[3].State().Status) + require.Equal(t, NodeStatusCancel, nodes[4].State().Status) + require.Equal(t, NodeStatusCancel, nodes[5].State().Status) +} + func TestSchedulerRetryFail(t *testing.T) { cmd := filepath.Join(util.MustGetwd(), "testdata/testfile.sh") g, sc, err := testSchedule(t,