Skip to content

Commit

Permalink
fix: create buffers and buckets before updating Vertices (#2112)
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 authored and whynowy committed Oct 9, 2024
1 parent e1a586a commit a4a4fd0
Showing 1 changed file with 41 additions and 40 deletions.
81 changes: 41 additions & 40 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,46 +280,6 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
newBuckets[b] = b
}
}
newObjs := buildVertices(pl)
for vertexName, newObj := range newObjs {
if oldObj, existing := existingObjs[vertexName]; !existing {
if err := r.client.Create(ctx, &newObj); err != nil {
if apierrors.IsAlreadyExists(err) { // probably somebody else already created it
continue
} else {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "CreateVertexFailed", "Failed to create vertex: %w", err.Error())
return fmt.Errorf("failed to create vertex, err: %w", err)
}
}
log.Infow("Created vertex successfully", zap.String("vertex", vertexName))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "CreateVertexSuccess", "Created vertex %s successfully", vertexName)
} else {
if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] { // need to update
originalReplicas := oldObj.Spec.Replicas
oldObj.Spec = newObj.Spec
oldObj.Spec.Replicas = originalReplicas
oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash]
if err := r.client.Update(ctx, &oldObj); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error())
return fmt.Errorf("failed to update vertex, err: %w", err)
}
log.Infow("Updated vertex successfully", zap.String("vertex", vertexName))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "UpdateVertexSuccess", "Updated vertex %s successfully", vertexName)
}
delete(existingObjs, vertexName)
}
}
for _, v := range existingObjs {
if err := r.client.Delete(ctx, &v); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "DeleteStaleVertexFailed", "Failed to delete vertex: %w", err.Error())
return fmt.Errorf("failed to delete vertex, err: %w", err)
}
log.Infow("Deleted stale vertex successfully", zap.String("vertex", v.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "DeleteStaleVertexSuccess", "Deleted stale vertex %s successfully", v.Name)
// Clean up vertex replica metrics
reconciler.VertexDesiredReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
reconciler.VertexCurrentReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
}

// create batch job
if len(newBuffers) > 0 || len(newBuckets) > 0 {
Expand Down Expand Up @@ -362,6 +322,47 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
r.recorder.Eventf(pl, corev1.EventTypeNormal, "CreateJobForISBDeletionSuccessful", "Create ISB deletion job successfully")
}

newObjs := buildVertices(pl)
for vertexName, newObj := range newObjs {
if oldObj, existing := existingObjs[vertexName]; !existing {
if err := r.client.Create(ctx, &newObj); err != nil {
if apierrors.IsAlreadyExists(err) { // probably somebody else already created it
continue
} else {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "CreateVertexFailed", "Failed to create vertex: %w", err.Error())
return fmt.Errorf("failed to create vertex, err: %w", err)
}
}
log.Infow("Created vertex successfully", zap.String("vertex", vertexName))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "CreateVertexSuccess", "Created vertex %s successfully", vertexName)
} else {
if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] { // need to update
originalReplicas := oldObj.Spec.Replicas
oldObj.Spec = newObj.Spec
oldObj.Spec.Replicas = originalReplicas
oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash]
if err := r.client.Update(ctx, &oldObj); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error())
return fmt.Errorf("failed to update vertex, err: %w", err)
}
log.Infow("Updated vertex successfully", zap.String("vertex", vertexName))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "UpdateVertexSuccess", "Updated vertex %s successfully", vertexName)
}
delete(existingObjs, vertexName)
}
}
for _, v := range existingObjs {
if err := r.client.Delete(ctx, &v); err != nil {
r.recorder.Eventf(pl, corev1.EventTypeWarning, "DeleteStaleVertexFailed", "Failed to delete vertex: %w", err.Error())
return fmt.Errorf("failed to delete vertex, err: %w", err)
}
log.Infow("Deleted stale vertex successfully", zap.String("vertex", v.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "DeleteStaleVertexSuccess", "Deleted stale vertex %s successfully", v.Name)
// Clean up vertex replica metrics
reconciler.VertexDesiredReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
reconciler.VertexCurrentReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
}

// Daemon service
if err := r.createOrUpdateDaemonService(ctx, pl); err != nil {
return err
Expand Down

0 comments on commit a4a4fd0

Please sign in to comment.