rayon/iter/find_first_last/
mod.rs

1use super::plumbing::*;
2use super::*;
3use std::cell::Cell;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6#[cfg(test)]
7mod test;
8
// The key optimization for find_first is that a consumer can stop its search if
// some consumer to its left already found a match (and similarly for consumers
// to the right for find_last). To make this work, all consumers need some
// notion of their position in the data relative to other consumers, including
// unindexed consumers that have no built-in notion of position.
//
// To solve this, we assign each consumer a lower and upper bound for an
// imaginary "range" of data that it consumes. The initial consumer starts with
// the range 0..usize::MAX. The split divides this range in half so that one
// resulting consumer has the range 0..(usize::MAX / 2), and the other has
// (usize::MAX / 2)..usize::MAX. Every subsequent split divides the range in
// half again until it is too small to divide (its length is 1 or 0). From then
// on, splits can hand out consumers that share the same position. In that case
// those consumers will continue to consume all their data regardless of
// whether a better match is found, but the reducer will still return the
// correct answer.
25
/// Which end of the input a search prefers when several items match.
#[derive(Clone, Copy)]
enum MatchPosition {
    /// Prefer the match with the smallest position (used by `find_first`).
    Leftmost,
    /// Prefer the match with the largest position (used by `find_last`).
    Rightmost,
}

/// Returns `true` if `pos1` is *strictly* better than `pos2` for direction `mp`.
///
/// Equal positions are never considered better than one another.
#[inline]
fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool {
    // Put the operand that must be smaller on the left, then compare once.
    let (must_be_smaller, must_be_larger) = match mp {
        MatchPosition::Leftmost => (pos1, pos2),
        MatchPosition::Rightmost => (pos2, pos1),
    };
    must_be_smaller < must_be_larger
}
40
41pub(super) fn find_first<I, P>(pi: I, find_op: P) -> Option<I::Item>
42where
43    I: ParallelIterator,
44    P: Fn(&I::Item) -> bool + Sync,
45{
46    let best_found = AtomicUsize::new(usize::max_value());
47    let consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found);
48    pi.drive_unindexed(consumer)
49}
50
51pub(super) fn find_last<I, P>(pi: I, find_op: P) -> Option<I::Item>
52where
53    I: ParallelIterator,
54    P: Fn(&I::Item) -> bool + Sync,
55{
56    let best_found = AtomicUsize::new(0);
57    let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found);
58    pi.drive_unindexed(consumer)
59}
60
61struct FindConsumer<'p, P> {
62    find_op: &'p P,
63    lower_bound: Cell<usize>,
64    upper_bound: usize,
65    match_position: MatchPosition,
66    best_found: &'p AtomicUsize,
67}
68
69impl<'p, P> FindConsumer<'p, P> {
70    fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self {
71        FindConsumer {
72            find_op,
73            lower_bound: Cell::new(0),
74            upper_bound: usize::max_value(),
75            match_position,
76            best_found,
77        }
78    }
79
80    fn current_index(&self) -> usize {
81        match self.match_position {
82            MatchPosition::Leftmost => self.lower_bound.get(),
83            MatchPosition::Rightmost => self.upper_bound,
84        }
85    }
86}
87
88impl<'p, T, P> Consumer<T> for FindConsumer<'p, P>
89where
90    T: Send,
91    P: Fn(&T) -> bool + Sync,
92{
93    type Folder = FindFolder<'p, T, P>;
94    type Reducer = FindReducer;
95    type Result = Option<T>;
96
97    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
98        let dir = self.match_position;
99        (
100            self.split_off_left(),
101            self,
102            FindReducer {
103                match_position: dir,
104            },
105        )
106    }
107
108    fn into_folder(self) -> Self::Folder {
109        FindFolder {
110            find_op: self.find_op,
111            boundary: self.current_index(),
112            match_position: self.match_position,
113            best_found: self.best_found,
114            item: None,
115        }
116    }
117
118    fn full(&self) -> bool {
119        // can stop consuming if the best found index so far is *strictly*
120        // better than anything this consumer will find
121        better_position(
122            self.best_found.load(Ordering::Relaxed),
123            self.current_index(),
124            self.match_position,
125        )
126    }
127}
128
129impl<'p, T, P> UnindexedConsumer<T> for FindConsumer<'p, P>
130where
131    T: Send,
132    P: Fn(&T) -> bool + Sync,
133{
134    fn split_off_left(&self) -> Self {
135        // Upper bound for one consumer will be lower bound for the other. This
136        // overlap is okay, because only one of the bounds will be used for
137        // comparing against best_found; the other is kept only to be able to
138        // divide the range in half.
139        //
140        // When the resolution of usize has been exhausted (i.e. when
141        // upper_bound = lower_bound), both results of this split will have the
142        // same range. When that happens, we lose the ability to tell one
143        // consumer to stop working when the other finds a better match, but the
144        // reducer ensures that the best answer is still returned (see the test
145        // above).
146        let old_lower_bound = self.lower_bound.get();
147        let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2);
148        self.lower_bound.set(median);
149
150        FindConsumer {
151            find_op: self.find_op,
152            lower_bound: Cell::new(old_lower_bound),
153            upper_bound: median,
154            match_position: self.match_position,
155            best_found: self.best_found,
156        }
157    }
158
159    fn to_reducer(&self) -> Self::Reducer {
160        FindReducer {
161            match_position: self.match_position,
162        }
163    }
164}
165
166struct FindFolder<'p, T, P> {
167    find_op: &'p P,
168    boundary: usize,
169    match_position: MatchPosition,
170    best_found: &'p AtomicUsize,
171    item: Option<T>,
172}
173
174impl<'p, P: 'p + Fn(&T) -> bool, T> Folder<T> for FindFolder<'p, T, P> {
175    type Result = Option<T>;
176
177    fn consume(mut self, item: T) -> Self {
178        let found_best_in_range = match self.match_position {
179            MatchPosition::Leftmost => self.item.is_some(),
180            MatchPosition::Rightmost => false,
181        };
182
183        if !found_best_in_range && (self.find_op)(&item) {
184            // Update the best found index if ours is better.
185            let update =
186                self.best_found
187                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
188                        better_position(self.boundary, current, self.match_position)
189                            .then_some(self.boundary)
190                    });
191
192            // Save this item if our index was better or equal.
193            if update.is_ok() || update == Err(self.boundary) {
194                self.item = Some(item);
195            }
196        }
197        self
198    }
199
200    fn complete(self) -> Self::Result {
201        self.item
202    }
203
204    fn full(&self) -> bool {
205        let found_best_in_range = match self.match_position {
206            MatchPosition::Leftmost => self.item.is_some(),
207            MatchPosition::Rightmost => false,
208        };
209
210        found_best_in_range
211            || better_position(
212                self.best_found.load(Ordering::Relaxed),
213                self.boundary,
214                self.match_position,
215            )
216    }
217}
218
219struct FindReducer {
220    match_position: MatchPosition,
221}
222
223impl<T> Reducer<Option<T>> for FindReducer {
224    fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
225        match self.match_position {
226            MatchPosition::Leftmost => left.or(right),
227            MatchPosition::Rightmost => right.or(left),
228        }
229    }
230}