Skip to content

Commit

Permalink
sync: add watch::Sender::send_replace (tokio-rs#3962)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kai Jewson authored and suikammd committed Oct 7, 2021
1 parent 4a9cf71 commit b49a55b
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::sync::notify::Notify;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::mem;
use std::ops;

/// Receives values from the associated [`Sender`](struct@Sender).
Expand Down Expand Up @@ -427,15 +428,34 @@ impl<T> Sender<T> {
/// This method fails if the channel has been closed, which happens when
/// every receiver has been dropped.
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
self.send_replace(value)?;
Ok(())
}

/// Sends a new value via the channel, notifying all receivers and returning
/// the previous value in the channel.
///
/// This can be useful for reusing the buffers inside a watched value.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (tx, _rx) = watch::channel(1);
/// assert_eq!(tx.send_replace(2).unwrap(), 1);
/// assert_eq!(tx.send_replace(3).unwrap(), 2);
/// ```
pub fn send_replace(&self, value: T) -> Result<T, error::SendError<T>> {
// This is pretty much only useful as a hint anyway, so synchronization isn't critical.
if 0 == self.receiver_count() {
return Err(error::SendError(value));
}

{
let old = {
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
*lock = value;
let old = mem::replace(&mut *lock, value);

self.shared.state.increment_version();

Expand All @@ -445,12 +465,14 @@ impl<T> Sender<T> {
// that receivers are able to figure out the version number of the
// value they are currently looking at.
drop(lock);
}

old
};

// Notify all watchers
self.shared.notify_rx.notify_waiters();

Ok(())
Ok(old)
}

/// Returns a reference to the most recently sent value
Expand Down

0 comments on commit b49a55b

Please sign in to comment.