1use crate::iter::plumbing::*;
9use crate::iter::*;
10use std::sync::atomic::{AtomicBool, Ordering};
11
/// A parallel iterator that owns an [`Option`] and yields its value, if any.
///
/// Built by the `into_par_iter` implementation for `Option<T>`; it reports a
/// length of one for `Some` and zero for `None`.
#[derive(Clone, Debug)]
pub struct IntoIter<T>
where
    T: Send,
{
    /// The wrapped option, stored exactly as it was handed in.
    opt: Option<T>,
}
25
26impl<T: Send> IntoParallelIterator for Option<T> {
27 type Item = T;
28 type Iter = IntoIter<T>;
29
30 fn into_par_iter(self) -> Self::Iter {
31 IntoIter { opt: self }
32 }
33}
34
35impl<T: Send> ParallelIterator for IntoIter<T> {
36 type Item = T;
37
38 fn drive_unindexed<C>(self, consumer: C) -> C::Result
39 where
40 C: UnindexedConsumer<Self::Item>,
41 {
42 self.drive(consumer)
43 }
44
45 fn opt_len(&self) -> Option<usize> {
46 Some(self.len())
47 }
48}
49
50impl<T: Send> IndexedParallelIterator for IntoIter<T> {
51 fn drive<C>(self, consumer: C) -> C::Result
52 where
53 C: Consumer<Self::Item>,
54 {
55 let mut folder = consumer.into_folder();
56 if let Some(item) = self.opt {
57 folder = folder.consume(item);
58 }
59 folder.complete()
60 }
61
62 fn len(&self) -> usize {
63 match self.opt {
64 Some(_) => 1,
65 None => 0,
66 }
67 }
68
69 fn with_producer<CB>(self, callback: CB) -> CB::Output
70 where
71 CB: ProducerCallback<Self::Item>,
72 {
73 callback.callback(OptionProducer { opt: self.opt })
74 }
75}
76
77#[derive(Debug)]
87pub struct Iter<'a, T: Sync> {
88 inner: IntoIter<&'a T>,
89}
90
91impl<'a, T: Sync> Clone for Iter<'a, T> {
92 fn clone(&self) -> Self {
93 Iter {
94 inner: self.inner.clone(),
95 }
96 }
97}
98
99impl<'a, T: Sync> IntoParallelIterator for &'a Option<T> {
100 type Item = &'a T;
101 type Iter = Iter<'a, T>;
102
103 fn into_par_iter(self) -> Self::Iter {
104 Iter {
105 inner: self.as_ref().into_par_iter(),
106 }
107 }
108}
109
110delegate_indexed_iterator! {
111 Iter<'a, T> => &'a T,
112 impl<'a, T: Sync + 'a>
113}
114
115#[derive(Debug)]
125pub struct IterMut<'a, T: Send> {
126 inner: IntoIter<&'a mut T>,
127}
128
129impl<'a, T: Send> IntoParallelIterator for &'a mut Option<T> {
130 type Item = &'a mut T;
131 type Iter = IterMut<'a, T>;
132
133 fn into_par_iter(self) -> Self::Iter {
134 IterMut {
135 inner: self.as_mut().into_par_iter(),
136 }
137 }
138}
139
140delegate_indexed_iterator! {
141 IterMut<'a, T> => &'a mut T,
142 impl<'a, T: Send + 'a>
143}
144
/// Private producer handed to the callback in `IntoIter::with_producer`.
///
/// It carries the option by value.
struct OptionProducer<T>
where
    T: Send,
{
    /// The value still to be produced, or `None` for an empty producer.
    opt: Option<T>,
}
149
150impl<T: Send> Producer for OptionProducer<T> {
151 type Item = T;
152 type IntoIter = std::option::IntoIter<T>;
153
154 fn into_iter(self) -> Self::IntoIter {
155 self.opt.into_iter()
156 }
157
158 fn split_at(self, index: usize) -> (Self, Self) {
159 debug_assert!(index <= 1);
160 let none = OptionProducer { opt: None };
161 if index == 0 {
162 (none, self)
163 } else {
164 (self, none)
165 }
166 }
167}
168
169impl<C, T> FromParallelIterator<Option<T>> for Option<C>
174where
175 C: FromParallelIterator<T>,
176 T: Send,
177{
178 fn from_par_iter<I>(par_iter: I) -> Self
179 where
180 I: IntoParallelIterator<Item = Option<T>>,
181 {
182 fn check<T>(found_none: &AtomicBool) -> impl Fn(&Option<T>) + '_ {
183 move |item| {
184 if item.is_none() {
185 found_none.store(true, Ordering::Relaxed);
186 }
187 }
188 }
189
190 let found_none = AtomicBool::new(false);
191 let collection = par_iter
192 .into_par_iter()
193 .inspect(check(&found_none))
194 .while_some()
195 .collect();
196
197 if found_none.load(Ordering::Relaxed) {
198 None
199 } else {
200 Some(collection)
201 }
202 }
203}