rayon/iter/
try_reduce.rs

1use super::plumbing::*;
2use super::ParallelIterator;
3use super::Try;
4
5use std::ops::ControlFlow::{self, Break, Continue};
6use std::sync::atomic::{AtomicBool, Ordering};
7
8pub(super) fn try_reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T
9where
10    PI: ParallelIterator<Item = T>,
11    R: Fn(T::Output, T::Output) -> T + Sync,
12    ID: Fn() -> T::Output + Sync,
13    T: Try + Send,
14{
15    let full = AtomicBool::new(false);
16    let consumer = TryReduceConsumer {
17        identity: &identity,
18        reduce_op: &reduce_op,
19        full: &full,
20    };
21    pi.drive_unindexed(consumer)
22}
23
24struct TryReduceConsumer<'r, R, ID> {
25    identity: &'r ID,
26    reduce_op: &'r R,
27    full: &'r AtomicBool,
28}
29
30impl<'r, R, ID> Copy for TryReduceConsumer<'r, R, ID> {}
31
32impl<'r, R, ID> Clone for TryReduceConsumer<'r, R, ID> {
33    fn clone(&self) -> Self {
34        *self
35    }
36}
37
38impl<'r, R, ID, T> Consumer<T> for TryReduceConsumer<'r, R, ID>
39where
40    R: Fn(T::Output, T::Output) -> T + Sync,
41    ID: Fn() -> T::Output + Sync,
42    T: Try + Send,
43{
44    type Folder = TryReduceFolder<'r, R, T>;
45    type Reducer = Self;
46    type Result = T;
47
48    fn split_at(self, _index: usize) -> (Self, Self, Self) {
49        (self, self, self)
50    }
51
52    fn into_folder(self) -> Self::Folder {
53        TryReduceFolder {
54            reduce_op: self.reduce_op,
55            control: Continue((self.identity)()),
56            full: self.full,
57        }
58    }
59
60    fn full(&self) -> bool {
61        self.full.load(Ordering::Relaxed)
62    }
63}
64
65impl<'r, R, ID, T> UnindexedConsumer<T> for TryReduceConsumer<'r, R, ID>
66where
67    R: Fn(T::Output, T::Output) -> T + Sync,
68    ID: Fn() -> T::Output + Sync,
69    T: Try + Send,
70{
71    fn split_off_left(&self) -> Self {
72        *self
73    }
74
75    fn to_reducer(&self) -> Self::Reducer {
76        *self
77    }
78}
79
80impl<'r, R, ID, T> Reducer<T> for TryReduceConsumer<'r, R, ID>
81where
82    R: Fn(T::Output, T::Output) -> T + Sync,
83    T: Try,
84{
85    fn reduce(self, left: T, right: T) -> T {
86        match (left.branch(), right.branch()) {
87            (Continue(left), Continue(right)) => (self.reduce_op)(left, right),
88            (Break(r), _) | (_, Break(r)) => T::from_residual(r),
89        }
90    }
91}
92
93struct TryReduceFolder<'r, R, T: Try> {
94    reduce_op: &'r R,
95    control: ControlFlow<T::Residual, T::Output>,
96    full: &'r AtomicBool,
97}
98
99impl<'r, R, T> Folder<T> for TryReduceFolder<'r, R, T>
100where
101    R: Fn(T::Output, T::Output) -> T,
102    T: Try,
103{
104    type Result = T;
105
106    fn consume(mut self, item: T) -> Self {
107        let reduce_op = self.reduce_op;
108        self.control = match (self.control, item.branch()) {
109            (Continue(left), Continue(right)) => reduce_op(left, right).branch(),
110            (control @ Break(_), _) | (_, control @ Break(_)) => control,
111        };
112        if let Break(_) = self.control {
113            self.full.store(true, Ordering::Relaxed);
114        }
115        self
116    }
117
118    fn complete(self) -> T {
119        match self.control {
120            Continue(c) => T::from_output(c),
121            Break(r) => T::from_residual(r),
122        }
123    }
124
125    fn full(&self) -> bool {
126        match self.control {
127            Break(_) => true,
128            _ => self.full.load(Ordering::Relaxed),
129        }
130    }
131}