Skip to content
This repository has been archived by the owner on Nov 10, 2024. It is now read-only.

Commit

Permalink
Need to sort out the alive_ranges stuff..
Browse files Browse the repository at this point in the history
The plan, as yet unimplemented:

* Inserting into CH at a frame should imply alive at that frame.
* Detecting component removal outside of rollback should mark death
  frame in the SS. Death frames in SS are an array of frame,death i
think, since server can add component back any time
* Re-apply deaths at frames from SS during rollback, since the SS death
  frames are authoritative - ie replicon removed it.
* alive_at_frame is then check ss death mask && ch.has-value-at-frame
  • Loading branch information
RJ committed Oct 20, 2023
1 parent 95f9fdd commit bb0115d
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 83 deletions.
47 changes: 34 additions & 13 deletions src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,39 @@ use bevy::prelude::*;
#[derive(Component)]
pub struct NoRollback;

/// Added to every entity, for tracking which frame they were last synced to a snapshot
/// Deduct `last_snapshot_frame` from the current frame to determine how many frames this
/// entity is predicted ahead for.
#[derive(Component)]
/// Added to every entity for metrics
#[derive(Component, Debug)]
pub struct TimewarpStatus {
/// Deduct `last_snapshot_frame` from the current frame to determine how many frames this
/// entity is predicted ahead for.
last_snapshot_frame: FrameNumber,
/// Incremented when an update to this entity caused a rollback to be requested
rollback_triggers: u32,
}

impl TimewarpStatus {
pub fn new(last_snapshot_frame: FrameNumber) -> Self {
Self {
last_snapshot_frame,
rollback_triggers: 0,
}
}
/// returns the frame of the most recent snapshot,
/// telling you when any component of this entity was most recently updated.
pub fn last_snap_frame(&self) -> FrameNumber {
self.last_snapshot_frame
}
/// how many times did this entity cause a rollback to be requested
pub fn rollback_triggers(&self) -> u32 {
self.rollback_triggers
}

pub fn set_snapped_at(&mut self, frame: FrameNumber) {
self.last_snapshot_frame = self.last_snapshot_frame.max(frame);
}
pub fn increment_rollback_triggers(&mut self) {
self.rollback_triggers += 1;
}
}

/// Used when you want to insert a component T, but for an older frame.
Expand Down Expand Up @@ -57,11 +68,11 @@ impl<T: TimewarpComponent> InsertComponentAtFrame<T> {
/// I use this for blueprints. The blueprint assembly function runs during rollback and
/// adds the various timewarp-registered (and other) components to the entity during rollback.
#[derive(Component, Debug)]
pub struct AssembleBlueprintAtFrame<T: Component> {
pub struct AssembleBlueprintAtFrame<T: Component + std::fmt::Debug + Clone> {
pub component: T,
pub frame: FrameNumber,
}
impl<T: Component> AssembleBlueprintAtFrame<T> {
impl<T: Component + std::fmt::Debug + Clone> AssembleBlueprintAtFrame<T> {
pub fn new(frame: FrameNumber, component: T) -> Self {
Self { component, frame }
}
Expand Down Expand Up @@ -162,7 +173,12 @@ impl<T: TimewarpComponent> ComponentHistory<T> {
entity: &Entity,
) -> Result<(), TimewarpError> {
trace!("CH.Insert {entity:?} {frame} = {val:?}");
self.values.insert(frame, val)
// TODO this is inefficient atm
self.values.insert(frame, val)?;
if !self.alive_at_frame(frame) {
self.report_birth_at_frame(frame);
}
Ok(())
}

/// removes values buffered for this frame, and greater frames.
Expand Down Expand Up @@ -190,6 +206,9 @@ impl<T: TimewarpComponent> ComponentHistory<T> {
self.values.get(frame).is_some(),
"No stored component value when reporting birth @ {frame}"
);
if self.alive_ranges.last().unwrap().1 == Some(frame) {
return;
}
self.alive_ranges.push((frame, None));
}
pub fn report_death_at_frame(&mut self, frame: FrameNumber) {
Expand All @@ -202,16 +221,18 @@ impl<T: TimewarpComponent> ComponentHistory<T> {
return;
}

trace!(
"component death @ {frame} {:?} {:?}",
std::any::type_name::<T>(),
self.alive_ranges
);

assert!(
self.alive_at_frame(frame),
"Can't report death of component not alive"
);
if self.alive_ranges.last().unwrap().1 == Some(frame) {
return;
}
self.alive_ranges.last_mut().unwrap().1 = Some(frame);
trace!(
"component death @ {frame} {:?} --> {:?}",
std::any::type_name::<T>(),
self.alive_ranges
);
}
}
3 changes: 3 additions & 0 deletions src/systems/postfix_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ pub(crate) fn record_component_birth<T: TimewarpComponent>(
game_clock: Res<GameClock>,
rb: Option<Res<Rollback>>,
) {
return;
// no. implied by inserting values to CH!

// during rollback, components are removed and readded.
// but we don't want to log the same as outside of rollback, we want to ignore.
// however this system still runs, so that the Added<T> filters update their markers
Expand Down
2 changes: 1 addition & 1 deletion src/systems/prefix_blueprints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bevy::prelude::*;

/// Blueprint components stay wrapped up until their target frame, then we unwrap them
/// so the assembly systems can decorate them with various other components at that frame.
pub(crate) fn unwrap_blueprints_at_target_frame<T: TimewarpComponent>(
pub(crate) fn unwrap_blueprints_at_target_frame<T: Component + std::fmt::Debug + Clone>(
q: Query<(Entity, &AssembleBlueprintAtFrame<T>)>,
mut commands: Commands,
game_clock: Res<GameClock>,
Expand Down
196 changes: 151 additions & 45 deletions src/systems/prefix_not_in_rollback.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::cmp::Ordering;

/*
NOTE: Timewarp Prefix Systems run at the top of FixedUpdate:
* RIGHT BEFORE THE GameClock IS INCREMENTED.
Expand All @@ -8,16 +10,6 @@
use crate::prelude::*;
use bevy::prelude::*;

/// Don't insert ICAFs if the SS exists, use the SS.
/// can probably support this later, but keeps things simpler for now.
pub(crate) fn detect_misuse_of_icaf<T: TimewarpComponent>(
q: Query<(Entity, &ServerSnapshot<T>, &InsertComponentAtFrame<T>)>,
) {
for (e, _ss, icaf) in q.iter() {
panic!("ICAF and SS exist on {e:?} {icaf:?}");
}
}

/// If a new snapshot was added to SS, we may need to initiate a rollback
pub(crate) fn apply_snapshots_and_maybe_rollback<T: TimewarpComponent>(
mut q: Query<
Expand Down Expand Up @@ -73,23 +65,23 @@ pub(crate) fn apply_snapshots_and_maybe_rollback<T: TimewarpComponent>(
match comp_hist.insert(snap_frame, comp_from_snapshot.clone(), &entity) {
Ok(()) => (),
Err(err) => {
rb_stats.range_faults += 1;
// probably FrameTooOld.
error!(
panic!(
"{err:?} {entity:?} apply_snapshots_and_maybe_rollback({}) - skipping",
comp_hist.type_name()
);
rb_stats.range_faults += 1;
// we can't rollback to this
// this is bad.
continue;
// continue;
}
}

if !comp_hist.alive_at_frame(snap_frame) {
info!("Setting liveness for {snap_frame} {entity:?} {comp_from_snapshot:?} ");
comp_hist.report_birth_at_frame(snap_frame);
assert!(comp_hist.at_frame(snap_frame).is_some());
}
// if !comp_hist.alive_at_frame(snap_frame) {
// info!("Setting liveness for {snap_frame} {entity:?} {comp_from_snapshot:?} ");
// comp_hist.report_birth_at_frame(snap_frame);
// assert!(comp_hist.at_frame(snap_frame).is_some());
// }

if snap_frame < **game_clock {
debug!(
Expand All @@ -102,27 +94,39 @@ pub(crate) fn apply_snapshots_and_maybe_rollback<T: TimewarpComponent>(
rb_ev.send(RollbackRequest::resimulate_this_frame_onwards(
snap_frame + 1,
));
tw_status.increment_rollback_triggers();
}
}
}

/// Move ICAF data to the SS.
/// Move ICAF data to the SS and add SS, because it's missing.
///
/// if an ICAF was inserted, we may need to rollback.
///
pub(crate) fn unpack_icafs_and_maybe_rollback<
pub(crate) fn unpack_icafs_adding_tw_components<
T: TimewarpComponent,
const CORRECTION_LOGGING: bool,
>(
q: Query<(Entity, &InsertComponentAtFrame<T>), Added<InsertComponentAtFrame<T>>>,
mut q: Query<
(
Entity,
&InsertComponentAtFrame<T>,
Option<&mut TimewarpStatus>,
),
(
Added<InsertComponentAtFrame<T>>,
Without<NoRollback>,
Without<ServerSnapshot<T>>,
Without<ComponentHistory<T>>,
),
>,
mut commands: Commands,
timewarp_config: Res<TimewarpConfig>,
game_clock: Res<GameClock>,
mut rb_ev: ResMut<Events<RollbackRequest>>,
) {
for (e, icaf) in q.iter() {
for (e, icaf, opt_twstatus) in q.iter_mut() {
// insert the timewarp components
let tw_status = TimewarpStatus::new(icaf.frame);
let mut ch = ComponentHistory::<T>::with_capacity(
timewarp_config.rollback_window as usize,
icaf.frame,
Expand All @@ -137,37 +141,132 @@ pub(crate) fn unpack_icafs_and_maybe_rollback<
ServerSnapshot::<T>::with_capacity(timewarp_config.rollback_window as usize * 60);
ss.insert(icaf.frame, icaf.component.clone()).unwrap();
// (this will be applied in the ApplyComponents set next)
commands
.entity(e)
.insert((tw_status, ch, ss))
.remove::<InsertComponentAtFrame<T>>();

// if frames match, we want it inserted this frame but not rolled back
if icaf.frame == **game_clock {
// info!("Inserting latecomer in trigger icafs: {e:?} {icaf:?}");
commands.entity(e).insert(icaf.component.clone());
continue;

match icaf.frame.cmp(&game_clock.frame()) {
// if frames match, we want it inserted this frame but not rolled back
// since it has arrived just in time.
Ordering::Equal => {
if let Some(mut tw_status) = opt_twstatus {
tw_status.set_snapped_at(icaf.frame);
} else {
let mut tw_status = TimewarpStatus::new(icaf.frame);
tw_status.set_snapped_at(icaf.frame);
commands.entity(e).insert(tw_status);
}
commands
.entity(e)
.insert((ch, ss, icaf.component.clone()))
.remove::<InsertComponentAtFrame<T>>();
}
// needs insertion in the past, so request a rollback.
Ordering::Less => {
debug!(
"{e:?} Requesting rolllback when unpacking: {icaf:?} rb to {}",
icaf.frame + 1
);
if let Some(mut tw_status) = opt_twstatus {
tw_status.increment_rollback_triggers();
tw_status.set_snapped_at(icaf.frame);
} else {
let mut tw_status = TimewarpStatus::new(icaf.frame);
tw_status.increment_rollback_triggers();
tw_status.set_snapped_at(icaf.frame);
commands.entity(e).insert(tw_status);
}
commands
.entity(e)
.insert((ch, ss))
.remove::<InsertComponentAtFrame<T>>();
rb_ev.send(RollbackRequest::resimulate_this_frame_onwards(
icaf.frame + 1,
));
}
Ordering::Greater => {
// clients are supposed to be ahead, so we don't really expect to get updates for
// future frames. We'll store it but can't rollback to future.
commands
.entity(e)
.insert((ch, ss))
.remove::<InsertComponentAtFrame<T>>();
}
}
}
}

if icaf.frame < **game_clock {
// trigger a rollback using the frame we just added authoritative values for
debug!(
"{e:?} trigger_rollback_when_icaf_added {icaf:?} requesting rb to {}",
icaf.frame + 1
);
rb_ev.send(RollbackRequest::resimulate_this_frame_onwards(
icaf.frame + 1,
));
/// Move ICAF data to the existing SS
///
/// if an ICAF was inserted, we may need to rollback.
///
pub(crate) fn unpack_icafs_into_tw_components<
T: TimewarpComponent,
const CORRECTION_LOGGING: bool,
>(
mut q: Query<
(
Entity,
&InsertComponentAtFrame<T>,
&mut ServerSnapshot<T>,
&mut ComponentHistory<T>,
&mut TimewarpStatus,
),
(Added<InsertComponentAtFrame<T>>, Without<NoRollback>),
>,
mut commands: Commands,
game_clock: Res<GameClock>,
mut rb_ev: ResMut<Events<RollbackRequest>>,
) {
for (e, icaf, mut ss, mut ch, mut tw_status) in q.iter_mut() {
ch.insert(icaf.frame, icaf.component.clone(), &e)
.expect("Couldn't insert ICAF to CH");
ss.insert(icaf.frame, icaf.component.clone())
.expect("Couldn't insert ICAF to SS");

info!("Alive ranges for {icaf:?} = {:?}", ch.alive_ranges);

match icaf.frame.cmp(&game_clock.frame()) {
// if frames match, we want it inserted this frame but not rolled back
// since it has arrived just in time.
Ordering::Equal => {
commands
.entity(e)
.insert(icaf.component.clone())
.remove::<InsertComponentAtFrame<T>>();
}
// needs insertion in the past, so request a rollback.
Ordering::Less => {
debug!(
"{e:?} Requesting rolllback when unpacking: {icaf:?} rb to {}",
icaf.frame + 1
);
tw_status.increment_rollback_triggers();
commands.entity(e).remove::<InsertComponentAtFrame<T>>();
rb_ev.send(RollbackRequest::resimulate_this_frame_onwards(
icaf.frame + 1,
));
}
Ordering::Greater => {
// clients are supposed to be ahead, so we don't really expect to get updates for
// future frames. We'll store it but can't rollback to future.
commands.entity(e).remove::<InsertComponentAtFrame<T>>();
}
}
}
}

pub(crate) fn request_rollback_for_blueprints<T: TimewarpComponent>(
q: Query<(Entity, &AssembleBlueprintAtFrame<T>), Added<AssembleBlueprintAtFrame<T>>>,
pub(crate) fn request_rollback_for_blueprints<T: Component + std::fmt::Debug + Clone>(
mut q: Query<
(
Entity,
&AssembleBlueprintAtFrame<T>,
Option<&mut TimewarpStatus>,
),
Added<AssembleBlueprintAtFrame<T>>,
>,
game_clock: Res<GameClock>,
mut rb_ev: ResMut<Events<RollbackRequest>>,
mut commands: Commands,
) {
for (entity, abaf) in q.iter() {
for (entity, abaf, opt_twstatus) in q.iter_mut() {
let snap_frame = abaf.frame;
// if frames == match, we want it inserted this frame but not rolled back.
// don't do this here, the blueprint unpacking fn does this even during rollback.
Expand All @@ -176,6 +275,13 @@ pub(crate) fn request_rollback_for_blueprints<T: TimewarpComponent>(
debug!(
"{game_clock:?} {entity:?} Requesting rollback for blueprint with snap_frame:{snap_frame} - {abaf:?}"
);
if let Some(mut tws) = opt_twstatus {
tws.increment_rollback_triggers();
} else {
let mut tws = TimewarpStatus::new(snap_frame);
tws.increment_rollback_triggers();
commands.entity(entity).insert(tws);
}
rb_ev.send(RollbackRequest::resimulate_this_frame_onwards(
snap_frame + 1,
));
Expand Down
Loading

0 comments on commit bb0115d

Please sign in to comment.