Skip to content

Commit

Permalink
Add traits ParallelDrainRange and ParallelDrainFull
Browse files Browse the repository at this point in the history
These define methods of the same name, so only one is expected to be
implemented for any given collection type. Collections that support
ranges can always call `par_drain(..)` for a full drain.

```rust
pub trait ParallelDrainRange<Idx = usize> {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send;
    fn par_drain<R: RangeBounds<Idx>>(self, range: R) -> Self::Iter;
}

pub trait ParallelDrainFull {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send;
    fn par_drain(self) -> Self::Iter;
}
```
  • Loading branch information
cuviper committed Aug 16, 2020
1 parent 09428ec commit 3dcc07e
Show file tree
Hide file tree
Showing 11 changed files with 574 additions and 33 deletions.
62 changes: 62 additions & 0 deletions src/collections/binary_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,65 @@ delegate_indexed_iterator! {
}

// `BinaryHeap` doesn't have a mutable `Iterator`

/// Draining parallel iterator that moves out of a binary heap,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Ord + Send> {
heap: &'a mut BinaryHeap<T>,
}

impl<'a, T: Ord + Send> ParallelDrainFull for &'a mut BinaryHeap<T> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain(self) -> Self::Iter {
Drain { heap: self }
}
}

impl<'a, T: Ord + Send> ParallelIterator for Drain<'a, T> {
type Item = T;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge(self, consumer)
}

fn opt_len(&self) -> Option<usize> {
Some(self.len())
}
}

impl<'a, T: Ord + Send> IndexedParallelIterator for Drain<'a, T> {
fn drive<C>(self, consumer: C) -> C::Result
where
C: Consumer<Self::Item>,
{
bridge(self, consumer)
}

fn len(&self) -> usize {
self.heap.len()
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
super::DrainGuard::new(self.heap)
.par_drain(..)
.with_producer(callback)
}
}

impl<'a, T: Ord + Send> Drop for Drain<'a, T> {
fn drop(&mut self) {
if !self.heap.is_empty() {
// We must not have produced, so just call a normal drain to remove the items.
self.heap.drain();
}
}
}
29 changes: 29 additions & 0 deletions src/collections/hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;

use crate::iter::plumbing::*;
use crate::iter::*;
Expand Down Expand Up @@ -65,3 +66,31 @@ delegate_iterator! {
IterMut<'a, K, V> => (&'a K, &'a mut V),
impl<'a, K: Hash + Eq + Sync + 'a, V: Send + 'a>
}

/// Draining parallel iterator that moves out of a hash map,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, K: Hash + Eq + Send, V: Send> {
inner: vec::IntoIter<(K, V)>,
marker: PhantomData<&'a mut HashMap<K, V>>,
}

impl<'a, K: Hash + Eq + Send, V: Send, S: BuildHasher> ParallelDrainFull
for &'a mut HashMap<K, V, S>
{
type Iter = Drain<'a, K, V>;
type Item = (K, V);

fn par_drain(self) -> Self::Iter {
let vec: Vec<_> = self.drain().collect();
Drain {
inner: vec.into_par_iter(),
marker: PhantomData,
}
}
}

delegate_iterator! {
Drain<'_, K, V> => (K, V),
impl<K: Hash + Eq + Send, V: Send>
}
27 changes: 27 additions & 0 deletions src/collections/hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashSet;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;

use crate::iter::plumbing::*;
use crate::iter::*;
Expand Down Expand Up @@ -51,3 +52,29 @@ delegate_iterator! {
}

// `HashSet` doesn't have a mutable `Iterator`

/// Draining parallel iterator that moves out of a hash set,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Hash + Eq + Send> {
inner: vec::IntoIter<T>,
marker: PhantomData<&'a mut HashSet<T>>,
}

impl<'a, T: Hash + Eq + Send, S: BuildHasher> ParallelDrainFull for &'a mut HashSet<T, S> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain(self) -> Self::Iter {
let vec: Vec<_> = self.drain().collect();
Drain {
inner: vec.into_par_iter(),
marker: PhantomData,
}
}
}

delegate_iterator! {
Drain<'_, T> => T,
impl<T: Hash + Eq + Send>
}
54 changes: 54 additions & 0 deletions src/collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,57 @@ pub mod hash_map;
pub mod hash_set;
pub mod linked_list;
pub mod vec_deque;

use self::drain_guard::DrainGuard;

mod drain_guard {
use crate::iter::ParallelDrainRange;
use std::mem;
use std::ops::RangeBounds;

/// A proxy for draining a collection by converting to a `Vec` and back.
///
/// This is used for draining `BinaryHeap` and `VecDeque`, which both have
/// zero-allocation conversions to/from `Vec`, though not zero-cost:
/// - `BinaryHeap` will heapify from `Vec`, but at least that will be empty.
/// - `VecDeque` has to shift items to offset 0 when converting to `Vec`.
#[allow(missing_debug_implementations)]
pub(super) struct DrainGuard<'a, T, C: From<Vec<T>>> {
collection: &'a mut C,
vec: Vec<T>,
}

impl<'a, T, C> DrainGuard<'a, T, C>
where
C: Default + From<Vec<T>>,
Vec<T>: From<C>,
{
pub(super) fn new(collection: &'a mut C) -> Self {
Self {
// Temporarily steal the inner `Vec` so we can drain in place.
vec: Vec::from(mem::replace(collection, C::default())),
collection,
}
}
}

impl<'a, T, C: From<Vec<T>>> Drop for DrainGuard<'a, T, C> {
fn drop(&mut self) {
// Restore the collection from the `Vec` with its original capacity.
*self.collection = C::from(mem::replace(&mut self.vec, Vec::new()));
}
}

impl<'a, T, C> ParallelDrainRange<usize> for &'a mut DrainGuard<'_, T, C>
where
T: Send,
C: From<Vec<T>>,
{
type Iter = crate::vec::Drain<'a, T>;
type Item = T;

fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
self.vec.par_drain(range)
}
}
}
84 changes: 81 additions & 3 deletions src/collections/vec_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
//! unless you have need to name one of the iterator types.
use std::collections::VecDeque;
use std::ops::{Range, RangeBounds};

use crate::iter::plumbing::*;
use crate::iter::*;
use crate::math::simplify_range;

use crate::slice;
use crate::vec;
Expand All @@ -16,9 +18,15 @@ pub struct IntoIter<T: Send> {
inner: vec::IntoIter<T>,
}

into_par_vec! {
VecDeque<T> => IntoIter<T>,
impl<T: Send>
impl<T: Send> IntoParallelIterator for VecDeque<T> {
type Item = T;
type Iter = IntoIter<T>;

fn into_par_iter(self) -> Self::Iter {
// NOTE: requires data movement if the deque doesn't start at offset 0.
let inner = Vec::from(self).into_par_iter();
IntoIter { inner }
}
}

delegate_indexed_iterator! {
Expand Down Expand Up @@ -79,3 +87,73 @@ delegate_indexed_iterator! {
IterMut<'a, T> => &'a mut T,
impl<'a, T: Send + 'a>
}

/// Draining parallel iterator that moves a range out of a double-ended queue,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Send> {
deque: &'a mut VecDeque<T>,
range: Range<usize>,
orig_len: usize,
}

impl<'a, T: Send> ParallelDrainRange<usize> for &'a mut VecDeque<T> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
Drain {
orig_len: self.len(),
range: simplify_range(range, self.len()),
deque: self,
}
}
}

impl<'a, T: Send> ParallelIterator for Drain<'a, T> {
type Item = T;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge(self, consumer)
}

fn opt_len(&self) -> Option<usize> {
Some(self.len())
}
}

impl<'a, T: Send> IndexedParallelIterator for Drain<'a, T> {
fn drive<C>(self, consumer: C) -> C::Result
where
C: Consumer<Self::Item>,
{
bridge(self, consumer)
}

fn len(&self) -> usize {
self.range.len()
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
// NOTE: requires data movement if the deque doesn't start at offset 0.
super::DrainGuard::new(self.deque)
.par_drain(self.range.clone())
.with_producer(callback)
}
}

impl<'a, T: Send> Drop for Drain<'a, T> {
fn drop(&mut self) {
if self.deque.len() != self.orig_len - self.range.len() {
// We must not have produced, so just call a normal drain to remove the items.
assert_eq!(self.deque.len(), self.orig_len);
self.deque.drain(self.range.clone());
}
}
}
Loading

0 comments on commit 3dcc07e

Please sign in to comment.