Skip to content

Commit

Permalink
Marker-based API (#227)
Browse files Browse the repository at this point in the history
This PR have multiple purposes, but unfortunately they highly related, so I can't split it into multiple PRs.

Right for prediction crates like https://github.com/Bendzae/bevy_replicon_snap or https://github.com/RJ/bevy_timewarp we rely on overriding deserialization and remove functions. But this approach have 3 major drawbacks:

1. In order to customize serialization for interpolated or predicted components, user need to copy the code from the crate or use some crate-provided helpers.
2. Serialization function is is unsafe due to pointer manipulation.
3. If you want to override writing or removal only for some entities, it's costly to check if a component present on each writing.

Marker-based API solves all of the above. Now for replication user register only `serialize`, `deserialize` and newly added `deserialize_in_place`. These functions assigned to rules as before. So instead of `ComponentFns`, we now have `SerdeFns`. And these function now only do one thing - serialization/deserialization.

All other logic now represented by `CommandFns`. Unlike `SerdeFns`, this struct created for each component only once and automatically when user register a `SerdeFns` for a component. It uses default `write` and `remove` functions. To override it, user can register a marker and assign a custom functions for each component. We use markers instead of archetypes because clients do not know in advance the archetype of the entity being deserialized.

Then on receive we collect a reusable `Vec<bool>` for each marker (by checking if a marker is present) and based on it pick `write`/`remove` functions for each component. We pick first marker that have a registration for the current component and present on an entity (`true` in the `Vec`). Markers are sorted by priority customizable by user.

Since for deserialization we call two functions now (`write` and then `deserialize`), we can do a nice trick to remove `unsafe` from ser/de customization and make it even more flexible!
Since `write` knows the component type of `deserialize`, we can store a type-erased function pointers and let `write` "restore" the type. "restoration" is done by calling `transmute`, but we abstract it inside `SerdeFns` and check the type if `debug_assertions` enabled.

So `serialize` and `deserialize` now just a normal functions with a very simple signature. This also unlocks the usage of `deserialize_in_place` that fallback into `deserialize` if not overridden by user.
Similar trick is done for `serialize`, except `read` is non-overridable by user and only used to remove extra unsafety from the public API.

The mentioned `read` and `write` are still unsafe since it's possible to pass `SerdeFns` that was created with a different type.

And lastly, instead of using `EntityWorldMut` for writing and removal, we use `EntityMut` and `Commands`. Using `EntityWorldMut` have 2 major drawbacks:
- You can't borrow a component from `EntityWorldMut` for in-place deserialization and `ClientMapper` at the same time.
- Each insertion creates a new archetype.

With commands we can spawn new entities inside `ClientMapper` and it will be possible to batch insertions in the future, see: bevyengine/bevy#10154

Huge thanks to @NiseVoid for the idea!

---------

Co-authored-by: UkoeHB <[email protected]>
  • Loading branch information
Shatur and UkoeHB authored Apr 20, 2024
1 parent 3cbbfa9 commit de53d66
Show file tree
Hide file tree
Showing 19 changed files with 1,805 additions and 359 deletions.
22 changes: 14 additions & 8 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `AppReplicationExt::replicate_group` and `GroupRegistration` trait to register and customize component groups.
- `AppMarkerExt` to customize how components will be written based on markers present without overriding deserialization functions. Now third-party prediction crates could be integrated much easier.
- `AppRuleExt::replicate_group` and `GroupRegistration` trait to register and customize component groups. A group will be replicated only if all its components present on an entity.
- `ServerSet::StoreHierarchy` for systems that store hierarchy changes in `ParentSync`.
- `ReplicationRule` that describes how a component or a group of components will be serialized, deserialized and removed.

### Changed

- `AppReplicationExt::replicate_with` now accepts newly added `ReplicationFns` and the function is now `unsafe` (it was never "safe" before, caller must ensure that used `C` can be passed to the serialization function).
- `SerializeFn` now accepts regular `C` instead of `Ptr`.
- `DeserializeFn` now does only deserialization and returns `C`. Use the newly added marker-based API if you want to customize how component will be written. See docs for `AppMarkerExt` for details.
- Rename `AppReplicationExt` into `AppRuleExt`.
- `AppRuleExt::replicate_with` now no longer accepts `RemoveComponentFn`. Use the mentioned marker-based API to customize it instead.
- `AppRuleExt::replicate_with` now additionally accepts `in_place_as_deserialize`. You can use it to customize deserialization if a component is already present or just pass `command_fns::deserialize_in_place` to make it fallback to the passed `deserialize`.
- Writing to entities on client now done via `EntityMut` and `Commands` instead of `EntityWorldMut`. It was needed to support the mentioned in-place deserialization and will possibly allow batching insertions in the future (for details see https://github.com/bevyengine/bevy/issues/10154).
- Move `Replication` to `core` module.
- Move all functions-related logic from `ReplicationRules` into a new `ReplicationFns`.
- Rename `serialize_component` into `serialize` and move into `replication_fns` module.
- Rename `deserialize_component` into `deserialize` and move into `replication_fns` module.
- Rename `remove_component` into `remove` and move into `replication_fns` module.
- Move all functions-related logic from `ReplicationRules` into a new `ReplicationFns` and hide `ReplicationRules` from public API.
- Rename `serialize_component` into `default_serialize` and move into `serde_fns` module.
- Rename `deserialize_component` into `default_deserialize` and move into `serde_fns` module.
- Rename `deserialize_mapped_component` into `default_deserialize_mapped` and move into `serde_fns` module.
- Rename `remove_component` into `default_remove` and move into `command_fns` module.
- Move `despawn_recursive` into `replication_fns` module.

### Removed

- `dont_replicate` module. Use the newly added `AppReplicationExt::replicate_group` or newtypes.
- `dont_replicate` module. Use the newly added `AppRuleExt::replicate_group` or newtypes.

## [0.24.1] - 2024-03-07

Expand Down
154 changes: 120 additions & 34 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ pub mod replicon_client;

use std::io::Cursor;

use bevy::{ecs::entity::EntityHashMap, prelude::*};
use bevy::{
ecs::{entity::EntityHashMap, system::SystemState},
prelude::*,
};
use bincode::{DefaultOptions, Options};
use bytes::Bytes;
use varint_rs::VarintReader;

use crate::core::{
command_markers::CommandMarkers,
common_conditions::{client_connected, client_just_connected, client_just_disconnected},
replication_fns::ReplicationFns,
replicon_channels::{ReplicationChannel, RepliconChannels},
Expand Down Expand Up @@ -77,28 +81,35 @@ impl ClientPlugin {
/// Acknowledgments for received entity update messages are sent back to the server.
///
/// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages).
pub(super) fn receive_replication(world: &mut World) -> bincode::Result<()> {
pub(super) fn receive_replication(
world: &mut World,
state: &mut ReceiveState,
) -> bincode::Result<()> {
world.resource_scope(|world, mut client: Mut<RepliconClient>| {
world.resource_scope(|world, mut entity_map: Mut<ServerEntityMap>| {
world.resource_scope(|world, mut entity_ticks: Mut<ServerEntityTicks>| {
world.resource_scope(|world, mut buffered_updates: Mut<BufferedUpdates>| {
world.resource_scope(|world, replication_fns: Mut<ReplicationFns>| {
let mut stats = world.remove_resource::<ClientStats>();
apply_replication(
world,
&mut client,
&mut entity_map,
&mut entity_ticks,
&mut buffered_updates,
stats.as_mut(),
&replication_fns,
)?;

if let Some(stats) = stats {
world.insert_resource(stats);
}

Ok(())
world.resource_scope(|world, command_markers: Mut<CommandMarkers>| {
world.resource_scope(|world, replication_fns: Mut<ReplicationFns>| {
let mut stats = world.remove_resource::<ClientStats>();
apply_replication(
world,
state,
&mut client,
&mut entity_map,
&mut entity_ticks,
&mut buffered_updates,
stats.as_mut(),
&command_markers,
&replication_fns,
)?;

if let Some(stats) = stats {
world.insert_resource(stats);
}

Ok(())
})
})
})
})
Expand All @@ -124,20 +135,24 @@ impl ClientPlugin {
/// Sends acknowledgments for update messages back.
fn apply_replication(
world: &mut World,
state: &mut ReceiveState,
client: &mut RepliconClient,
entity_map: &mut ServerEntityMap,
entity_ticks: &mut ServerEntityTicks,
buffered_updates: &mut BufferedUpdates,
mut stats: Option<&mut ClientStats>,
command_markers: &CommandMarkers,
replication_fns: &ReplicationFns,
) -> Result<(), Box<bincode::ErrorKind>> {
while let Some(message) = client.receive(ReplicationChannel::Init) {
apply_init_message(
&message,
world,
state,
entity_map,
entity_ticks,
stats.as_deref_mut(),
command_markers,
replication_fns,
)?;
}
Expand All @@ -147,10 +162,12 @@ fn apply_replication(
let index = apply_update_message(
message,
world,
state,
entity_map,
entity_ticks,
buffered_updates,
stats.as_deref_mut(),
command_markers,
replication_fns,
replicon_tick,
)?;
Expand All @@ -168,9 +185,11 @@ fn apply_replication(
if let Err(e) = apply_update_components(
&mut Cursor::new(&*update.message),
world,
state,
entity_map,
entity_ticks,
stats.as_deref_mut(),
command_markers,
replication_fns,
update.message_tick,
) {
Expand All @@ -188,9 +207,11 @@ fn apply_replication(
fn apply_init_message(
message: &[u8],
world: &mut World,
state: &mut ReceiveState,
entity_map: &mut ServerEntityMap,
entity_ticks: &mut ServerEntityTicks,
mut stats: Option<&mut ClientStats>,
command_markers: &CommandMarkers,
replication_fns: &ReplicationFns,
) -> bincode::Result<()> {
let end_pos: u64 = message.len().try_into().unwrap();
Expand Down Expand Up @@ -226,10 +247,12 @@ fn apply_init_message(
apply_init_components(
&mut cursor,
world,
state,
entity_map,
entity_ticks,
stats.as_deref_mut(),
ComponentsKind::Removal,
command_markers,
replication_fns,
replicon_tick,
)?;
Expand All @@ -240,10 +263,12 @@ fn apply_init_message(
apply_init_components(
&mut cursor,
world,
state,
entity_map,
entity_ticks,
stats,
ComponentsKind::Insert,
command_markers,
replication_fns,
replicon_tick,
)?;
Expand All @@ -260,10 +285,12 @@ fn apply_init_message(
fn apply_update_message(
message: Bytes,
world: &mut World,
state: &mut ReceiveState,
entity_map: &mut ServerEntityMap,
entity_ticks: &mut ServerEntityTicks,
buffered_updates: &mut BufferedUpdates,
mut stats: Option<&mut ClientStats>,
command_markers: &CommandMarkers,
replication_fns: &ReplicationFns,
replicon_tick: RepliconTick,
) -> bincode::Result<u16> {
Expand All @@ -289,9 +316,11 @@ fn apply_update_message(
apply_update_components(
&mut cursor,
world,
state,
entity_map,
entity_ticks,
stats,
command_markers,
replication_fns,
message_tick,
)?;
Expand Down Expand Up @@ -330,37 +359,64 @@ fn apply_entity_mappings(
fn apply_init_components(
cursor: &mut Cursor<&[u8]>,
world: &mut World,
state: &mut ReceiveState,
entity_map: &mut ServerEntityMap,
entity_ticks: &mut ServerEntityTicks,
mut stats: Option<&mut ClientStats>,
components_kind: ComponentsKind,
command_markers: &CommandMarkers,
replication_fns: &ReplicationFns,
replicon_tick: RepliconTick,
) -> bincode::Result<()> {
let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..entities_len {
let entity = deserialize_entity(cursor)?;
let server_entity = deserialize_entity(cursor)?;
let data_size: u16 = bincode::deserialize_from(&mut *cursor)?;
let mut entity = entity_map.get_by_server_or_spawn(world, entity);
entity_ticks.insert(entity.id(), replicon_tick);

let client_entity =
entity_map.get_by_server_or_insert(server_entity, || world.spawn(Replication).id());
entity_ticks.insert(client_entity, replicon_tick);

let (mut entity_markers, mut commands, mut query) = state.get_mut(world);
let mut client_entity = query
.get_mut(client_entity)
.expect("replicated entities can be despawned only by server");
entity_markers.extend(command_markers.iter_contains(&client_entity));

let end_pos = cursor.position() + data_size as u64;
let mut components_len = 0u32;
while cursor.position() < end_pos {
let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?;
let fns = replication_fns.component_fns(fns_id);
let (command_fns, serde_fns) = replication_fns.get(fns_id);
match components_kind {
ComponentsKind::Insert => {
(fns.deserialize)(&mut entity, entity_map, cursor, replicon_tick)?
}
ComponentsKind::Removal => (fns.remove)(&mut entity, replicon_tick),
ComponentsKind::Insert => unsafe {
// SAFETY: `serde_fns` and `command_fns` were created for the same type.
command_fns.write(
serde_fns,
&entity_markers,
&mut commands,
&mut client_entity,
cursor,
entity_map,
replicon_tick,
)?
},
ComponentsKind::Removal => command_fns.remove(
&entity_markers,
commands.entity(client_entity.id()),
replicon_tick,
),
}
components_len += 1;
}

if let Some(stats) = &mut stats {
stats.entities_changed += 1;
stats.components_changed += components_len;
}

entity_markers.clear();
state.apply(world);
}

Ok(())
Expand Down Expand Up @@ -403,44 +459,68 @@ fn apply_despawns(
fn apply_update_components(
cursor: &mut Cursor<&[u8]>,
world: &mut World,
state: &mut ReceiveState,
entity_map: &mut ServerEntityMap,
entity_ticks: &mut ServerEntityTicks,
mut stats: Option<&mut ClientStats>,
command_markers: &CommandMarkers,
replication_fns: &ReplicationFns,
message_tick: RepliconTick,
) -> bincode::Result<()> {
let message_end = cursor.get_ref().len() as u64;
while cursor.position() < message_end {
let entity = deserialize_entity(cursor)?;
let server_entity = deserialize_entity(cursor)?;
let data_size: u16 = bincode::deserialize_from(&mut *cursor)?;
let Some(mut entity) = entity_map.get_by_server(world, entity) else {

let Some(client_entity) = entity_map.get_by_server(server_entity) else {
// Update could arrive after a despawn from init message.
debug!("ignoring update received for unknown server's {entity:?}");
debug!("ignoring update received for unknown server's {server_entity:?}");
cursor.set_position(cursor.position() + data_size as u64);
continue;
};
let entity_tick = entity_ticks
.get_mut(&entity.id())
.get_mut(&client_entity)
.expect("all entities from update should have assigned ticks");
if message_tick <= *entity_tick {
trace!("ignoring outdated update for client's {:?}", entity.id());
trace!("ignoring outdated update for client's {client_entity:?}");
cursor.set_position(cursor.position() + data_size as u64);
continue;
}
*entity_tick = message_tick;

let (mut entity_markers, mut commands, mut query) = state.get_mut(world);
let mut client_entity = query
.get_mut(client_entity)
.expect("replicated entities can be despawned only by server");
entity_markers.extend(command_markers.iter_contains(&client_entity));

let end_pos = cursor.position() + data_size as u64;
let mut components_count = 0u32;
while cursor.position() < end_pos {
let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?;
let fns = replication_fns.component_fns(fns_id);
(fns.deserialize)(&mut entity, entity_map, cursor, message_tick)?;
let (command_fns, serde_fns) = replication_fns.get(fns_id);
// SAFETY: `serde_fns` and `command_fns` were created for the same type.
unsafe {
command_fns.write(
serde_fns,
&entity_markers,
&mut commands,
&mut client_entity,
cursor,
entity_map,
message_tick,
)?;
}
components_count += 1;
}

if let Some(stats) = &mut stats {
stats.entities_changed += 1;
stats.components_changed += components_count;
}

entity_markers.clear();
state.apply(world);
}

Ok(())
Expand All @@ -464,6 +544,12 @@ fn deserialize_entity(cursor: &mut Cursor<&[u8]>) -> bincode::Result<Entity> {
Ok(Entity::from_bits(bits))
}

type ReceiveState<'w, 's> = SystemState<(
Local<'s, Vec<bool>>,
Commands<'w, 's>,
Query<'w, 's, EntityMut<'w>>,
)>;

/// Type of components replication.
///
/// Parameter for [`apply_components`].
Expand Down
Loading

0 comments on commit de53d66

Please sign in to comment.