rayon/iter/
find.rs

1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5pub(super) fn find<I, P>(pi: I, find_op: P) -> Option<I::Item>
6where
7    I: ParallelIterator,
8    P: Fn(&I::Item) -> bool + Sync,
9{
10    let found = AtomicBool::new(false);
11    let consumer = FindConsumer::new(&find_op, &found);
12    pi.drive_unindexed(consumer)
13}
14
15struct FindConsumer<'p, P> {
16    find_op: &'p P,
17    found: &'p AtomicBool,
18}
19
20impl<'p, P> FindConsumer<'p, P> {
21    fn new(find_op: &'p P, found: &'p AtomicBool) -> Self {
22        FindConsumer { find_op, found }
23    }
24}
25
26impl<'p, T, P: 'p> Consumer<T> for FindConsumer<'p, P>
27where
28    T: Send,
29    P: Fn(&T) -> bool + Sync,
30{
31    type Folder = FindFolder<'p, T, P>;
32    type Reducer = FindReducer;
33    type Result = Option<T>;
34
35    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
36        (self.split_off_left(), self, FindReducer)
37    }
38
39    fn into_folder(self) -> Self::Folder {
40        FindFolder {
41            find_op: self.find_op,
42            found: self.found,
43            item: None,
44        }
45    }
46
47    fn full(&self) -> bool {
48        self.found.load(Ordering::Relaxed)
49    }
50}
51
52impl<'p, T, P: 'p> UnindexedConsumer<T> for FindConsumer<'p, P>
53where
54    T: Send,
55    P: Fn(&T) -> bool + Sync,
56{
57    fn split_off_left(&self) -> Self {
58        FindConsumer::new(self.find_op, self.found)
59    }
60
61    fn to_reducer(&self) -> Self::Reducer {
62        FindReducer
63    }
64}
65
66struct FindFolder<'p, T, P> {
67    find_op: &'p P,
68    found: &'p AtomicBool,
69    item: Option<T>,
70}
71
72impl<'p, T, P> Folder<T> for FindFolder<'p, T, P>
73where
74    P: Fn(&T) -> bool + 'p,
75{
76    type Result = Option<T>;
77
78    fn consume(mut self, item: T) -> Self {
79        if (self.find_op)(&item) {
80            self.found.store(true, Ordering::Relaxed);
81            self.item = Some(item);
82        }
83        self
84    }
85
86    fn consume_iter<I>(mut self, iter: I) -> Self
87    where
88        I: IntoIterator<Item = T>,
89    {
90        fn not_full<T>(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ {
91            move |_| !found.load(Ordering::Relaxed)
92        }
93
94        self.item = iter
95            .into_iter()
96            // stop iterating if another thread has found something
97            .take_while(not_full(self.found))
98            .find(self.find_op);
99        if self.item.is_some() {
100            self.found.store(true, Ordering::Relaxed)
101        }
102        self
103    }
104
105    fn complete(self) -> Self::Result {
106        self.item
107    }
108
109    fn full(&self) -> bool {
110        self.found.load(Ordering::Relaxed)
111    }
112}
113
114struct FindReducer;
115
116impl<T> Reducer<Option<T>> for FindReducer {
117    fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
118        left.or(right)
119    }
120}