Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task System for Bevy #384

Merged
merged 18 commits into from
Aug 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ bevy_text = { path = "crates/bevy_text", version = "0.1" }
bevy_ui = { path = "crates/bevy_ui", version = "0.1" }
bevy_utils = { path = "crates/bevy_utils", version = "0.1" }
bevy_window = { path = "crates/bevy_window", version = "0.1" }
bevy_tasks = { path = "crates/bevy_tasks", version = "0.1" }

# bevy (optional)
bevy_audio = { path = "crates/bevy_audio", optional = true, version = "0.1" }
Expand Down
4 changes: 3 additions & 1 deletion crates/bevy_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ keywords = ["bevy"]
# bevy
bevy_derive = { path = "../bevy_derive", version = "0.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
bevy_math = { path = "../bevy_math", version = "0.1" }

# other
libloading = "0.6"
log = { version = "0.4", features = ["release_max_level_info"] }
serde = { version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"]}
8 changes: 7 additions & 1 deletion crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::app_builder::AppBuilder;
use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};

#[allow(clippy::needless_doctest_main)]
Expand Down Expand Up @@ -63,6 +63,12 @@ impl App {
}

pub fn run(mut self) {
// Setup the default bevy task pools
self.resources
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced bevy_tasks should be hard-coded into apps, especially given that we're just adding resources. Can we make this a plugin that we add with add_default_plugins()?

Copy link
Contributor

@aclysma aclysma Aug 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making a plugin in bevy_tasks will cause circular dependencies (app depends on ecs, ecs depends on tasks). Where to do you want it to be?

.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(&mut self.resources);

self.startup_schedule.initialize(&mut self.resources);
self.startup_executor.run(
&mut self.startup_schedule,
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ mod app_builder;
mod event;
mod plugin;
mod schedule_runner;
mod task_pool_options;

pub use app::*;
pub use app_builder::*;
pub use bevy_derive::DynamicPlugin;
pub use event::*;
pub use plugin::*;
pub use schedule_runner::*;
pub use task_pool_options::*;

pub mod prelude {
pub use crate::{
Expand Down
147 changes: 147 additions & 0 deletions crates/bevy_app/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};

/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
#[derive(Clone)]
pub struct TaskPoolThreadAssignmentPolicy {
/// Force using at least this many threads
pub min_threads: usize,
/// Under no circumstance use more than this many threads for this pool
pub max_threads: usize,
/// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
/// permitted to use 1.0 to try to use all remaining threads
pub percent: f32,
}

impl TaskPoolThreadAssignmentPolicy {
/// Determine the number of threads to use for this task pool
fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize {
assert!(self.percent >= 0.0);
let mut desired = (total_threads as f32 * self.percent).round() as usize;

// Limit ourselves to the number of cores available
desired = desired.min(remaining_threads);

// Clamp by min_threads, max_threads. (This may result in us using more threads than are
// available, this is intended. An example case where this might happen is a device with
// <= 2 threads.
bevy_math::clamp(desired, self.min_threads, self.max_threads)
}
}

/// Helper for configuring and creating the default task pools. For end-users who want full control,
/// insert the default task pools into the resource map manually. If the pools are already inserted,
/// this helper will do nothing.
#[derive(Clone)]
pub struct DefaultTaskPoolOptions {
/// If the number of physical cores is less than min_total_threads, force using min_total_threads
pub min_total_threads: usize,
/// If the number of physical cores is grater than max_total_threads, force using max_total_threads
pub max_total_threads: usize,

/// Used to determine number of IO threads to allocate
pub io: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of async compute threads to allocate
pub async_compute: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of compute threads to allocate
pub compute: TaskPoolThreadAssignmentPolicy,
}

impl Default for DefaultTaskPoolOptions {
fn default() -> Self {
DefaultTaskPoolOptions {
// By default, use however many cores are available on the system
min_total_threads: 1,
max_total_threads: std::usize::MAX,

// Use 25% of cores for IO, at least 1, no more than 4
io: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use 25% of cores for async compute, at least 1, no more than 4
async_compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use all remaining cores for compute (at least 1)
compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: std::usize::MAX,
percent: 1.0, // This 1.0 here means "whatever is left over"
},
}
}
}

impl DefaultTaskPoolOptions {
/// Create a configuration that forces using the given number of threads.
pub fn with_num_threads(thread_count: usize) -> Self {
let mut options = Self::default();
options.min_total_threads = thread_count;
options.max_total_threads = thread_count;

options
}

/// Inserts the default thread pools into the given resource map based on the configured values
pub fn create_default_pools(&self, resources: &mut Resources) {
let total_threads = bevy_math::clamp(
bevy_tasks::logical_core_count(),
self.min_total_threads,
self.max_total_threads,
);

let mut remaining_threads = total_threads;

if !resources.contains::<IOTaskPool>() {
cart marked this conversation as resolved.
Show resolved Hide resolved
// Determine the number of IO threads we will use
let io_threads = self
.io
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= io_threads;

resources.insert(IOTaskPool(
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
.build(),
));
}

if !resources.contains::<AsyncComputeTaskPool>() {
// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= async_compute_threads;

resources.insert(AsyncComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
.build(),
));
}

if !resources.contains::<ComputeTaskPool>() {
// Determine the number of compute threads we will use
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.get_number_of_threads(remaining_threads, total_threads);

resources.insert(ComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
.build(),
));
}
}
}
2 changes: 1 addition & 1 deletion crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ profiler = []

[dependencies]
bevy_hecs = { path = "hecs", features = ["macros", "serialize"], version = "0.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
bevy_utils = { path = "../bevy_utils", version = "0.1" }
rand = "0.7.2"
rayon = "1.3"
crossbeam-channel = "0.4.2"
fixedbitset = "0.3.0"
downcast-rs = "1.1.1"
Expand Down
6 changes: 6 additions & 0 deletions crates/bevy_ecs/src/resource/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ impl Resources {
self.get_resource_mut(ResourceIndex::Global)
}

/// Returns a clone of the underlying resource, this is helpful when borrowing something
/// cloneable (like a task pool) without taking a borrow on the resource map
pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
self.get::<T>().map(|r| (*r).clone())
}

#[allow(clippy::needless_lifetimes)]
pub fn get_local<'a, T: Resource>(&'a self, id: SystemId) -> Option<Ref<'a, T>> {
self.get_resource(ResourceIndex::System(id))
Expand Down
70 changes: 18 additions & 52 deletions crates/bevy_ecs/src/schedule/parallel_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use bevy_hecs::{ArchetypesGeneration, World};
use crossbeam_channel::{Receiver, Sender};
use fixedbitset::FixedBitSet;
use parking_lot::Mutex;
use rayon::ScopeFifo;
use std::{ops::Range, sync::Arc};

/// Executes each schedule stage in parallel by analyzing system dependencies.
Expand Down Expand Up @@ -66,52 +65,6 @@ impl ParallelExecutor {
}
}

/// This can be added as an app resource to control the global `rayon::ThreadPool` used by ecs.
// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync.
#[derive(Debug, Default, Clone)]
pub struct ParallelExecutorOptions {
/// If some value, we'll set up the thread pool to use at most n threads. See `rayon::ThreadPoolBuilder::num_threads`.
num_threads: Option<usize>,
/// If some value, we'll set up the thread pool's' workers to the given stack size. See `rayon::ThreadPoolBuilder::stack_size`.
stack_size: Option<usize>,
// TODO: Do we also need/want to expose other features (*_handler, etc.)
}

impl ParallelExecutorOptions {
/// Creates a new ParallelExecutorOptions instance
pub fn new() -> Self {
Self::default()
}

/// Sets the num_threads option, using the builder pattern
pub fn with_num_threads(mut self, num_threads: Option<usize>) -> Self {
self.num_threads = num_threads;
self
}

/// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing,
/// otherwise your application may run into stability and performance issues.
pub fn with_stack_size(mut self, stack_size: Option<usize>) -> Self {
self.stack_size = stack_size;
self
}

/// Creates a new ThreadPoolBuilder based on the current options.
pub(crate) fn create_builder(&self) -> rayon::ThreadPoolBuilder {
let mut builder = rayon::ThreadPoolBuilder::new();

if let Some(num_threads) = self.num_threads {
builder = builder.num_threads(num_threads);
}

if let Some(stack_size) = self.stack_size {
builder = builder.stack_size(stack_size);
}

builder
}
}

#[derive(Debug, Clone)]
pub struct ExecutorStage {
/// each system's set of dependencies
Expand Down Expand Up @@ -262,7 +215,7 @@ impl ExecutorStage {
&mut self,
systems: &[Arc<Mutex<Box<dyn System>>>],
run_ready_type: RunReadyType,
scope: &ScopeFifo<'run>,
scope: &mut bevy_tasks::Scope<'run, ()>,
world: &'run World,
resources: &'run Resources,
) -> RunReadyResult {
Expand Down Expand Up @@ -308,7 +261,8 @@ impl ExecutorStage {
// handle multi-threaded system
let sender = self.sender.clone();
self.running_systems.insert(system_index);
scope.spawn_fifo(move |_| {

scope.spawn(async move {
cart marked this conversation as resolved.
Show resolved Hide resolved
let mut system = system.lock();
system.run(world, resources);
sender.send(system_index).unwrap();
Expand All @@ -328,6 +282,10 @@ impl ExecutorStage {
systems: &[Arc<Mutex<Box<dyn System>>>],
schedule_changed: bool,
) {
let compute_pool = resources
.get_cloned::<bevy_tasks::ComputeTaskPool>()
.unwrap();

// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
self.system_dependencies.clear();
Expand Down Expand Up @@ -364,7 +322,8 @@ impl ExecutorStage {
// if there are no upcoming thread local systems, run everything right now
0..systems.len()
};
rayon::scope_fifo(|scope| {

compute_pool.scope(|scope| {
run_ready_result = self.run_ready_systems(
systems,
RunReadyType::Range(run_ready_system_index_range),
Expand All @@ -373,6 +332,7 @@ impl ExecutorStage {
resources,
);
});

loop {
// if all systems in the stage are finished, break out of the loop
if self.finished_systems.count_ones(..) == systems.len() {
Expand All @@ -393,7 +353,7 @@ impl ExecutorStage {
run_ready_result = RunReadyResult::Ok;
} else {
// wait for a system to finish, then run its dependents
rayon::scope_fifo(|scope| {
compute_pool.scope(|scope| {
loop {
// if all systems in the stage are finished, break out of the loop
if self.finished_systems.count_ones(..) == systems.len() {
Expand All @@ -410,7 +370,7 @@ impl ExecutorStage {
resources,
);

// if the next ready system is thread local, break out of this loop/rayon scope so it can be run
// if the next ready system is thread local, break out of this loop/bevy_tasks scope so it can be run
if let RunReadyResult::ThreadLocalReady(_) = run_ready_result {
break;
}
Expand Down Expand Up @@ -442,6 +402,7 @@ mod tests {
Commands,
};
use bevy_hecs::{Entity, World};
use bevy_tasks::{ComputeTaskPool, TaskPool};
use fixedbitset::FixedBitSet;
use parking_lot::Mutex;
use std::sync::Arc;
Expand All @@ -455,6 +416,8 @@ mod tests {
fn cross_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));

let mut schedule = Schedule::default();
schedule.add_stage("PreArchetypeChange");
schedule.add_stage("PostArchetypeChange");
Expand Down Expand Up @@ -484,6 +447,8 @@ mod tests {
fn intra_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));

let mut schedule = Schedule::default();
schedule.add_stage("update");

Expand Down Expand Up @@ -512,6 +477,7 @@ mod tests {
fn schedule() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));
resources.insert(Counter::default());
resources.insert(1.0f64);
resources.insert(2isize);
Expand Down
Loading