Skip to content

Commit

Permalink
fix: reject ddl rather than panic when no available parallel units to…
Browse files Browse the repository at this point in the history
… schedule (risingwavelabs#9080)
  • Loading branch information
yezizp2012 authored Apr 10, 2023
1 parent 046f1d2 commit d1868a9
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::manager::{WorkerId, WorkerLocations};
use crate::model::ActorId;
use crate::stream::stream_graph::fragment::CompleteStreamFragmentGraph;
use crate::stream::stream_graph::id::GlobalFragmentId as Id;
use crate::MetaResult;
use crate::{MetaError, MetaResult};

type HashMappingId = usize;

Expand Down Expand Up @@ -210,6 +210,12 @@ impl Scheduler {
.push(p);
}

if parallel_units_map.is_empty() {
return Err(MetaError::unavailable(
"No available parallel units to schedule".to_string(),
));
}

// Use all parallel units if no default parallelism is specified.
let default_parallelism = default_parallelism.map_or_else(
|| parallel_units_map.values().map(|p| p.len()).sum::<usize>(),
Expand All @@ -236,10 +242,10 @@ impl Scheduler {
round_robin.truncate(default_parallelism);

if round_robin.len() < default_parallelism {
bail!(
return Err(MetaError::unavailable(format!(
"Not enough parallel units to schedule {} parallelism",
default_parallelism
);
)));
}

// Sort all parallel units by ID to achieve better vnode locality.
Expand Down

0 comments on commit d1868a9

Please sign in to comment.