From e7ffde36e0df407ed04b8414a3a4dd4f220294a8 Mon Sep 17 00:00:00 2001 From: Carter Anderson Date: Mon, 12 Oct 2020 12:21:37 -0700 Subject: [PATCH] add thread local resources --- crates/bevy_app/src/app_builder.rs | 18 ++ crates/bevy_ecs/hecs/src/borrow.rs | 6 + crates/bevy_ecs/hecs/src/lib.rs | 2 +- crates/bevy_ecs/src/resource/resources.rs | 245 +++++++++++++++++++++- crates/bevy_gilrs/src/gilrs_system.rs | 43 ++-- crates/bevy_gilrs/src/lib.rs | 13 +- 6 files changed, 287 insertions(+), 40 deletions(-) diff --git a/crates/bevy_app/src/app_builder.rs b/crates/bevy_app/src/app_builder.rs index 6672c877269f06..8c9b0d1052e398 100644 --- a/crates/bevy_app/src/app_builder.rs +++ b/crates/bevy_app/src/app_builder.rs @@ -227,6 +227,14 @@ impl AppBuilder { self } + pub fn add_thread_local_resource(&mut self, resource: T) -> &mut Self + where + T: 'static, + { + self.app.resources.insert_thread_local(resource); + self + } + pub fn init_resource(&mut self) -> &mut Self where R: FromResources + Send + Sync + 'static, @@ -237,6 +245,16 @@ impl AppBuilder { self } + pub fn init_thread_local_resource(&mut self) -> &mut Self + where + R: FromResources + 'static, + { + let resource = R::from_resources(&self.app.resources); + self.app.resources.insert_thread_local(resource); + + self + } + pub fn set_runner(&mut self, run_fn: impl Fn(App) + 'static) -> &mut Self { self.app.runner = Box::new(run_fn); self diff --git a/crates/bevy_ecs/hecs/src/borrow.rs b/crates/bevy_ecs/hecs/src/borrow.rs index e8a9adff8d59c8..7f500098c17f41 100644 --- a/crates/bevy_ecs/hecs/src/borrow.rs +++ b/crates/bevy_ecs/hecs/src/borrow.rs @@ -22,14 +22,17 @@ use core::{ use crate::{archetype::Archetype, Component, MissingComponent}; +/// Atomically enforces Rust-style borrow checking at runtime #[derive(Debug)] pub struct AtomicBorrow(AtomicUsize); impl AtomicBorrow { + /// Creates a new AtomicBorrow pub const fn new() -> Self { Self(AtomicUsize::new(0)) } + /// Starts a new immutable borrow. This can be called any number of times pub fn borrow(&self) -> bool { let value = self.0.fetch_add(1, Ordering::Acquire).wrapping_add(1); if value == 0 { @@ -44,18 +47,21 @@ impl AtomicBorrow { } } + /// Starts a new mutable borrow. This must be unique. It cannot be done in parallel with other borrows or borrow_muts pub fn borrow_mut(&self) -> bool { self.0 .compare_exchange(0, UNIQUE_BIT, Ordering::Acquire, Ordering::Relaxed) .is_ok() } + /// Release an immutable borrow. pub fn release(&self) { let value = self.0.fetch_sub(1, Ordering::Release); debug_assert!(value != 0, "unbalanced release"); debug_assert!(value & UNIQUE_BIT == 0, "shared release of unique borrow"); } + /// Release a mutable borrow. pub fn release_mut(&self) { let value = self.0.fetch_and(!UNIQUE_BIT, Ordering::Release); debug_assert_ne!(value & UNIQUE_BIT, 0, "unique release of shared borrow"); diff --git a/crates/bevy_ecs/hecs/src/lib.rs b/crates/bevy_ecs/hecs/src/lib.rs index 7191b4358e13e6..ed5637cd131868 100644 --- a/crates/bevy_ecs/hecs/src/lib.rs +++ b/crates/bevy_ecs/hecs/src/lib.rs @@ -76,7 +76,7 @@ mod serde; mod world; pub use archetype::{Archetype, TypeState}; -pub use borrow::{Ref, RefMut}; +pub use borrow::{AtomicBorrow, Ref, RefMut}; pub use bundle::{Bundle, DynamicBundle, MissingComponent}; pub use entities::{Entity, EntityReserver, Location, NoSuchEntity}; pub use entity_builder::{BuiltEntity, EntityBuilder}; diff --git a/crates/bevy_ecs/src/resource/resources.rs b/crates/bevy_ecs/src/resource/resources.rs index e09273c089b58e..e07d92fee5dbed 100644 --- a/crates/bevy_ecs/src/resource/resources.rs +++ b/crates/bevy_ecs/src/resource/resources.rs @@ -1,9 +1,15 @@ use super::{FetchResource, ResourceQuery}; use crate::system::SystemId; -use bevy_hecs::{Archetype, Entity, Ref, RefMut, TypeInfo, TypeState}; +use bevy_hecs::{Archetype, AtomicBorrow, Entity, Ref, RefMut, TypeInfo, TypeState}; use bevy_utils::HashMap; use core::any::TypeId; -use std::ptr::NonNull; +use downcast_rs::{impl_downcast, Downcast}; +use std::{ + fmt::Debug, + ops::{Deref, DerefMut}, + ptr::NonNull, + thread::ThreadId, +}; /// A Resource type pub trait Resource: Send + Sync + 'static {} @@ -22,10 +28,76 @@ pub enum ResourceIndex { System(SystemId), } +// TODO: consider using this for normal resources (would require change tracking) +trait ResourceStorage: Downcast {} +impl_downcast!(ResourceStorage); + +struct StoredResource { + value: T, + atomic_borrow: AtomicBorrow, +} + +pub struct VecResourceStorage { + stored: Vec>, +} + +impl VecResourceStorage { + fn get(&self, index: usize) -> Option> { + self.stored + .get(index) + .map(|stored| ResourceRef::new(&stored.value, &stored.atomic_borrow)) + } + + fn get_mut(&self, index: usize) -> Option> { + self.stored.get(index).map(|stored| + // SAFE: ResourceRefMut ensures that this borrow is unique + unsafe { + let value = &stored.value as *const T as *mut T; + ResourceRefMut::new(&mut *value, &stored.atomic_borrow) + }) + } + + fn push(&mut self, resource: T) { + self.stored.push(StoredResource { + atomic_borrow: AtomicBorrow::new(), + value: resource, + }) + } + + fn set(&mut self, index: usize, resource: T) { + self.stored[index].value = resource; + } + + fn is_empty(&self) -> bool { + self.stored.is_empty() + } +} + +impl Default for VecResourceStorage { + fn default() -> Self { + Self { + stored: Default::default(), + } + } +} + +impl ResourceStorage for VecResourceStorage {} + /// A collection of resource instances identified by their type. -#[derive(Debug, Default)] pub struct Resources { pub(crate) resource_data: HashMap, + thread_local_data: HashMap>, + main_thread_id: ThreadId, +} + +impl Default for Resources { + fn default() -> Self { + Resources { + resource_data: Default::default(), + thread_local_data: Default::default(), + main_thread_id: std::thread::current().id(), + } + } } impl Resources { @@ -33,6 +105,26 @@ impl Resources { self.insert_resource(resource, ResourceIndex::Global); } + pub fn insert_thread_local(&mut self, resource: T) { + self.check_thread_local(); + let entry = self + .thread_local_data + .entry(TypeId::of::()) + .or_insert_with(|| Box::new(VecResourceStorage::::default())); + let resources = entry.downcast_mut::>().unwrap(); + if resources.is_empty() { + resources.push(resource); + } else { + resources.set(0, resource); + } + } + + fn check_thread_local(&self) { + if std::thread::current().id() != self.main_thread_id { + panic!("Attempted to access a thread local resource off of the main thread.") + } + } + pub fn contains(&self) -> bool { self.get_resource::(ResourceIndex::Global).is_some() } @@ -45,6 +137,26 @@ impl Resources { self.get_resource_mut(ResourceIndex::Global) } + pub fn get_thread_local(&self) -> Option> { + self.check_thread_local(); + self.thread_local_data + .get(&TypeId::of::()) + .and_then(|storage| { + let resources = storage.downcast_ref::>().unwrap(); + resources.get(0) + }) + } + + pub fn get_thread_local_mut(&self) -> Option> { + self.check_thread_local(); + self.thread_local_data + .get(&TypeId::of::()) + .and_then(|storage| { + let resources = storage.downcast_ref::>().unwrap(); + resources.get_mut(0) + }) + } + /// Returns a clone of the underlying resource, this is helpful when borrowing something /// cloneable (like a task pool) without taking a borrow on the resource map pub fn get_cloned(&self) -> Option { @@ -281,6 +393,93 @@ where } } +/// Shared borrow of an entity's component +#[derive(Clone)] +pub struct ResourceRef<'a, T: 'static> { + borrow: &'a AtomicBorrow, + resource: &'a T, +} + +impl<'a, T: 'static> ResourceRef<'a, T> { + /// Creates a new resource borrow + pub fn new(resource: &'a T, borrow: &'a AtomicBorrow) -> Self { + borrow.borrow(); + Self { resource, borrow } + } +} + +unsafe impl Send for ResourceRef<'_, T> {} +unsafe impl Sync for ResourceRef<'_, T> {} + +impl<'a, T: 'static> Drop for ResourceRef<'a, T> { + fn drop(&mut self) { + self.borrow.release() + } +} + +impl<'a, T: 'static> Deref for ResourceRef<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + self.resource + } +} + +impl<'a, T: 'static> Debug for ResourceRef<'a, T> +where + T: Debug, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + self.deref().fmt(f) + } +} + +/// Unique borrow of a resource +pub struct ResourceRefMut<'a, T: 'static> { + borrow: &'a AtomicBorrow, + resource: &'a mut T, +} + +impl<'a, T: 'static> ResourceRefMut<'a, T> { + /// Creates a new entity component mutable borrow + pub fn new(resource: &'a mut T, borrow: &'a AtomicBorrow) -> Self { + borrow.borrow_mut(); + Self { resource, borrow } + } +} + +unsafe impl Send for ResourceRefMut<'_, T> {} +unsafe impl Sync for ResourceRefMut<'_, T> {} + +impl<'a, T: 'static> Drop for ResourceRefMut<'a, T> { + fn drop(&mut self) { + self.borrow.release_mut(); + } +} + +impl<'a, T: 'static> Deref for ResourceRefMut<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + self.resource + } +} + +impl<'a, T: 'static> DerefMut for ResourceRefMut<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.resource + } +} + +impl<'a, T: 'static> Debug for ResourceRefMut<'a, T> +where + T: Debug, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + self.deref().fmt(f) + } +} + #[cfg(test)] mod tests { use super::Resources; @@ -335,4 +534,44 @@ mod tests { let _x = resources.get_mut::(); let _y = resources.get_mut::(); } + + #[test] + fn thread_local_resource() { + let mut resources = Resources::default(); + resources.insert_thread_local(123i32); + resources.insert_thread_local(456i64); + assert_eq!(*resources.get_thread_local::().unwrap(), 123); + assert_eq!(*resources.get_thread_local_mut::().unwrap(), 456); + } + + #[test] + fn thread_local_resource_ref_aliasing() { + let mut resources = Resources::default(); + resources.insert_thread_local(123i32); + let a = resources.get_thread_local::().unwrap(); + let b = resources.get_thread_local::().unwrap(); + assert_eq!(*a, 123); + assert_eq!(*b, 123); + } + + #[test] + #[should_panic] + fn thread_local_resource_mut_ref_aliasing() { + let mut resources = Resources::default(); + resources.insert_thread_local(123i32); + let a = resources.get_thread_local::().unwrap(); + let b = resources.get_thread_local_mut::().unwrap(); + } + + #[test] + #[should_panic] + fn thread_local_resource_panic() { + let mut resources = Resources::default(); + resources.insert_thread_local(0i32); + std::thread::spawn(move || { + let _ = resources.get_thread_local_mut::(); + }) + .join() + .unwrap(); + } } diff --git a/crates/bevy_gilrs/src/gilrs_system.rs b/crates/bevy_gilrs/src/gilrs_system.rs index f1486c1308c0c8..f5c83628bd5675 100644 --- a/crates/bevy_gilrs/src/gilrs_system.rs +++ b/crates/bevy_gilrs/src/gilrs_system.rs @@ -1,34 +1,16 @@ use crate::converter::{convert_axis, convert_button, convert_gamepad_id}; use bevy_app::Events; -use bevy_ecs::{Res, ResMut}; +use bevy_ecs::{Resources, World}; use bevy_input::prelude::*; use gilrs::{Button, EventType, Gilrs}; -use std::sync::{Arc, Mutex}; -// TODO: remove this if/when bevy_ecs supports thread local resources -#[derive(Debug)] -struct GilrsSendWrapper(Gilrs); - -unsafe impl Send for GilrsSendWrapper {} - -#[derive(Debug)] -pub struct GilrsArcMutexWrapper(Arc>); - -impl GilrsArcMutexWrapper { - pub fn new(gilrs: Gilrs) -> GilrsArcMutexWrapper { - GilrsArcMutexWrapper(Arc::new(Mutex::new(GilrsSendWrapper(gilrs)))) - } -} - -pub fn gilrs_startup_system( - gilrs: Res, - mut gamepad_event: ResMut>, - mut inputs: ResMut>, - mut axes: ResMut>, -) { +pub fn gilrs_startup_system(_world: &mut World, resources: &mut Resources) { + let gilrs = resources.get_thread_local::().unwrap(); + let mut gamepad_event = resources.get_mut::>().unwrap(); + let mut inputs = resources.get_mut::>().unwrap(); + let mut axes = resources.get_mut::>().unwrap(); gamepad_event.update(); inputs.update(); - let gilrs = &gilrs.0.lock().unwrap().0; for (gilrs_id, gilrs_gamepad) in gilrs.gamepads() { connect_gamepad( gilrs_gamepad, @@ -40,15 +22,14 @@ pub fn gilrs_startup_system( } } -pub fn gilrs_update_system( - gilrs: Res, - mut gamepad_event: ResMut>, - mut inputs: ResMut>, - mut axes: ResMut>, -) { +pub fn gilrs_update_system(_world: &mut World, resources: &mut Resources) { + let mut gilrs = resources.get_thread_local_mut::().unwrap(); + let mut gamepad_event = resources.get_mut::>().unwrap(); + let mut inputs = resources.get_mut::>().unwrap(); + let mut axes = resources.get_mut::>().unwrap(); + gamepad_event.update(); inputs.update(); - let gilrs = &mut gilrs.0.lock().unwrap().0; while let Some(gilrs_event) = gilrs.next_event() { match gilrs_event.event { EventType::Connected => { diff --git a/crates/bevy_gilrs/src/lib.rs b/crates/bevy_gilrs/src/lib.rs index af9657f2d95ae1..8edce92f7956f2 100644 --- a/crates/bevy_gilrs/src/lib.rs +++ b/crates/bevy_gilrs/src/lib.rs @@ -2,8 +2,8 @@ mod converter; mod gilrs_system; use bevy_app::prelude::*; -use bevy_ecs::IntoQuerySystem; -use gilrs_system::{gilrs_startup_system, gilrs_update_system, GilrsArcMutexWrapper}; +use bevy_ecs::prelude::*; +use gilrs_system::{gilrs_startup_system, gilrs_update_system}; #[derive(Default)] pub struct GilrsPlugin; @@ -12,9 +12,12 @@ impl Plugin for GilrsPlugin { fn build(&self, app: &mut AppBuilder) { match gilrs::Gilrs::new() { Ok(gilrs) => { - app.add_resource(GilrsArcMutexWrapper::new(gilrs)) - .add_startup_system(gilrs_startup_system.system()) - .add_system_to_stage(stage::EVENT_UPDATE, gilrs_update_system.system()); + app.add_thread_local_resource(gilrs) + .add_startup_system(gilrs_startup_system.thread_local_system()) + .add_system_to_stage( + stage::EVENT_UPDATE, + gilrs_update_system.thread_local_system(), + ); } Err(err) => log::error!("Failed to start Gilrs. {}", err), }