Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement find_first/last, position_first/last #189

Merged
merged 12 commits into from
Dec 31, 2016
183 changes: 183 additions & 0 deletions src/par_iter/find_first_last.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::internal::*;
use super::*;
use super::len::*;

// The consumer for find_first/find_last has fake indexes representing the lower
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 to leaving some comments on how things work =)

// and upper bounds of the "range" of data it consumes. This range does not
// correspond to indexes from the consumed iterator but rather indicate the
// consumer's position relative to other consumers. The purpose is to allow a
// consumer to know it should stop consuming items when another consumer finds a
// better match.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Total nit: I'd like a // here to indicate that these are two paragraphs in the same comment.

// An indexed consumer could specialize to use the real indexes instead, but we
// don't implement that for now. The only downside of the current approach is
// that in some cases, iterators very close to each other will have the same
// range and therefore not be able to stop processing if one of them finds a
// better match than the others.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can you elaborate here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I read the code now, so I understand, but I think an example would be great. Basically just saying: we start out with an iterator over 0..usize::max, then split this range in half, and so forth.

Also, should we use u64 just to get that much more resolution?

Also, I think we need a test for the case where we have an iterator that exceeds the resolution of the false indices. Or at least some careful comments explaining why it is correct. =)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll work on that explanation.

I think AtomicU64 is not stable yet (tracked here). Would it be worth postponing merging this PR until it's stable? Of course, we should be able to make that change later, because the exact size of the integer used is private to this module.

I agree that a test would be useful to validate my hand-waving in the comments. Any thoughts on how to write that test? I don't yet know enough about the splitting logic to know how to force that case. One possibility is to start with a FindConsumer whose upper and lower bounds are the same, but that requires either making the FindConsumer struct public so it can be used in the test module, or writing a test directly in this module (which none of the rest of this project does).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think AtomicU64 is not stable yet (tracked here). Would it be worth postponing merging this PR until it's stable? Of course, we should be able to make that change later, because the exact size of the integer used is private to this module.

Nah, just use a usize. It'll make it easier to write a test. :)

As for the test... well, you could... perhaps write something that drives the splitting by hand? i.e., take a u64 range, create a producer from it, and manually call split etc? The test can be internal to this module if it needs access to private state...

...at minimum some thorough comments for how that case is handled would be a good start.


#[derive(Copy, Clone)]
enum MatchPosition {
Leftmost,
Rightmost,
}

pub fn find_first<PAR_ITER, FIND_OP>(pi: PAR_ITER, find_op: FIND_OP) -> Option<PAR_ITER::Item>
where PAR_ITER: ParallelIterator,
FIND_OP: Fn(&PAR_ITER::Item) -> bool + Sync
{
let best_found = AtomicUsize::new(usize::max_value());
let consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found);
pi.drive_unindexed(consumer)
}

pub fn find_last<PAR_ITER, FIND_OP>(pi: PAR_ITER, find_op: FIND_OP) -> Option<PAR_ITER::Item>
where PAR_ITER: ParallelIterator,
FIND_OP: Fn(&PAR_ITER::Item) -> bool + Sync
{
let best_found = AtomicUsize::new(0);
let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found);
pi.drive_unindexed(consumer)
}

struct FindConsumer<'f, FIND_OP: 'f> {
find_op: &'f FIND_OP,
lower_bound: Cell<usize>,
upper_bound: usize,
match_position: MatchPosition,
best_found: &'f AtomicUsize,
}

impl<'f, FIND_OP> FindConsumer<'f, FIND_OP> {
fn new(find_op: &'f FIND_OP,
match_position: MatchPosition,
best_found: &'f AtomicUsize) -> Self {
FindConsumer {
find_op: find_op,
lower_bound: Cell::new(0),
upper_bound: usize::max_value(),
match_position: match_position,
best_found: best_found,
}
}
}

impl<'f, ITEM, FIND_OP> Consumer<ITEM> for FindConsumer<'f, FIND_OP>
where ITEM: Send,
FIND_OP: Fn(&ITEM) -> bool + Sync
{
type Folder = FindFolder<'f, ITEM, FIND_OP>;
type Reducer = FindReducer;
type Result = Option<ITEM>;

fn cost(&mut self, cost: f64) -> f64 {
cost * FUNC_ADJUSTMENT
}

fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
let dir = self.match_position;
(self.split_off(),
self,
FindReducer { match_position: dir })
}

fn into_folder(self) -> Self::Folder {
FindFolder {
find_op: self.find_op,
boundary: match self.match_position {
MatchPosition::Leftmost => self.lower_bound.get(),
MatchPosition::Rightmost => self.upper_bound
},
match_position: self.match_position,
best_found: self.best_found,
item: None,
}
}

fn full(&self) -> bool {
let best = self.best_found.load(Ordering::Relaxed);
match self.match_position {
// can stop consuming if the best found index so far is *strictly*
// better than anything this consumer will find
MatchPosition::Leftmost => best < self.lower_bound.get(),
MatchPosition::Rightmost => best > self.upper_bound
}
}
}

impl<'f, ITEM, FIND_OP> UnindexedConsumer<ITEM> for FindConsumer<'f, FIND_OP>
where ITEM: Send,
FIND_OP: Fn(&ITEM) -> bool + Sync
{
fn split_off(&self) -> Self {
// Upper bound for one consumer will be lower bound for the other. This
// overlap is okay, because only one of the bounds will be used for
// comparing against best_found; the other is kept only to be able to
// divide the range in half
let old_lower_bound = self.lower_bound.get();
let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2);
self.lower_bound.set(median);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it's important to your algorithm that the result from split_off is always used for the left/lower side by whatever is calling it. Near as I can tell, that's always true in practice, but I don't think we've ever specified it must be. It's just generally convenient to call in order (consumer.split_off(), consumer) when splitting in two.

I don't think there's a problem here, but we need to be careful, and this deserves a comment at least.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can rename split_off() to split_left() or split_off_left() or split_left_off(), something like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point; I'll add a comment. +1 to renaming split_off().


FindConsumer {
find_op: self.find_op,
lower_bound: Cell::new(old_lower_bound),
upper_bound: median,
match_position: self.match_position,
best_found: self.best_found,
}
}

fn to_reducer(&self) -> Self::Reducer {
FindReducer { match_position: self.match_position }
}
}

struct FindFolder<'f, ITEM, FIND_OP: 'f> {
find_op: &'f FIND_OP,
boundary: usize,
match_position: MatchPosition,
best_found: &'f AtomicUsize,
item: Option<ITEM>,
}

impl<'f, FIND_OP: 'f + Fn(&ITEM) -> bool, ITEM> Folder<ITEM> for FindFolder<'f, ITEM, FIND_OP> {
type Result = Option<ITEM>;

fn consume(mut self, item: ITEM) -> Self {
if (self.find_op)(&item) {
// This may sometimes set best_found to a worse index than it was
// before, depending on timing. This means more consumers will
// continue to run than necessary, but the reducer will still ensure
// the correct value is returned.
self.best_found.swap(self.boundary, Ordering::Relaxed);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do better than that with a compare_and_swap loop, repeating until you either get a successful swap or find that someone else swapped something "better" than you anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, seems like it'd be worth using a compare-exchange here. I'd probably use compare_exchange_weak, but that's a minor thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was indeed wondering whether a loop like you describe would be better. I'll write that up and push it.

self.item = Some(item);
}
self
}

fn complete(self) -> Self::Result {
self.item
}

fn full(&self) -> bool {
let best_found = self.best_found.load(Ordering::Relaxed);
match self.match_position {
MatchPosition::Leftmost => best_found < self.boundary,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Leftmost only, isn't self.item.is_some() also sufficient?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For that matter, consume probably needs to make sure that Leftmost doesn't clobber any existing self.item. We treat full as a hint, but consume should ensure its own correctness when that's important. Maybe try some tests with many successive candidates that will hit the same folder, perhaps even find_first(|| true).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, am I confused? Why is self.item() sufficient? Don't we want to also stop if some other folder (to our left) has found an item?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nikomatsakis I think he's saying it's also sufficient, which is correct (and a case I didn't think about). If we've already found the item, we need to make sure we stop before we find some other item slightly to the right.

So @cuviper, I think you're right that that logic should be added to both full and consume.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it's an additional case for stopping.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, ok. I agree (this seems to relate to having some tests that cover this case)

MatchPosition::Rightmost => best_found > self.boundary,
}
}
}

struct FindReducer {
match_position: MatchPosition
}

impl<ITEM> Reducer<Option<ITEM>> for FindReducer {
fn reduce(self, left: Option<ITEM>, right: Option<ITEM>) -> Option<ITEM> {
match self.match_position {
MatchPosition::Leftmost => left.or(right),
MatchPosition::Rightmost => right.or(left)
}
}
}
55 changes: 55 additions & 0 deletions src/par_iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use self::weight::Weight;
use self::zip::ZipIter;

pub mod find;
pub mod find_first_last;
pub mod chain;
pub mod collect;
pub mod enumerate;
Expand Down Expand Up @@ -575,6 +576,28 @@ pub trait ParallelIterator: Sized {
find::find(self, predicate)
}

/// Searches for the **first** item in the parallel iterator that
/// matches the given predicate and returns it.
///
/// Once a match is found, all attempts to the right of the match
/// will be stopped, while attempts to the left must continue in case
/// an earlier match is found.
fn find_first<FIND_OP>(self, predicate: FIND_OP) -> Option<Self::Item>
where FIND_OP: Fn(&Self::Item) -> bool + Sync {
find_first_last::find_first(self, predicate)
}

/// Searches for the **last** item in the parallel iterator that
/// matches the given predicate and returns it.
///
/// Once a match is found, all attempts to the left of the match
/// will be stopped, while attempts to the right must continue in case
/// a later match is found.
fn find_last<FIND_OP>(self, predicate: FIND_OP) -> Option<Self::Item>
where FIND_OP: Fn(&Self::Item) -> bool + Sync {
find_first_last::find_last(self, predicate)
}

#[doc(hidden)]
#[deprecated(note = "parallel `find` does not search in order -- use `find_any`")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's update this deprecated note to say: "use find_any, find_first, or find_last" -- we might also consider calling find_first(), not sure.

fn find<FIND_OP>(self, predicate: FIND_OP) -> Option<Self::Item>
Expand Down Expand Up @@ -824,6 +847,38 @@ pub trait IndexedParallelIterator: ExactParallelIterator {
.map(|(i, _)| i)
}

/// Searches for the **first** item in the parallel iterator that
/// matches the given predicate, and returns its index.
///
/// Like `ParallelIterator::find_first`, once a match is found,
/// all attempts to the right of the match will be stopped, while
/// attempts to the left must continue in case an earlier match
/// is found.
fn position_first<POSITION_OP>(self, predicate: POSITION_OP) -> Option<usize>
where POSITION_OP: Fn(Self::Item) -> bool + Sync
{
self.map(predicate)
.enumerate()
.find_first(|&(_, p)| p)
.map(|(i, _)| i)
}

/// Searches for the **last** item in the parallel iterator that
/// matches the given predicate, and returns its index.
///
/// Like `ParallelIterator::find_last`, once a match is found,
/// all attempts to the left of the match will be stopped, while
/// attempts to the right must continue in case a later match
/// is found.
fn position_last<POSITION_OP>(self, predicate: POSITION_OP) -> Option<usize>
where POSITION_OP: Fn(Self::Item) -> bool + Sync
{
self.map(predicate)
.enumerate()
.find_last(|&(_, p)| p)
.map(|(i, _)| i)
}

#[doc(hidden)]
#[deprecated(note = "parallel `position` does not search in order -- use `position_any`")]
fn position<POSITION_OP>(self, predicate: POSITION_OP) -> Option<usize>
Expand Down
25 changes: 25 additions & 0 deletions src/par_iter/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,31 @@ pub fn find_any() {
assert!(a.par_iter().all(|&x| x >= 0));
}

#[test]
pub fn find_first_or_last() {
let a: Vec<i32> = (0..1024).collect();

assert_eq!(a.par_iter().find_first(|&&x| x % 42 == 41), Some(&41_i32));
assert_eq!(a.par_iter().find_first(|&&x| x % 19 == 1 && x % 53 == 0),
Some(&742_i32));
assert_eq!(a.par_iter().find_first(|&&x| x < 0), None);

assert_eq!(a.par_iter().position_first(|&x| x % 42 == 41), Some(41_usize));
assert_eq!(a.par_iter().position_first(|&x| x % 19 == 1 && x % 53 == 0),
Some(742_usize));
assert_eq!(a.par_iter().position_first(|&x| x < 0), None);

assert_eq!(a.par_iter().find_last(|&&x| x % 42 == 41), Some(&1007_i32));
assert_eq!(a.par_iter().find_last(|&&x| x % 19 == 1 && x % 53 == 0),
Some(&742_i32));
assert_eq!(a.par_iter().find_last(|&&x| x < 0), None);

assert_eq!(a.par_iter().position_last(|&x| x % 42 == 41), Some(1007_usize));
assert_eq!(a.par_iter().position_last(|&x| x % 19 == 1 && x % 53 == 0),
Some(742_usize));
assert_eq!(a.par_iter().position_last(|&x| x < 0), None);
}

#[test]
pub fn check_find_not_present() {
let counter = AtomicUsize::new(0);
Expand Down