fix: should never scale down to < min (#1832)
Signed-off-by: Derek Wang <[email protected]>
whynowy authored Jul 16, 2024
1 parent cea0783 commit fa18f97
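
The behavioral change is small but easy to miss in the diff below: when a vertex is under direct back pressure, the scaler now decrements replicas only while the current count is above the configured minimum (the old guard was `current > 1`), and the calculated desired count is always clamped into [min, max]. The following is a minimal, self-contained Go sketch of that rule; the helper names and the values in main are illustrative, not taken from the repository.

package main

import "fmt"

// clampReplicas mirrors the min/max bounds applied in scaleOneVertex: the
// desired replica count is never allowed outside [min, max].
func clampReplicas(desired, min, max int32) int32 {
	if desired > max {
		return max
	}
	if desired < min {
		return min
	}
	return desired
}

// scaleDownUnderPressure mirrors the fixed branch: the old guard `current > 1`
// could step a vertex below its configured minimum; comparing against min
// stops the decrement at the floor.
func scaleDownUnderPressure(current, min int32) int32 {
	if current > min {
		return current - 1
	}
	return current
}

func main() {
	fmt.Println(clampReplicas(0, 2, 10))      // 2: never below min
	fmt.Println(scaleDownUnderPressure(2, 2)) // 2: the old guard would have yielded 1
}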
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions pkg/reconciler/vertex/scaling/scaling.go
@@ -175,11 +175,11 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 	scaleUpCooldown := float64(vertex.Spec.Scale.GetScaleUpCooldownSeconds())
 	if secondsSinceLastScale < scaleDownCooldown && secondsSinceLastScale < scaleUpCooldown {
 		// Skip scaling without needing further calculation
-		log.Debug("Cooldown period, skip scaling.")
+		log.Infof("Cooldown period, skip scaling.")
 		return nil
 	}
 	if vertex.Status.Phase != dfv1.VertexPhaseRunning {
-		log.Debug("Vertex not in Running phase.")
+		log.Infof("Vertex not in Running phase, skip scaling.")
 		return nil
 	}
 	pl := &dfv1.Pipeline{}
Expand All @@ -197,11 +197,11 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		return nil
 	}
 	if pl.Spec.Lifecycle.GetDesiredPhase() != dfv1.PipelinePhaseRunning {
-		log.Debug("Corresponding Pipeline not in Running state.")
+		log.Info("Corresponding Pipeline not in Running state, skip scaling.")
 		return nil
 	}
 	if int(vertex.Status.Replicas) != vertex.GetReplicas() {
-		log.Debugf("Vertex %s might be under processing, replicas mismatch.", vertex.Name)
+		log.Infof("Vertex %s might be under processing, replicas mismatch, skip scaling.", vertex.Name)
 		return nil
 	}

Expand All @@ -227,21 +227,21 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 	// If yes, then scale them back to 1
 	if !vertex.IsASource() {
 		if totalCurrentPending <= 0 {
-			log.Debugf("Vertex %s doesn't have any pending messages, skipping scaling back to 1", vertex.Name)
+			log.Infof("Vertex %s doesn't have any pending messages, skipping scaling back to 1", vertex.Name)
 			return nil
 		} else {
-			log.Debugf("Vertex %s has some pending messages, scaling back to 1", vertex.Name)
+			log.Infof("Vertex %s has some pending messages, scaling back to 1", vertex.Name)
 			return s.patchVertexReplicas(ctx, vertex, 1)
 		}
 	}

 	// For source vertices,
 	// Periodically wake them up from 0 replicas to 1, to peek for the incoming messages
 	if secondsSinceLastScale >= float64(vertex.Spec.Scale.GetZeroReplicaSleepSeconds()) {
-		log.Debugf("Vertex %s has slept %v seconds, scaling up to peek.", vertex.Name, secondsSinceLastScale)
+		log.Infof("Vertex %s has slept %v seconds, scaling up to peek.", vertex.Name, secondsSinceLastScale)
 		return s.patchVertexReplicas(ctx, vertex, 1)
 	} else {
-		log.Debugf("Vertex %q has slept %v seconds, hasn't reached zeroReplicaSleepSeconds (%v seconds).", vertex.Name, secondsSinceLastScale, vertex.Spec.Scale.GetZeroReplicaSleepSeconds())
+		log.Infof("Vertex %q has slept %v seconds, hasn't reached zeroReplicaSleepSeconds (%v seconds), skip scaling.", vertex.Name, secondsSinceLastScale, vertex.Spec.Scale.GetZeroReplicaSleepSeconds())
 		return nil
 	}
 }
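
The hunk above covers the scale-from-zero path. Its decision logic can be summarized in a small hedged sketch; the function name is illustrative, and every parameter stands in for a value the scaler computes from the vertex spec and its timestamp cache.

package main

import "fmt"

// replicasFromZero sketches the scale-from-zero decision: a non-source vertex
// returns to 1 replica only when messages are pending, while a source vertex
// is woken to 1 replica after zeroReplicaSleepSeconds to peek for new input.
func replicasFromZero(isSource bool, pending int64, sleptSeconds, zeroReplicaSleepSeconds float64) int32 {
	if !isSource {
		if pending > 0 {
			return 1
		}
		return 0
	}
	if sleptSeconds >= zeroReplicaSleepSeconds {
		return 1
	}
	return 0
}

func main() {
	fmt.Println(replicasFromZero(false, 42, 0, 120)) // 1: pending work, scale back up
	fmt.Println(replicasFromZero(true, 0, 180, 120)) // 1: source slept long enough, peek
}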
Expand All @@ -265,7 +265,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		rate, existing := m.ProcessingRates["default"]
 		// If rate is not available, we skip scaling.
 		if !existing || rate.GetValue() < 0 { // Rate not available
-			log.Debugf("Vertex %s has no rate information, skip scaling.", vertex.Name)
+			log.Infof("Vertex %s has no rate information, skip scaling.", vertex.Name)
 			return nil
 		}
 		partitionRates = append(partitionRates, rate.GetValue())
Expand All @@ -274,7 +274,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		pending, existing := m.Pendings["default"]
 		if !existing || pending.GetValue() < 0 || pending.GetValue() == isb.PendingNotAvailable {
 			// Pending not available, we don't do anything
-			log.Debugf("Vertex %s has no pending messages information, skip scaling.", vertex.Name)
+			log.Infof("Vertex %s has no pending messages information, skip scaling.", vertex.Name)
 			return nil
 		}
 		totalPending += pending.GetValue()
Expand All @@ -297,16 +297,16 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 	} else {
 		desired = s.desiredReplicas(ctx, vertex, partitionRates, partitionPending, partitionBufferLengths, partitionAvailableBufferLengths)
 	}
-	log.Debugf("Calculated desired replica number of vertex %q is: %d.", vertex.Name, desired)
+	log.Infof("Calculated desired replica number of vertex %q is: %d.", vertex.Name, desired)
 	max := vertex.Spec.Scale.GetMaxReplicas()
 	min := vertex.Spec.Scale.GetMinReplicas()
 	if desired > max {
 		desired = max
-		log.Debugf("Calculated desired replica number %d of vertex %q is greater than max, using max %d.", desired, vertex.Name, max)
+		log.Infof("Calculated desired replica number %d of vertex %q is greater than max, using max %d.", desired, vertex.Name, max)
 	}
 	if desired < min {
 		desired = min
-		log.Debugf("Calculated desired replica number %d of vertex %q is smaller than min, using min %d.", desired, vertex.Name, min)
+		log.Infof("Calculated desired replica number %d of vertex %q is smaller than min, using min %d.", desired, vertex.Name, min)
 	}
 	if current > max || current < min { // Someone might have manually scaled up/down the vertex
 		return s.patchVertexReplicas(ctx, vertex, desired)
Expand All @@ -318,7 +318,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 			diff = maxAllowed
 		}
 		if secondsSinceLastScale < scaleDownCooldown {
-			log.Debugf("Cooldown period for scaling down, skip scaling.")
+			log.Infof("Cooldown period for scaling down, skip scaling.")
 			return nil
 		}
 		return s.patchVertexReplicas(ctx, vertex, current-diff) // We scale down gradually
Expand All @@ -327,23 +327,23 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		// When scaling up, need to check back pressure
 		directPressure, downstreamPressure := s.hasBackPressure(*pl, *vertex)
 		if directPressure {
-			if current > 1 {
-				log.Debugf("Vertex %s has direct back pressure from connected vertices, decreasing one replica.", key)
+			if current > min {
+				log.Infof("Vertex %s has direct back pressure from connected vertices, decreasing one replica.", key)
 				return s.patchVertexReplicas(ctx, vertex, current-1)
 			} else {
-				log.Debugf("Vertex %s has direct back pressure from connected vertices, skip scaling.", key)
+				log.Infof("Vertex %s has direct back pressure from connected vertices, skip scaling.", key)
 				return nil
 			}
 		} else if downstreamPressure {
-			log.Debugf("Vertex %s has back pressure in downstream vertices, skip scaling.", key)
+			log.Infof("Vertex %s has back pressure in downstream vertices, skip scaling.", key)
 			return nil
 		}
 		diff := desired - current
 		if diff > maxAllowed {
 			diff = maxAllowed
 		}
 		if secondsSinceLastScale < scaleUpCooldown {
-			log.Debugf("Cooldown period for scaling up, skip scaling.")
+			log.Infof("Cooldown period for scaling up, skip scaling.")
 			return nil
 		}
 		return s.patchVertexReplicas(ctx, vertex, current+diff) // We scale up gradually
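
Both scaling branches above move toward the desired count in bounded steps: the per-round change is capped by maxAllowed, and each direction honors its own cooldown. A hedged sketch of that stepping rule follows, with illustrative names; the real code computes the up and down cases in separate branches.

package main

import "fmt"

// nextReplicas caps each reconciliation round's movement at maxAllowed
// replicas in either direction, so scaling converges gradually rather
// than jumping straight to the desired count.
func nextReplicas(current, desired, maxAllowed int32) int32 {
	diff := desired - current
	if diff > maxAllowed {
		diff = maxAllowed
	}
	if diff < -maxAllowed {
		diff = -maxAllowed
	}
	return current + diff
}

func main() {
	fmt.Println(nextReplicas(1, 10, 2)) // 3: scale up by at most 2 per round
	fmt.Println(nextReplicas(9, 1, 2))  // 7: scale down by at most 2 per round
}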
@@ -496,7 +496,7 @@ func (s *Scaler) patchVertexReplicas(ctx context.Context, vertex *dfv1.Vertex, d
 	if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) {
 		return fmt.Errorf("failed to patch vertex replicas, %w", err)
 	}
-	log.Infow("Auto scaling - vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("pipeline", vertex.Spec.PipelineName), zap.String("vertex", vertex.Spec.Name))
+	log.Infow("Auto scaling - vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", vertex.Namespace), zap.String("pipeline", vertex.Spec.PipelineName), zap.String("vertex", vertex.Spec.Name))
 	return nil
 }
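
The last hunk only enriches the success log with the vertex's namespace, which helps tell identically named pipelines apart across namespaces. For reference, a minimal zap example producing a comparable structured entry; the field values here are made up.

package main

import "go.uber.org/zap"

func main() {
	// zap's sugared logger accepts strongly typed fields alongside its
	// loosely typed key/value pairs, as the reconciler's wrapper does.
	log := zap.NewExample().Sugar()
	log.Infow("Auto scaling - vertex replicas changed.",
		zap.Int32("from", 3),
		zap.Int32("to", 2),
		zap.String("namespace", "default"),
		zap.String("pipeline", "simple-pipeline"),
		zap.String("vertex", "cat"),
	)
}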
