rayon/iter/
flatten.rs

1use super::plumbing::*;
2use super::*;
3
4/// `Flatten` turns each element to a parallel iterator, then flattens these iterators
5/// together. This struct is created by the [`flatten()`] method on [`ParallelIterator`].
6///
7/// [`flatten()`]: trait.ParallelIterator.html#method.flatten
8/// [`ParallelIterator`]: trait.ParallelIterator.html
9#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
10#[derive(Debug, Clone)]
11pub struct Flatten<I: ParallelIterator> {
12    base: I,
13}
14
15impl<I> Flatten<I>
16where
17    I: ParallelIterator,
18    I::Item: IntoParallelIterator,
19{
20    /// Creates a new `Flatten` iterator.
21    pub(super) fn new(base: I) -> Self {
22        Flatten { base }
23    }
24}
25
26impl<I> ParallelIterator for Flatten<I>
27where
28    I: ParallelIterator,
29    I::Item: IntoParallelIterator,
30{
31    type Item = <I::Item as IntoParallelIterator>::Item;
32
33    fn drive_unindexed<C>(self, consumer: C) -> C::Result
34    where
35        C: UnindexedConsumer<Self::Item>,
36    {
37        let consumer = FlattenConsumer::new(consumer);
38        self.base.drive_unindexed(consumer)
39    }
40}
41
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer handed to the base iterator of a `Flatten`. It receives items that
/// can be turned into parallel iterators and forwards their contents to the
/// wrapped downstream consumer `base`.
struct FlattenConsumer<C> {
    base: C,
}

impl<C> FlattenConsumer<C> {
    /// Wraps the downstream consumer `base`.
    fn new(base: C) -> Self {
        Self { base }
    }
}
54
55impl<T, C> Consumer<T> for FlattenConsumer<C>
56where
57    C: UnindexedConsumer<T::Item>,
58    T: IntoParallelIterator,
59{
60    type Folder = FlattenFolder<C, C::Result>;
61    type Reducer = C::Reducer;
62    type Result = C::Result;
63
64    fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
65        let (left, right, reducer) = self.base.split_at(index);
66        (
67            FlattenConsumer::new(left),
68            FlattenConsumer::new(right),
69            reducer,
70        )
71    }
72
73    fn into_folder(self) -> Self::Folder {
74        FlattenFolder {
75            base: self.base,
76            previous: None,
77        }
78    }
79
80    fn full(&self) -> bool {
81        self.base.full()
82    }
83}
84
85impl<T, C> UnindexedConsumer<T> for FlattenConsumer<C>
86where
87    C: UnindexedConsumer<T::Item>,
88    T: IntoParallelIterator,
89{
90    fn split_off_left(&self) -> Self {
91        FlattenConsumer::new(self.base.split_off_left())
92    }
93
94    fn to_reducer(&self) -> Self::Reducer {
95        self.base.to_reducer()
96    }
97}
98
/// Folder created by `FlattenConsumer::into_folder`. Each consumed item is
/// driven as a parallel iterator into a split of `base`, and the results are
/// combined as they arrive.
struct FlattenFolder<C, R> {
    // Downstream consumer; split for every consumed item and used to obtain reducers.
    base: C,
    // Combined result of the items consumed so far; `None` until the first item is consumed.
    previous: Option<R>,
}
103
104impl<T, C> Folder<T> for FlattenFolder<C, C::Result>
105where
106    C: UnindexedConsumer<T::Item>,
107    T: IntoParallelIterator,
108{
109    type Result = C::Result;
110
111    fn consume(self, item: T) -> Self {
112        let par_iter = item.into_par_iter();
113        let consumer = self.base.split_off_left();
114        let result = par_iter.drive_unindexed(consumer);
115
116        let previous = match self.previous {
117            None => Some(result),
118            Some(previous) => {
119                let reducer = self.base.to_reducer();
120                Some(reducer.reduce(previous, result))
121            }
122        };
123
124        FlattenFolder {
125            base: self.base,
126            previous,
127        }
128    }
129
130    fn complete(self) -> Self::Result {
131        match self.previous {
132            Some(previous) => previous,
133            None => self.base.into_folder().complete(),
134        }
135    }
136
137    fn full(&self) -> bool {
138        self.base.full()
139    }
140}