diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 8361e6b8ef7c7..11272a6e56d7b 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -38,7 +38,7 @@ use crate::manager::{WorkerId, WorkerLocations}; use crate::model::ActorId; use crate::stream::stream_graph::fragment::CompleteStreamFragmentGraph; use crate::stream::stream_graph::id::GlobalFragmentId as Id; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; type HashMappingId = usize; @@ -210,6 +210,12 @@ impl Scheduler { .push(p); } + if parallel_units_map.is_empty() { + return Err(MetaError::unavailable( + "No available parallel units to schedule".to_string(), + )); + } + // Use all parallel units if no default parallelism is specified. let default_parallelism = default_parallelism.map_or_else( || parallel_units_map.values().map(|p| p.len()).sum::(), @@ -236,10 +242,10 @@ impl Scheduler { round_robin.truncate(default_parallelism); if round_robin.len() < default_parallelism { - bail!( + return Err(MetaError::unavailable(format!( "Not enough parallel units to schedule {} parallelism", default_parallelism - ); + ))); } // Sort all parallel units by ID to achieve better vnode locality.