rayon/iter/
update.rs

1use super::plumbing::*;
2use super::*;
3
4use std::fmt::{self, Debug};
5
6/// `Update` is an iterator that mutates the elements of an
7/// underlying iterator before they are yielded.
8///
9/// This struct is created by the [`update()`] method on [`ParallelIterator`]
10///
11/// [`update()`]: trait.ParallelIterator.html#method.update
12/// [`ParallelIterator`]: trait.ParallelIterator.html
13#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14#[derive(Clone)]
15pub struct Update<I: ParallelIterator, F> {
16    base: I,
17    update_op: F,
18}
19
20impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
21    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22        f.debug_struct("Update").field("base", &self.base).finish()
23    }
24}
25
26impl<I, F> Update<I, F>
27where
28    I: ParallelIterator,
29{
30    /// Creates a new `Update` iterator.
31    pub(super) fn new(base: I, update_op: F) -> Self {
32        Update { base, update_op }
33    }
34}
35
36impl<I, F> ParallelIterator for Update<I, F>
37where
38    I: ParallelIterator,
39    F: Fn(&mut I::Item) + Send + Sync,
40{
41    type Item = I::Item;
42
43    fn drive_unindexed<C>(self, consumer: C) -> C::Result
44    where
45        C: UnindexedConsumer<Self::Item>,
46    {
47        let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
48        self.base.drive_unindexed(consumer1)
49    }
50
51    fn opt_len(&self) -> Option<usize> {
52        self.base.opt_len()
53    }
54}
55
56impl<I, F> IndexedParallelIterator for Update<I, F>
57where
58    I: IndexedParallelIterator,
59    F: Fn(&mut I::Item) + Send + Sync,
60{
61    fn drive<C>(self, consumer: C) -> C::Result
62    where
63        C: Consumer<Self::Item>,
64    {
65        let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
66        self.base.drive(consumer1)
67    }
68
69    fn len(&self) -> usize {
70        self.base.len()
71    }
72
73    fn with_producer<CB>(self, callback: CB) -> CB::Output
74    where
75        CB: ProducerCallback<Self::Item>,
76    {
77        return self.base.with_producer(Callback {
78            callback,
79            update_op: self.update_op,
80        });
81
82        struct Callback<CB, F> {
83            callback: CB,
84            update_op: F,
85        }
86
87        impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
88        where
89            CB: ProducerCallback<T>,
90            F: Fn(&mut T) + Send + Sync,
91        {
92            type Output = CB::Output;
93
94            fn callback<P>(self, base: P) -> CB::Output
95            where
96                P: Producer<Item = T>,
97            {
98                let producer = UpdateProducer {
99                    base,
100                    update_op: &self.update_op,
101                };
102                self.callback.callback(producer)
103            }
104        }
105    }
106}
107
// ////////////////////////////////////////////////////////////////////////
// Producer implementation
109
/// Producer wrapper that pairs a base producer with a borrowed update
/// operation.
struct UpdateProducer<'f, P, F> {
    /// The wrapped producer.
    base: P,
    /// Borrowed operation that is run on each item.
    update_op: &'f F,
}
114
115impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
116where
117    P: Producer,
118    F: Fn(&mut P::Item) + Send + Sync,
119{
120    type Item = P::Item;
121    type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
122
123    fn into_iter(self) -> Self::IntoIter {
124        UpdateSeq {
125            base: self.base.into_iter(),
126            update_op: self.update_op,
127        }
128    }
129
130    fn min_len(&self) -> usize {
131        self.base.min_len()
132    }
133    fn max_len(&self) -> usize {
134        self.base.max_len()
135    }
136
137    fn split_at(self, index: usize) -> (Self, Self) {
138        let (left, right) = self.base.split_at(index);
139        (
140            UpdateProducer {
141                base: left,
142                update_op: self.update_op,
143            },
144            UpdateProducer {
145                base: right,
146                update_op: self.update_op,
147            },
148        )
149    }
150
151    fn fold_with<G>(self, folder: G) -> G
152    where
153        G: Folder<Self::Item>,
154    {
155        let folder1 = UpdateFolder {
156            base: folder,
157            update_op: self.update_op,
158        };
159        self.base.fold_with(folder1).base
160    }
161}
162
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation
165
/// Consumer wrapper that pairs a base consumer with a borrowed update
/// operation.
struct UpdateConsumer<'f, C, F> {
    /// The wrapped consumer.
    base: C,
    /// Borrowed operation that is run on each item.
    update_op: &'f F,
}
170
171impl<'f, C, F> UpdateConsumer<'f, C, F> {
172    fn new(base: C, update_op: &'f F) -> Self {
173        UpdateConsumer { base, update_op }
174    }
175}
176
177impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
178where
179    C: Consumer<T>,
180    F: Fn(&mut T) + Send + Sync,
181{
182    type Folder = UpdateFolder<'f, C::Folder, F>;
183    type Reducer = C::Reducer;
184    type Result = C::Result;
185
186    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
187        let (left, right, reducer) = self.base.split_at(index);
188        (
189            UpdateConsumer::new(left, self.update_op),
190            UpdateConsumer::new(right, self.update_op),
191            reducer,
192        )
193    }
194
195    fn into_folder(self) -> Self::Folder {
196        UpdateFolder {
197            base: self.base.into_folder(),
198            update_op: self.update_op,
199        }
200    }
201
202    fn full(&self) -> bool {
203        self.base.full()
204    }
205}
206
207impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
208where
209    C: UnindexedConsumer<T>,
210    F: Fn(&mut T) + Send + Sync,
211{
212    fn split_off_left(&self) -> Self {
213        UpdateConsumer::new(self.base.split_off_left(), self.update_op)
214    }
215
216    fn to_reducer(&self) -> Self::Reducer {
217        self.base.to_reducer()
218    }
219}
220
/// Folder wrapper that pairs a base folder with a borrowed update
/// operation.
struct UpdateFolder<'f, C, F> {
    /// The wrapped folder.
    base: C,
    /// Borrowed operation that is run on each item.
    update_op: &'f F,
}
225
/// Adapts an in-place operation (`Fn(&mut T)`) into a by-value mapping
/// (`Fn(T) -> T`): the returned closure takes ownership of a value, runs
/// `update_op` on it, and returns it.
fn apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T {
    move |value: T| -> T {
        let mut updated = value;
        update_op(&mut updated);
        updated
    }
}
232
233impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
234where
235    C: Folder<T>,
236    F: Fn(&mut T),
237{
238    type Result = C::Result;
239
240    fn consume(self, mut item: T) -> Self {
241        (self.update_op)(&mut item);
242
243        UpdateFolder {
244            base: self.base.consume(item),
245            update_op: self.update_op,
246        }
247    }
248
249    fn consume_iter<I>(mut self, iter: I) -> Self
250    where
251        I: IntoIterator<Item = T>,
252    {
253        let update_op = self.update_op;
254        self.base = self
255            .base
256            .consume_iter(iter.into_iter().map(apply(update_op)));
257        self
258    }
259
260    fn complete(self) -> C::Result {
261        self.base.complete()
262    }
263
264    fn full(&self) -> bool {
265        self.base.full()
266    }
267}
268
/// Sequential counterpart of the update adaptor, modelled on
/// `itertools::adaptors::Update`: it pairs an inner iterator with the
/// operation to run on each item.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
struct UpdateSeq<I, F> {
    /// The wrapped sequential iterator.
    base: I,
    /// Operation that is run on each item.
    update_op: F,
}
276
277impl<I, F> Iterator for UpdateSeq<I, F>
278where
279    I: Iterator,
280    F: Fn(&mut I::Item),
281{
282    type Item = I::Item;
283
284    fn next(&mut self) -> Option<Self::Item> {
285        let mut v = self.base.next()?;
286        (self.update_op)(&mut v);
287        Some(v)
288    }
289
290    fn size_hint(&self) -> (usize, Option<usize>) {
291        self.base.size_hint()
292    }
293
294    fn fold<Acc, G>(self, init: Acc, g: G) -> Acc
295    where
296        G: FnMut(Acc, Self::Item) -> Acc,
297    {
298        self.base.map(apply(self.update_op)).fold(init, g)
299    }
300
301    // if possible, re-use inner iterator specializations in collect
302    fn collect<C>(self) -> C
303    where
304        C: ::std::iter::FromIterator<Self::Item>,
305    {
306        self.base.map(apply(self.update_op)).collect()
307    }
308}
309
310impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
311where
312    I: ExactSizeIterator,
313    F: Fn(&mut I::Item),
314{
315}
316
317impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
318where
319    I: DoubleEndedIterator,
320    F: Fn(&mut I::Item),
321{
322    fn next_back(&mut self) -> Option<Self::Item> {
323        let mut v = self.base.next_back()?;
324        (self.update_op)(&mut v);
325        Some(v)
326    }
327}