fix: pipeline pausing race conditions of draining and terminating source #2131

Merged: 9 commits, Oct 9, 2024
go.mod (2 changes: 1 addition & 1 deletion)
@@ -49,6 +49,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.27.0
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
golang.org/x/net v0.29.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.8.0
@@ -197,7 +198,6 @@ require (
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
go.mongodb.org/mongo-driver v1.15.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.24.0 // indirect
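The only go.mod change is golang.org/x/exp moving from the indirect block into the direct requires, because the controller below now imports golang.org/x/exp/maps to collect the source-vertex names. A minimal sketch of that call, with hypothetical map contents standing in for `pl.Spec.GetSourcesByName()`:

```go
package main

import (
	"fmt"
	"strings"

	"golang.org/x/exp/maps"
)

func main() {
	// Hypothetical source vertices keyed by name, standing in for
	// pl.Spec.GetSourcesByName() in the controller.
	sources := map[string]struct{}{"in": {}, "http-src": {}}

	// maps.Keys returns the map keys as a slice (order not guaranteed);
	// the controller joins them into an "in (...)" label-selector clause.
	fmt.Println(strings.Join(maps.Keys(sources), ","))
}
```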
pkg/reconciler/pipeline/controller.go (79 changes: 52 additions & 27 deletions)
@@ -24,6 +24,7 @@ import (

"github.com/imdario/mergo"
"go.uber.org/zap"
"golang.org/x/exp/maps"
appv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
@@ -181,8 +182,8 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
}

// check if any changes related to pause/resume lifecycle for the pipeline
if isLifecycleChange(pl) {
oldPhase := pl.Status.Phase
oldPhase := pl.Status.Phase
if isLifecycleChange(pl) && oldPhase != pl.Spec.Lifecycle.GetDesiredPhase() {
requeue, err := r.updateDesiredState(ctx, pl)
if err != nil {
logMsg := fmt.Sprintf("Updated desired pipeline phase failed: %v", zap.Error(err))
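In the rendered hunk above, the two adjacent `oldPhase := pl.Status.Phase` lines are the removed and added variants of the same statement; the view carries no +/- markers. The behavioral change is the extra condition on the desired phase, sketched below with hypothetical phase constants in place of the dfv1 types:

```go
package main

import "fmt"

// Hypothetical phase values standing in for dfv1.PipelinePhase.
const (
	phaseRunning = "Running"
	phasePaused  = "Paused"
)

// shouldHandleLifecycle mirrors the tightened guard in reconcile: a
// pause/resume lifecycle change is only processed while the observed phase
// has not yet reached the desired phase, so an already-paused pipeline is
// not pushed through the pausing path again on every reconcile.
func shouldHandleLifecycle(lifecycleChange bool, currentPhase, desiredPhase string) bool {
	return lifecycleChange && currentPhase != desiredPhase
}

func main() {
	fmt.Println(shouldHandleLifecycle(true, phaseRunning, phasePaused)) // true: drive the pause
	fmt.Println(shouldHandleLifecycle(true, phasePaused, phasePaused))  // false: desired phase reached
}
```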
@@ -611,7 +612,7 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
copyVertexTemplate(pl, vCopy)
copyVertexLimits(pl, vCopy)
replicas := int32(1)
// If the desired phase is paused or we are in the middle of pausing we should not start any vertex replicas
// If the desired phase is paused, or we are in the middle of pausing we should not start any vertex replicas
if isLifecycleChange(pl) {
replicas = int32(0)
} else if v.IsReduceUDF() {
@@ -830,39 +831,48 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
}

func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
// check that annotations / pause timestamp annotation exist
var (
drainCompleted = false
daemonClient daemonclient.DaemonClient
errWhileDrain error
)
pl.Status.MarkPhasePausing()

if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
_, err := r.scaleDownSourceVertices(ctx, pl)
if err != nil {
// If there's an error requeue the request
return true, err
}
patchJson := `{"metadata":{"annotations":{"` + dfv1.KeyPauseTimestamp + `":"` + time.Now().Format(time.RFC3339) + `"}}}`
if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
if err = r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
return true, err
}
// This is to give some time to process the new messages,
// otherwise check IsDrained directly may get incorrect information
return true, nil
}

pl.Status.MarkPhasePausing()
updated, err := r.scaleDownSourceVertices(ctx, pl)
if err != nil || updated {
// If there's an error, or scaling down happens, requeue the request
// This is to give some time to process the new messages, otherwise check IsDrained directly may get incorrect information
return updated, err
}

var daemonError error
var drainCompleted = false

// Check if all the source vertex pods have scaled down to zero
sourcePodsTerminated, err := r.noSourceVertexPodsRunning(ctx, pl)
// If the sources have scaled down successfully then check for the buffer information.
// Check for the daemon to obtain the buffer draining information, in case we see an error trying to
// retrieve this we do not exit prematurely to allow honoring the pause timeout for a consistent error
// - In case the timeout has not occurred we would trigger a requeue
// - If the timeout has occurred even after getting the drained error, we will try to pause the pipeline
daemonClient, daemonError := daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
if daemonClient != nil {
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
if err != nil {
daemonError = err
if sourcePodsTerminated {
daemonClient, err = daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
if daemonClient != nil {
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
}
}
if err != nil {
errWhileDrain = err
}

pauseTimestamp, err := time.Parse(time.RFC3339, pl.GetAnnotations()[dfv1.KeyPauseTimestamp])
if err != nil {
return false, err
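Because the hunk above interleaves removed and added lines, the new ordering is easy to miss: the daemon's IsDrained check now only runs after every source vertex pod has terminated, and a daemon error no longer short-circuits the pause timeout. A simplified, self-contained model of that decision order (all functions here are stubs, not the controller's real collaborators):

```go
package main

import (
	"errors"
	"fmt"
)

// Stubs standing in for the reconciler's collaborators (hypothetical).
func noSourceVertexPodsRunning() (bool, error) { return true, nil }
func isDrained() (bool, error)                 { return false, errors.New("daemon unreachable") }

// pauseStep models the fixed flow: query the daemon only once the source
// pods are gone, remember any error, and keep requeueing until the drain
// completes or the pause deadline passes, at which point we pause anyway.
func pauseStep(deadlineExceeded bool) (requeue bool) {
	var errWhileDrain error
	drainCompleted := false

	sourcePodsTerminated, err := noSourceVertexPodsRunning()
	if sourcePodsTerminated {
		drainCompleted, err = isDrained()
	}
	if err != nil {
		errWhileDrain = err
	}

	if drainCompleted || deadlineExceeded {
		if errWhileDrain != nil {
			fmt.Println("pausing after timeout despite error:", errWhileDrain)
		}
		return false // scale everything down and mark Paused
	}
	return true // requeue and check again
}

func main() {
	fmt.Println(pauseStep(false)) // true: still draining, requeue
	fmt.Println(pauseStep(true))  // false: deadline hit, pause anyway
}
```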
@@ -874,8 +884,8 @@ func (r *pipelineReconciler) pausePipelin
if err != nil {
return true, err
}
if daemonError != nil {
r.logger.Errorw("Error in fetching Drained status, Pausing due to timeout", zap.Error(daemonError))
if errWhileDrain != nil {
r.logger.Errorw("Errors encountered while pausing, moving to paused after timeout", zap.Error(errWhileDrain))
}
// if the drain completed successfully, then set the DrainedOnPause field to true
if drainCompleted {
@@ -884,7 +894,20 @@ func (r *pipelineReconciler) pausePipelin
pl.Status.MarkPhasePaused()
return false, nil
}
return true, daemonError
return true, err
}

// noSourceVertexPodsRunning checks whether any source vertex has running replicas
func (r *pipelineReconciler) noSourceVertexPodsRunning(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
sources := pl.Spec.GetSourcesByName()
pods := corev1.PodList{}
label := fmt.Sprintf("%s=%s, %s in (%s)", dfv1.KeyPipelineName, pl.Name,
dfv1.KeyVertexName, strings.Join(maps.Keys(sources), ","))
selector, _ := labels.Parse(label)
if err := r.client.List(ctx, &pods, &client.ListOptions{Namespace: pl.Namespace, LabelSelector: selector}); err != nil {
return false, err
}
return len(pods.Items) == 0, nil
}

func (r *pipelineReconciler) scaleDownSourceVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
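The new noSourceVertexPodsRunning helper above relies on a set-based Kubernetes label selector to list the pods of every source vertex in a single call. A minimal sketch of how such a selector string is parsed and matched; the label keys and names here are placeholders, while the real keys come from the dfv1.KeyPipelineName and dfv1.KeyVertexName constants:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Equality requirement on the pipeline name plus an "in (...)" set
	// requirement over the source vertex names, mirroring the string built
	// in noSourceVertexPodsRunning (keys and values are placeholders).
	sel, err := labels.Parse("example.io/pipeline-name=my-pipeline, example.io/vertex-name in (in,http-src)")
	if err != nil {
		panic(err)
	}

	// A pod carrying the pipeline label and one of the listed vertex names
	// matches; listing with this selector therefore covers all sources.
	pod := labels.Set{
		"example.io/pipeline-name": "my-pipeline",
		"example.io/vertex-name":   "in",
	}
	fmt.Println(sel.Matches(pod)) // true
}
```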
@@ -965,6 +988,8 @@ func (r *pipelineReconciler) checkChildrenResourceStatus(ctx context.Context, pi
return
}
}
// if all conditions are True, clear the status message.
pipeline.Status.Message = ""
}()

// get the daemon deployment and update the status of it to the pipeline
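The last hunk adds one small behavioral detail to checkChildrenResourceStatus: the deferred summary now clears Status.Message once every condition is True, instead of leaving the previous failure text in place. A self-contained sketch of that pattern; the types here are hypothetical stand-ins, not the dfv1 API:

```go
package main

import "fmt"

// condition and status are hypothetical stand-ins for the pipeline's
// metav1-style conditions and status, just to show the clearing pattern.
type condition struct {
	Type    string
	Status  string // "True" or "False"
	Message string
}

type status struct {
	Conditions []condition
	Message    string
}

// summarize sketches the deferred block's pattern: surface the first
// unhealthy condition's message, and once all conditions are True, clear
// the stale message rather than keeping the last failure text around.
func summarize(s *status) {
	for _, c := range s.Conditions {
		if c.Status != "True" {
			s.Message = "Degraded: " + c.Message
			return
		}
	}
	// if all conditions are True, clear the status message.
	s.Message = ""
}

func main() {
	s := &status{
		Conditions: []condition{{Type: "VerticesHealthy", Status: "True"}},
		Message:    "Degraded: some old failure",
	}
	summarize(s)
	fmt.Printf("status message: %q\n", s.Message) // "" after all conditions turn True
}
```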