Skip to content

Commit

Permalink
Merge pull request #189 from schuster/find_first
Browse files Browse the repository at this point in the history
Implement find_first/last, position_first/last
  • Loading branch information
nikomatsakis authored Dec 31, 2016
2 parents 614d8d7 + e2977b1 commit 4614458
Show file tree
Hide file tree
Showing 4 changed files with 415 additions and 2 deletions.
264 changes: 264 additions & 0 deletions src/par_iter/find_first_last/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::internal::*;
use super::*;
use super::len::*;

#[cfg(test)]
mod test;

// The key optimization for find_first is that a consumer can stop its search if
// some consumer to its left already found a match (and similarly for consumers
// to the right for find_last). To make this work, all consumers need some
// notion of their position in the data relative to other consumers, including
// unindexed consumers that have no built-in notion of position.
//
// To solve this, we assign each consumer a lower and upper bound for an
// imaginary "range" of data that it consumes. The initial consumer starts with
// the range 0..usize::max_value(). The split divides this range in half so that
// one resulting consumer has the range 0..(usize::max_value() / 2), and the
// other has (usize::max_value() / 2)..usize::max_value(). Every subsequent
// split divides the range in half again until it cannot be split anymore
// (i.e. its length is 1), in which case the split returns two consumers with
// the same range. In that case both consumers will continue to consume all
// their data regardless of whether a better match is found, but the reducer
// will still return the correct answer.

/// Which end of the data a search prefers its match to come from.
#[derive(Copy, Clone)]
enum MatchPosition {
    Leftmost,
    Rightmost,
}

/// Returns true if `pos1` is a better match than `pos2` for the given
/// search direction: a smaller index wins for `Leftmost`, a larger one
/// for `Rightmost`. Equal positions are never "better".
fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool {
    // Orient the pair so the comparison is always "smaller beats larger".
    let (smaller, larger) = match mp {
        MatchPosition::Leftmost => (pos1, pos2),
        MatchPosition::Rightmost => (pos2, pos1),
    };
    smaller < larger
}

/// Searches in parallel for the first (leftmost) item satisfying `find_op`.
///
/// Drives the iterator with an unindexed consumer; all consumers coordinate
/// through a shared atomic holding the best imaginary index found so far.
pub fn find_first<PAR_ITER, FIND_OP>(pi: PAR_ITER, find_op: FIND_OP) -> Option<PAR_ITER::Item>
    where PAR_ITER: ParallelIterator,
          FIND_OP: Fn(&PAR_ITER::Item) -> bool + Sync
{
    // Seed with the worst possible index for a leftmost search, so any real
    // match beats it.
    let best_found = AtomicUsize::new(usize::max_value());
    pi.drive_unindexed(FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found))
}

/// Searches in parallel for the last (rightmost) item satisfying `find_op`.
///
/// Mirror image of `find_first`: the shared best index starts at 0, the worst
/// possible position for a rightmost search.
pub fn find_last<PAR_ITER, FIND_OP>(pi: PAR_ITER, find_op: FIND_OP) -> Option<PAR_ITER::Item>
    where PAR_ITER: ParallelIterator,
          FIND_OP: Fn(&PAR_ITER::Item) -> bool + Sync
{
    let best_found = AtomicUsize::new(0);
    pi.drive_unindexed(FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found))
}

// Consumer that searches one "range" of the imaginary index space described
// in the module comment above.
struct FindConsumer<'f, FIND_OP: 'f> {
    // Predicate identifying a match.
    find_op: &'f FIND_OP,
    // Left edge of this consumer's imaginary range. Stored in a Cell because
    // split_off() takes &self but must shrink the remaining range in place.
    lower_bound: Cell<usize>,
    // Right edge of this consumer's imaginary range.
    upper_bound: usize,
    // Whether this search wants the leftmost or the rightmost match.
    match_position: MatchPosition,
    // Best imaginary index published so far, shared by all consumers.
    best_found: &'f AtomicUsize,
}

impl<'f, FIND_OP> FindConsumer<'f, FIND_OP> {
    /// Creates the root consumer, which starts out owning the entire
    /// imaginary range `0..usize::max_value()`.
    fn new(find_op: &'f FIND_OP,
           match_position: MatchPosition,
           best_found: &'f AtomicUsize) -> Self {
        FindConsumer {
            lower_bound: Cell::new(0),
            upper_bound: usize::max_value(),
            find_op: find_op,
            match_position: match_position,
            best_found: best_found,
        }
    }

    /// The one bound of this consumer's range that is compared against the
    /// shared best index: the lower bound when searching from the left, the
    /// upper bound when searching from the right.
    fn current_index(&self) -> usize {
        if let MatchPosition::Leftmost = self.match_position {
            self.lower_bound.get()
        } else {
            self.upper_bound
        }
    }
}

impl<'f, ITEM, FIND_OP> Consumer<ITEM> for FindConsumer<'f, FIND_OP>
    where ITEM: Send,
          FIND_OP: Fn(&ITEM) -> bool + Sync
{
    type Folder = FindFolder<'f, ITEM, FIND_OP>;
    type Reducer = FindReducer;
    type Result = Option<ITEM>;

    fn cost(&mut self, cost: f64) -> f64 {
        // Weight the cost by the per-item predicate call. NOTE(review):
        // FUNC_ADJUSTMENT comes from super::len; assumed to be the crate's
        // standard per-closure-call weighting — confirm against that module.
        cost * FUNC_ADJUSTMENT
    }

    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
        // The split index is ignored: positioning uses the imaginary range
        // scheme described in the module comment, not real data indices.
        let dir = self.match_position;
        // Tuple elements evaluate left to right, so split_off() runs first,
        // shrinking self's range through the lower_bound Cell and returning
        // the left half; the mutated self then becomes the right half.
        (self.split_off(),
         self,
         FindReducer { match_position: dir })
    }

    fn into_folder(self) -> Self::Folder {
        FindFolder {
            find_op: self.find_op,
            // The folder only needs the single direction-relevant bound of
            // this consumer's range for comparisons against best_found.
            boundary: self.current_index(),
            match_position: self.match_position,
            best_found: self.best_found,
            item: None,
        }
    }

    fn full(&self) -> bool {
        // can stop consuming if the best found index so far is *strictly*
        // better than anything this consumer will find
        better_position(self.best_found.load(Ordering::Relaxed),
                        self.current_index(),
                        self.match_position)
    }
}

impl<'f, ITEM, FIND_OP> UnindexedConsumer<ITEM> for FindConsumer<'f, FIND_OP>
    where ITEM: Send,
          FIND_OP: Fn(&ITEM) -> bool + Sync
{
    fn split_off(&self) -> Self {
        // Upper bound for one consumer will be lower bound for the other. This
        // overlap is okay, because only one of the bounds will be used for
        // comparing against best_found; the other is kept only to be able to
        // divide the range in half.
        //
        // When the resolution of usize has been exhausted (i.e. when
        // upper_bound = lower_bound), both results of this split will have the
        // same range. When that happens, we lose the ability to tell one
        // consumer to stop working when the other finds a better match, but the
        // reducer ensures that the best answer is still returned (see the
        // `same_range_*` tests in the adjacent test module).
        //
        // This code assumes that the caller of split_off will use the result as
        // the *left* side of this iterator, and the remainder of self as the
        // *right* side.
        let old_lower_bound = self.lower_bound.get();
        // Midpoint written as lower + (upper - lower) / 2 so the addition
        // cannot overflow even when the range spans most of usize.
        let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2);
        // Shrink self to the right half [median, upper_bound) in place.
        self.lower_bound.set(median);

        // Return the left half [old_lower_bound, median).
        FindConsumer {
            find_op: self.find_op,
            lower_bound: Cell::new(old_lower_bound),
            upper_bound: median,
            match_position: self.match_position,
            best_found: self.best_found,
        }
    }

    fn to_reducer(&self) -> Self::Reducer {
        FindReducer { match_position: self.match_position }
    }
}

// Folder that scans one contiguous chunk of items for a match.
struct FindFolder<'f, ITEM, FIND_OP: 'f> {
    // Predicate identifying a match.
    find_op: &'f FIND_OP,
    // The direction-relevant bound of the originating consumer's imaginary
    // range (its lower bound for leftmost searches, upper for rightmost);
    // this is the value published to best_found when a match is found.
    boundary: usize,
    // Whether this search wants the leftmost or the rightmost match.
    match_position: MatchPosition,
    // Best boundary published so far, shared across all folders.
    best_found: &'f AtomicUsize,
    // The best matching item found by *this* folder, if any.
    item: Option<ITEM>,
}

impl<'f, FIND_OP: 'f + Fn(&ITEM) -> bool, ITEM> Folder<ITEM> for FindFolder<'f, ITEM, FIND_OP> {
    type Result = Option<ITEM>;

    fn consume(mut self, item: ITEM) -> Self {
        // For a leftmost search, the first match in this folder's chunk is the
        // best this folder can produce, so later items need not be tested.
        // For a rightmost search every later item is a better candidate, so
        // keep testing until the chunk runs out.
        let found_best_in_range = match self.match_position {
            MatchPosition::Leftmost => self.item.is_some(),
            MatchPosition::Rightmost => false,
        };

        if !found_best_in_range && (self.find_op)(&item) {
            // Continuously try to set best_found until we succeed or we
            // discover a better match was already found.
            let mut current = self.best_found.load(Ordering::Relaxed);
            loop {
                // Another folder already published a strictly better match;
                // give up without keeping this item.
                if better_position(current, self.boundary, self.match_position) {
                    break;
                }
                // compare_exchange_weak may fail spuriously; on any failure we
                // simply retry with the refreshed value.
                match self.best_found.compare_exchange_weak(current,
                                                            self.boundary,
                                                            Ordering::Relaxed,
                                                            Ordering::Relaxed) {
                    Ok(_) => {
                        // Published our boundary; remember the item locally
                        // (overwriting any earlier match, which for the
                        // rightmost case is exactly what we want).
                        self.item = Some(item);
                        break;
                    },
                    Err(v) => current = v,
                }
            }
        }
        self
    }

    fn complete(self) -> Self::Result {
        self.item
    }

    fn full(&self) -> bool {
        // A leftmost folder is done as soon as it holds any match; a rightmost
        // folder must always keep scanning its own chunk.
        let found_best_in_range = match self.match_position {
            MatchPosition::Leftmost => self.item.is_some(),
            MatchPosition::Rightmost => false,
        };

        // Also stop if some other folder published a strictly better boundary.
        found_best_in_range ||
        better_position(self.best_found.load(Ordering::Relaxed),
                        self.boundary,
                        self.match_position)
    }
}

// These tests require that a folder be assigned to an iterator with more than
// one element. We can't necessarily determine when that will happen for a given
// input to find_first/find_last, so we test the folder directly here instead.
#[test]
fn find_first_folder_does_not_clobber_first_found() {
    let best_found = AtomicUsize::new(usize::max_value());
    let f = FindFolder {
        // Match everything; the item value itself is irrelevant, so use `_`
        // (naming it `x` produced an unused-variable warning).
        find_op: &(|_: &i32| -> bool { true }),
        boundary: 0,
        match_position: MatchPosition::Leftmost,
        best_found: &best_found,
        item: None,
    };
    let f = f.consume(0_i32).consume(1_i32).consume(2_i32);
    // A leftmost folder is full once it holds a match, and it must keep the
    // *first* item it matched rather than the last.
    assert!(f.full());
    assert_eq!(f.complete(), Some(0_i32));
}

#[test]
fn find_last_folder_yields_last_match() {
    let best_found = AtomicUsize::new(0);
    let f = FindFolder {
        // Match everything; the item value itself is irrelevant, so use `_`
        // (naming it `x` produced an unused-variable warning).
        find_op: &(|_: &i32| -> bool { true }),
        boundary: 0,
        match_position: MatchPosition::Rightmost,
        best_found: &best_found,
        item: None,
    };
    // A rightmost folder keeps consuming and must end up holding the *last*
    // matching item.
    let f = f.consume(0_i32).consume(1_i32).consume(2_i32);
    assert_eq!(f.complete(), Some(2_i32));
}

// Reducer combining the answers of two adjacent consumers; the left argument
// always comes from data positioned before the right argument's data.
struct FindReducer {
    // Whether the leftmost or the rightmost answer should win.
    match_position: MatchPosition
}

impl<ITEM> Reducer<Option<ITEM>> for FindReducer {
    /// Merges the results of two adjacent halves: prefer the half that lies
    /// in the search direction, and fall back to the other half's answer.
    fn reduce(self, left: Option<ITEM>, right: Option<ITEM>) -> Option<ITEM> {
        let (preferred, fallback) = match self.match_position {
            MatchPosition::Leftmost => (left, right),
            MatchPosition::Rightmost => (right, left),
        };
        preferred.or(fallback)
    }
}
69 changes: 69 additions & 0 deletions src/par_iter/find_first_last/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::sync::atomic::AtomicUsize;
use super::*;

#[test]
fn same_range_first_consumers_return_correct_answer() {
    let find_op = |x: &i32| x % 2 == 0;
    let first_found = AtomicUsize::new(usize::max_value());
    let far_right_consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &first_found);

    // We save a consumer that will be far to the right of the main consumer (and therefore not
    // sharing an index range with that consumer) for fullness testing
    let consumer = far_right_consumer.split_off();

    // split until we have an indivisible range; each split_off() halves the
    // remaining range in place, and the returned left halves are dropped.
    // The loop variable is unused, so `_` avoids a warning.
    let bits_in_usize = usize::min_value().count_zeros();
    for _ in 0..bits_in_usize {
        consumer.split_off();
    }

    let reducer = consumer.to_reducer();
    // the left and right folders should now have the same range, having
    // exhausted the resolution of usize
    let left_folder = consumer.split_off().into_folder();
    let right_folder = consumer.into_folder();

    let left_folder = left_folder.consume(0).consume(1);
    assert_eq!(left_folder.boundary, right_folder.boundary);
    // expect not full even though a better match has been found because the
    // ranges are the same
    assert!(!right_folder.full());
    assert!(far_right_consumer.full());
    let right_folder = right_folder.consume(2).consume(3);
    assert_eq!(reducer.reduce(left_folder.complete(), right_folder.complete()),
               Some(0));
}

#[test]
fn same_range_last_consumers_return_correct_answer() {
    let find_op = |x: &i32| x % 2 == 0;
    let last_found = AtomicUsize::new(0);
    let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &last_found);

    // We save a consumer that will be far to the left of the main consumer (and therefore not
    // sharing an index range with that consumer) for fullness testing
    let far_left_consumer = consumer.split_off();

    // split until we have an indivisible range; each split_off() halves the
    // remaining range in place, and the returned left halves are dropped.
    // The loop variable is unused, so `_` avoids a warning.
    let bits_in_usize = usize::min_value().count_zeros();
    for _ in 0..bits_in_usize {
        consumer.split_off();
    }

    let reducer = consumer.to_reducer();
    // due to the exact calculation in split_off, the very last consumer has a
    // range of width 2, so we use the second-to-last consumer instead to get
    // the same boundary on both folders
    let consumer = consumer.split_off();
    let left_folder = consumer.split_off().into_folder();
    let right_folder = consumer.into_folder();
    let right_folder = right_folder.consume(2).consume(3);
    assert_eq!(left_folder.boundary, right_folder.boundary);
    // expect not full even though a better match has been found because the
    // ranges are the same
    assert!(!left_folder.full());
    assert!(far_left_consumer.full());
    let left_folder = left_folder.consume(0).consume(1);
    assert_eq!(reducer.reduce(left_folder.complete(), right_folder.complete()),
               Some(2));
}
Loading

0 comments on commit 4614458

Please sign in to comment.