rayon/iter/
try_reduce_with.rs

1use super::plumbing::*;
2use super::ParallelIterator;
3use super::Try;
4
5use std::ops::ControlFlow::{self, Break, Continue};
6use std::sync::atomic::{AtomicBool, Ordering};
7
8pub(super) fn try_reduce_with<PI, R, T>(pi: PI, reduce_op: R) -> Option<T>
9where
10    PI: ParallelIterator<Item = T>,
11    R: Fn(T::Output, T::Output) -> T + Sync,
12    T: Try + Send,
13{
14    let full = AtomicBool::new(false);
15    let consumer = TryReduceWithConsumer {
16        reduce_op: &reduce_op,
17        full: &full,
18    };
19    pi.drive_unindexed(consumer)
20}
21
/// Consumer state for `try_reduce_with`: two shared references, so it can be
/// copied freely whenever the work is split.
struct TryReduceWithConsumer<'r, R> {
    /// The caller's combining operation, borrowed for the whole reduction.
    reduce_op: &'r R,
    /// Shared flag; `true` once some folder has hit a `Break`.
    full: &'r AtomicBool,
}

// `Clone` and `Copy` are written by hand rather than derived: a derive would
// add `R: Clone` / `R: Copy` bounds, but only references to `R` are stored.
impl<'r, R> Clone for TryReduceWithConsumer<'r, R> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'r, R> Copy for TryReduceWithConsumer<'r, R> {}
34
35impl<'r, R, T> Consumer<T> for TryReduceWithConsumer<'r, R>
36where
37    R: Fn(T::Output, T::Output) -> T + Sync,
38    T: Try + Send,
39{
40    type Folder = TryReduceWithFolder<'r, R, T>;
41    type Reducer = Self;
42    type Result = Option<T>;
43
44    fn split_at(self, _index: usize) -> (Self, Self, Self) {
45        (self, self, self)
46    }
47
48    fn into_folder(self) -> Self::Folder {
49        TryReduceWithFolder {
50            reduce_op: self.reduce_op,
51            opt_control: None,
52            full: self.full,
53        }
54    }
55
56    fn full(&self) -> bool {
57        self.full.load(Ordering::Relaxed)
58    }
59}
60
61impl<'r, R, T> UnindexedConsumer<T> for TryReduceWithConsumer<'r, R>
62where
63    R: Fn(T::Output, T::Output) -> T + Sync,
64    T: Try + Send,
65{
66    fn split_off_left(&self) -> Self {
67        *self
68    }
69
70    fn to_reducer(&self) -> Self::Reducer {
71        *self
72    }
73}
74
75impl<'r, R, T> Reducer<Option<T>> for TryReduceWithConsumer<'r, R>
76where
77    R: Fn(T::Output, T::Output) -> T + Sync,
78    T: Try,
79{
80    fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
81        let reduce_op = self.reduce_op;
82        match (left, right) {
83            (Some(left), Some(right)) => match (left.branch(), right.branch()) {
84                (Continue(left), Continue(right)) => Some(reduce_op(left, right)),
85                (Break(r), _) | (_, Break(r)) => Some(T::from_residual(r)),
86            },
87            (None, x) | (x, None) => x,
88        }
89    }
90}
91
92struct TryReduceWithFolder<'r, R, T: Try> {
93    reduce_op: &'r R,
94    opt_control: Option<ControlFlow<T::Residual, T::Output>>,
95    full: &'r AtomicBool,
96}
97
98impl<'r, R, T> Folder<T> for TryReduceWithFolder<'r, R, T>
99where
100    R: Fn(T::Output, T::Output) -> T,
101    T: Try,
102{
103    type Result = Option<T>;
104
105    fn consume(mut self, item: T) -> Self {
106        let reduce_op = self.reduce_op;
107        let control = match (self.opt_control, item.branch()) {
108            (Some(Continue(left)), Continue(right)) => reduce_op(left, right).branch(),
109            (Some(control @ Break(_)), _) | (_, control) => control,
110        };
111        if let Break(_) = control {
112            self.full.store(true, Ordering::Relaxed)
113        }
114        self.opt_control = Some(control);
115        self
116    }
117
118    fn complete(self) -> Option<T> {
119        match self.opt_control {
120            Some(Continue(c)) => Some(T::from_output(c)),
121            Some(Break(r)) => Some(T::from_residual(r)),
122            None => None,
123        }
124    }
125
126    fn full(&self) -> bool {
127        match self.opt_control {
128            Some(Break(_)) => true,
129            _ => self.full.load(Ordering::Relaxed),
130        }
131    }
132}