-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Marker-based API #227
Marker-based API #227
Changes from 51 commits
f6a640f
a002545
dc016c8
61217dc
fbbcb8e
01aa915
d63c825
61c3150
b6c0912
430591e
57ff75b
7585c52
b553307
e70ef65
f83986e
d3d2e44
2c2fbe3
ac93b3d
7c2d740
c8a5661
e30e3fd
070d877
f198beb
c61634a
fd46342
4fd415b
85ab9e7
0f1fe63
8d23bb8
bb6471a
c3836dd
8df39cd
06145a6
e371006
14f2372
9a3d094
531da74
b1bb317
53fa8d2
abae767
5a1f4e7
b531891
48f2a37
abc1339
66e54ad
e6abdb1
55cadc2
578b944
e5ea886
9a2a9c3
fdb524b
1455ab4
a877b83
121f2db
4efa05c
8236587
753e3e1
870384d
7dde4d3
5117c38
ffe51df
c5a1597
341bba0
a56912e
6d9e725
28da5eb
cf99bf2
44f8e97
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,12 +4,16 @@ pub mod replicon_client; | |||||
|
||||||
use std::io::Cursor; | ||||||
|
||||||
use bevy::{ecs::entity::EntityHashMap, prelude::*}; | ||||||
use bevy::{ | ||||||
ecs::{entity::EntityHashMap, system::SystemState}, | ||||||
prelude::*, | ||||||
}; | ||||||
use bincode::{DefaultOptions, Options}; | ||||||
use bytes::Bytes; | ||||||
use varint_rs::VarintReader; | ||||||
|
||||||
use crate::core::{ | ||||||
command_markers::CommandMarkers, | ||||||
common_conditions::{client_connected, client_just_connected, client_just_disconnected}, | ||||||
replication_fns::ReplicationFns, | ||||||
replicon_channels::{ReplicationChannel, RepliconChannels}, | ||||||
|
@@ -77,28 +81,35 @@ impl ClientPlugin { | |||||
/// Acknowledgments for received entity update messages are sent back to the server. | ||||||
/// | ||||||
/// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages). | ||||||
pub(super) fn receive_replication(world: &mut World) -> bincode::Result<()> { | ||||||
pub(super) fn receive_replication( | ||||||
world: &mut World, | ||||||
state: &mut ReceiveState, | ||||||
) -> bincode::Result<()> { | ||||||
world.resource_scope(|world, mut client: Mut<RepliconClient>| { | ||||||
world.resource_scope(|world, mut entity_map: Mut<ServerEntityMap>| { | ||||||
world.resource_scope(|world, mut entity_ticks: Mut<ServerEntityTicks>| { | ||||||
world.resource_scope(|world, mut buffered_updates: Mut<BufferedUpdates>| { | ||||||
world.resource_scope(|world, replication_fns: Mut<ReplicationFns>| { | ||||||
let mut stats = world.remove_resource::<ClientStats>(); | ||||||
apply_replication( | ||||||
world, | ||||||
&mut client, | ||||||
&mut entity_map, | ||||||
&mut entity_ticks, | ||||||
&mut buffered_updates, | ||||||
stats.as_mut(), | ||||||
&replication_fns, | ||||||
)?; | ||||||
|
||||||
if let Some(stats) = stats { | ||||||
world.insert_resource(stats); | ||||||
} | ||||||
|
||||||
Ok(()) | ||||||
world.resource_scope(|world, command_markers: Mut<CommandMarkers>| { | ||||||
world.resource_scope(|world, replication_fns: Mut<ReplicationFns>| { | ||||||
let mut stats = world.remove_resource::<ClientStats>(); | ||||||
apply_replication( | ||||||
world, | ||||||
state, | ||||||
&mut client, | ||||||
&mut entity_map, | ||||||
&mut entity_ticks, | ||||||
&mut buffered_updates, | ||||||
stats.as_mut(), | ||||||
&command_markers, | ||||||
&replication_fns, | ||||||
)?; | ||||||
|
||||||
if let Some(stats) = stats { | ||||||
world.insert_resource(stats); | ||||||
} | ||||||
|
||||||
Ok(()) | ||||||
}) | ||||||
}) | ||||||
}) | ||||||
}) | ||||||
|
@@ -124,20 +135,24 @@ impl ClientPlugin { | |||||
/// Sends acknowledgments for update messages back. | ||||||
fn apply_replication( | ||||||
world: &mut World, | ||||||
state: &mut ReceiveState, | ||||||
client: &mut RepliconClient, | ||||||
entity_map: &mut ServerEntityMap, | ||||||
entity_ticks: &mut ServerEntityTicks, | ||||||
buffered_updates: &mut BufferedUpdates, | ||||||
mut stats: Option<&mut ClientStats>, | ||||||
command_markers: &CommandMarkers, | ||||||
replication_fns: &ReplicationFns, | ||||||
) -> Result<(), Box<bincode::ErrorKind>> { | ||||||
while let Some(message) = client.receive(ReplicationChannel::Init) { | ||||||
apply_init_message( | ||||||
&message, | ||||||
world, | ||||||
state, | ||||||
entity_map, | ||||||
entity_ticks, | ||||||
stats.as_deref_mut(), | ||||||
command_markers, | ||||||
replication_fns, | ||||||
)?; | ||||||
} | ||||||
|
@@ -147,10 +162,12 @@ fn apply_replication( | |||||
let index = apply_update_message( | ||||||
message, | ||||||
world, | ||||||
state, | ||||||
entity_map, | ||||||
entity_ticks, | ||||||
buffered_updates, | ||||||
stats.as_deref_mut(), | ||||||
command_markers, | ||||||
replication_fns, | ||||||
replicon_tick, | ||||||
)?; | ||||||
|
@@ -168,9 +185,11 @@ fn apply_replication( | |||||
if let Err(e) = apply_update_components( | ||||||
&mut Cursor::new(&*update.message), | ||||||
world, | ||||||
state, | ||||||
entity_map, | ||||||
entity_ticks, | ||||||
stats.as_deref_mut(), | ||||||
command_markers, | ||||||
replication_fns, | ||||||
update.message_tick, | ||||||
) { | ||||||
|
@@ -188,9 +207,11 @@ fn apply_replication( | |||||
fn apply_init_message( | ||||||
message: &[u8], | ||||||
world: &mut World, | ||||||
state: &mut ReceiveState, | ||||||
entity_map: &mut ServerEntityMap, | ||||||
entity_ticks: &mut ServerEntityTicks, | ||||||
mut stats: Option<&mut ClientStats>, | ||||||
command_markers: &CommandMarkers, | ||||||
replication_fns: &ReplicationFns, | ||||||
) -> bincode::Result<()> { | ||||||
let end_pos: u64 = message.len().try_into().unwrap(); | ||||||
|
@@ -226,10 +247,12 @@ fn apply_init_message( | |||||
apply_init_components( | ||||||
&mut cursor, | ||||||
world, | ||||||
state, | ||||||
entity_map, | ||||||
entity_ticks, | ||||||
stats.as_deref_mut(), | ||||||
ComponentsKind::Removal, | ||||||
command_markers, | ||||||
replication_fns, | ||||||
replicon_tick, | ||||||
)?; | ||||||
|
@@ -240,10 +263,12 @@ fn apply_init_message( | |||||
apply_init_components( | ||||||
&mut cursor, | ||||||
world, | ||||||
state, | ||||||
entity_map, | ||||||
entity_ticks, | ||||||
stats, | ||||||
ComponentsKind::Insert, | ||||||
command_markers, | ||||||
replication_fns, | ||||||
replicon_tick, | ||||||
)?; | ||||||
|
@@ -260,10 +285,12 @@ fn apply_init_message( | |||||
fn apply_update_message( | ||||||
message: Bytes, | ||||||
world: &mut World, | ||||||
state: &mut ReceiveState, | ||||||
entity_map: &mut ServerEntityMap, | ||||||
entity_ticks: &mut ServerEntityTicks, | ||||||
buffered_updates: &mut BufferedUpdates, | ||||||
mut stats: Option<&mut ClientStats>, | ||||||
command_markers: &CommandMarkers, | ||||||
replication_fns: &ReplicationFns, | ||||||
replicon_tick: RepliconTick, | ||||||
) -> bincode::Result<u16> { | ||||||
|
@@ -289,9 +316,11 @@ fn apply_update_message( | |||||
apply_update_components( | ||||||
&mut cursor, | ||||||
world, | ||||||
state, | ||||||
entity_map, | ||||||
entity_ticks, | ||||||
stats, | ||||||
command_markers, | ||||||
replication_fns, | ||||||
message_tick, | ||||||
)?; | ||||||
|
@@ -330,37 +359,62 @@ fn apply_entity_mappings( | |||||
fn apply_init_components( | ||||||
cursor: &mut Cursor<&[u8]>, | ||||||
world: &mut World, | ||||||
state: &mut ReceiveState, | ||||||
entity_map: &mut ServerEntityMap, | ||||||
entity_ticks: &mut ServerEntityTicks, | ||||||
mut stats: Option<&mut ClientStats>, | ||||||
components_kind: ComponentsKind, | ||||||
command_markers: &CommandMarkers, | ||||||
replication_fns: &ReplicationFns, | ||||||
replicon_tick: RepliconTick, | ||||||
) -> bincode::Result<()> { | ||||||
let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?; | ||||||
for _ in 0..entities_len { | ||||||
let entity = deserialize_entity(cursor)?; | ||||||
let server_entity = deserialize_entity(cursor)?; | ||||||
let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; | ||||||
let mut entity = entity_map.get_by_server_or_spawn(world, entity); | ||||||
entity_ticks.insert(entity.id(), replicon_tick); | ||||||
|
||||||
let client_entity = | ||||||
entity_map.get_by_server_or_insert(server_entity, || world.spawn(Replication).id()); | ||||||
entity_ticks.insert(client_entity, replicon_tick); | ||||||
|
||||||
let (mut entity_markers, mut commands, mut query) = state.get_mut(world); | ||||||
let mut server_entity = query.get_mut(client_entity).unwrap(); | ||||||
entity_markers.extend(command_markers.iter_contains(&server_entity)); | ||||||
|
||||||
let end_pos = cursor.position() + data_size as u64; | ||||||
let mut components_len = 0u32; | ||||||
while cursor.position() < end_pos { | ||||||
let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; | ||||||
let fns = replication_fns.component_fns(fns_id); | ||||||
let (serde_fns, command_fns) = replication_fns.get(fns_id); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Nit: this order seems more consistent with how they are used. |
||||||
match components_kind { | ||||||
ComponentsKind::Insert => { | ||||||
(fns.deserialize)(&mut entity, entity_map, cursor, replicon_tick)? | ||||||
} | ||||||
ComponentsKind::Removal => (fns.remove)(&mut entity, replicon_tick), | ||||||
ComponentsKind::Insert => unsafe { | ||||||
// SAFETY: `serde_fns` and `command_fns` was created for the same type. | ||||||
Shatur marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
command_fns.write( | ||||||
serde_fns, | ||||||
&entity_markers, | ||||||
&mut commands, | ||||||
&mut server_entity, | ||||||
cursor, | ||||||
entity_map, | ||||||
replicon_tick, | ||||||
)? | ||||||
}, | ||||||
ComponentsKind::Removal => command_fns.remove( | ||||||
&entity_markers, | ||||||
commands.entity(server_entity.id()), | ||||||
replicon_tick, | ||||||
), | ||||||
} | ||||||
components_len += 1; | ||||||
} | ||||||
|
||||||
if let Some(stats) = &mut stats { | ||||||
stats.entities_changed += 1; | ||||||
stats.components_changed += components_len; | ||||||
} | ||||||
|
||||||
entity_markers.clear(); | ||||||
state.apply(world); | ||||||
} | ||||||
|
||||||
Ok(()) | ||||||
|
@@ -403,44 +457,66 @@ fn apply_despawns( | |||||
fn apply_update_components( | ||||||
cursor: &mut Cursor<&[u8]>, | ||||||
world: &mut World, | ||||||
state: &mut ReceiveState, | ||||||
entity_map: &mut ServerEntityMap, | ||||||
entity_ticks: &mut ServerEntityTicks, | ||||||
mut stats: Option<&mut ClientStats>, | ||||||
command_markers: &CommandMarkers, | ||||||
replication_fns: &ReplicationFns, | ||||||
message_tick: RepliconTick, | ||||||
) -> bincode::Result<()> { | ||||||
let message_end = cursor.get_ref().len() as u64; | ||||||
while cursor.position() < message_end { | ||||||
let entity = deserialize_entity(cursor)?; | ||||||
let server_entity = deserialize_entity(cursor)?; | ||||||
let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; | ||||||
let Some(mut entity) = entity_map.get_by_server(world, entity) else { | ||||||
|
||||||
let Some(client_entity) = entity_map.get_by_server(server_entity) else { | ||||||
// Update could arrive after a despawn from init message. | ||||||
debug!("ignoring update received for unknown server's {entity:?}"); | ||||||
debug!("ignoring update received for unknown server's {server_entity:?}"); | ||||||
cursor.set_position(cursor.position() + data_size as u64); | ||||||
continue; | ||||||
}; | ||||||
let entity_tick = entity_ticks | ||||||
.get_mut(&entity.id()) | ||||||
.get_mut(&client_entity) | ||||||
.expect("all entities from update should have assigned ticks"); | ||||||
if message_tick <= *entity_tick { | ||||||
trace!("ignoring outdated update for client's {:?}", entity.id()); | ||||||
trace!("ignoring outdated update for client's {client_entity:?}"); | ||||||
cursor.set_position(cursor.position() + data_size as u64); | ||||||
continue; | ||||||
} | ||||||
*entity_tick = message_tick; | ||||||
|
||||||
let (mut entity_markers, mut commands, mut query) = state.get_mut(world); | ||||||
let mut client_entity = query.get_mut(client_entity).unwrap(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a race condition where the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have test for combining insertions or updates with despawns, they pass. But yes, if not server, but client itself despawn an entity in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure that would work |
||||||
entity_markers.extend(command_markers.iter_contains(&client_entity)); | ||||||
|
||||||
let end_pos = cursor.position() + data_size as u64; | ||||||
let mut components_count = 0u32; | ||||||
while cursor.position() < end_pos { | ||||||
let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; | ||||||
let fns = replication_fns.component_fns(fns_id); | ||||||
(fns.deserialize)(&mut entity, entity_map, cursor, message_tick)?; | ||||||
let (serde_fns, command_fns) = replication_fns.get(fns_id); | ||||||
// SAFETY: `serde_fns` and `command_fns` was created for the same type. | ||||||
Shatur marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
unsafe { | ||||||
command_fns.write( | ||||||
serde_fns, | ||||||
&entity_markers, | ||||||
&mut commands, | ||||||
&mut client_entity, | ||||||
cursor, | ||||||
entity_map, | ||||||
message_tick, | ||||||
)?; | ||||||
} | ||||||
components_count += 1; | ||||||
} | ||||||
|
||||||
if let Some(stats) = &mut stats { | ||||||
stats.entities_changed += 1; | ||||||
stats.components_changed += components_count; | ||||||
} | ||||||
|
||||||
entity_markers.clear(); | ||||||
state.apply(world); | ||||||
} | ||||||
|
||||||
Ok(()) | ||||||
|
@@ -464,6 +540,12 @@ fn deserialize_entity(cursor: &mut Cursor<&[u8]>) -> bincode::Result<Entity> { | |||||
Ok(Entity::from_bits(bits)) | ||||||
} | ||||||
|
||||||
type ReceiveState<'w, 's> = SystemState<( | ||||||
Local<'s, Vec<bool>>, | ||||||
Commands<'w, 's>, | ||||||
Query<'w, 's, EntityMut<'w>>, | ||||||
)>; | ||||||
|
||||||
/// Type of components replication. | ||||||
/// | ||||||
/// Parameter for [`apply_components`]. | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.