rayon/iter/
extend.rs

1use super::noop::NoopConsumer;
2use super::plumbing::{Consumer, Folder, Reducer, UnindexedConsumer};
3use super::{IntoParallelIterator, ParallelExtend, ParallelIterator};
4
5use either::Either;
6use std::borrow::Cow;
7use std::collections::LinkedList;
8use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
9use std::collections::{BinaryHeap, VecDeque};
10use std::ffi::{OsStr, OsString};
11use std::hash::{BuildHasher, Hash};
12
/// Performs a generic `par_extend`: the items are first gathered in parallel
/// by `fast_collect` (yielding either one `Vec<_>` or a `LinkedList<Vec<_>>`),
/// and the target collection is then extended sequentially from the result.
macro_rules! extend {
    // Entry point: gather `$par_iter` with `fast_collect`, then extend `$self`.
    ($self:ident, $par_iter:ident) => {
        extend!($self <- fast_collect($par_iter))
    };
    // Worker: `$vecs` is an already-gathered `Either` of vectors.
    ($self:ident <- $vecs:expr) => {
        match $vecs {
            Either::Left(single) => $self.extend(single),
            Either::Right(chunks) => {
                chunks.into_iter().for_each(|chunk| $self.extend(chunk))
            }
        }
    };
}
/// Like `extend!`, but calls `reserve` on the target before extending it.
///
/// The optional third argument names the function that measures the gathered
/// vectors; when it is omitted, `len` is used.
macro_rules! extend_reserved {
    // Shorthand: measure the gathered vectors with `len`.
    ($self:ident, $par_iter:ident) => {
        extend_reserved!($self, $par_iter, len)
    };
    // Gather, reserve according to `$len`, then extend sequentially.
    ($self:ident, $par_iter:ident, $len:ident) => {
        let gathered = fast_collect($par_iter);
        $self.reserve($len(&gathered));
        extend!($self <- gathered)
    };
}
40
41/// Computes the total length of a `fast_collect` result.
42fn len<T>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
43    match vecs {
44        Either::Left(vec) => vec.len(),
45        Either::Right(list) => list.iter().map(Vec::len).sum(),
46    }
47}
48
49/// Computes the total string length of a `fast_collect` result.
50fn string_len<T: AsRef<str>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
51    let strs = match vecs {
52        Either::Left(vec) => Either::Left(vec.iter()),
53        Either::Right(list) => Either::Right(list.iter().flatten()),
54    };
55    strs.map(AsRef::as_ref).map(str::len).sum()
56}
57
58/// Computes the total OS-string length of a `fast_collect` result.
59fn osstring_len<T: AsRef<OsStr>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
60    let osstrs = match vecs {
61        Either::Left(vec) => Either::Left(vec.iter()),
62        Either::Right(list) => Either::Right(list.iter().flatten()),
63    };
64    osstrs.map(AsRef::as_ref).map(OsStr::len).sum()
65}
66
67pub(super) fn fast_collect<I, T>(pi: I) -> Either<Vec<T>, LinkedList<Vec<T>>>
68where
69    I: IntoParallelIterator<Item = T>,
70    T: Send,
71{
72    let par_iter = pi.into_par_iter();
73    match par_iter.opt_len() {
74        Some(len) => {
75            // Pseudo-specialization. See impl of ParallelExtend for Vec for more details.
76            let mut vec = Vec::new();
77            super::collect::special_extend(par_iter, len, &mut vec);
78            Either::Left(vec)
79        }
80        None => Either::Right(par_iter.drive_unindexed(ListVecConsumer)),
81    }
82}
83
84struct ListVecConsumer;
85
86struct ListVecFolder<T> {
87    vec: Vec<T>,
88}
89
90impl<T: Send> Consumer<T> for ListVecConsumer {
91    type Folder = ListVecFolder<T>;
92    type Reducer = ListReducer;
93    type Result = LinkedList<Vec<T>>;
94
95    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
96        (Self, Self, ListReducer)
97    }
98
99    fn into_folder(self) -> Self::Folder {
100        ListVecFolder { vec: Vec::new() }
101    }
102
103    fn full(&self) -> bool {
104        false
105    }
106}
107
108impl<T: Send> UnindexedConsumer<T> for ListVecConsumer {
109    fn split_off_left(&self) -> Self {
110        Self
111    }
112
113    fn to_reducer(&self) -> Self::Reducer {
114        ListReducer
115    }
116}
117
118impl<T> Folder<T> for ListVecFolder<T> {
119    type Result = LinkedList<Vec<T>>;
120
121    fn consume(mut self, item: T) -> Self {
122        self.vec.push(item);
123        self
124    }
125
126    fn consume_iter<I>(mut self, iter: I) -> Self
127    where
128        I: IntoIterator<Item = T>,
129    {
130        self.vec.extend(iter);
131        self
132    }
133
134    fn complete(self) -> Self::Result {
135        let mut list = LinkedList::new();
136        if !self.vec.is_empty() {
137            list.push_back(self.vec);
138        }
139        list
140    }
141
142    fn full(&self) -> bool {
143        false
144    }
145}
146
147/// Extends a binary heap with items from a parallel iterator.
148impl<T> ParallelExtend<T> for BinaryHeap<T>
149where
150    T: Ord + Send,
151{
152    fn par_extend<I>(&mut self, par_iter: I)
153    where
154        I: IntoParallelIterator<Item = T>,
155    {
156        extend_reserved!(self, par_iter);
157    }
158}
159
160/// Extends a binary heap with copied items from a parallel iterator.
161impl<'a, T> ParallelExtend<&'a T> for BinaryHeap<T>
162where
163    T: 'a + Copy + Ord + Send + Sync,
164{
165    fn par_extend<I>(&mut self, par_iter: I)
166    where
167        I: IntoParallelIterator<Item = &'a T>,
168    {
169        extend_reserved!(self, par_iter);
170    }
171}
172
173/// Extends a B-tree map with items from a parallel iterator.
174impl<K, V> ParallelExtend<(K, V)> for BTreeMap<K, V>
175where
176    K: Ord + Send,
177    V: Send,
178{
179    fn par_extend<I>(&mut self, par_iter: I)
180    where
181        I: IntoParallelIterator<Item = (K, V)>,
182    {
183        extend!(self, par_iter);
184    }
185}
186
187/// Extends a B-tree map with copied items from a parallel iterator.
188impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap<K, V>
189where
190    K: Copy + Ord + Send + Sync,
191    V: Copy + Send + Sync,
192{
193    fn par_extend<I>(&mut self, par_iter: I)
194    where
195        I: IntoParallelIterator<Item = (&'a K, &'a V)>,
196    {
197        extend!(self, par_iter);
198    }
199}
200
201/// Extends a B-tree set with items from a parallel iterator.
202impl<T> ParallelExtend<T> for BTreeSet<T>
203where
204    T: Ord + Send,
205{
206    fn par_extend<I>(&mut self, par_iter: I)
207    where
208        I: IntoParallelIterator<Item = T>,
209    {
210        extend!(self, par_iter);
211    }
212}
213
214/// Extends a B-tree set with copied items from a parallel iterator.
215impl<'a, T> ParallelExtend<&'a T> for BTreeSet<T>
216where
217    T: 'a + Copy + Ord + Send + Sync,
218{
219    fn par_extend<I>(&mut self, par_iter: I)
220    where
221        I: IntoParallelIterator<Item = &'a T>,
222    {
223        extend!(self, par_iter);
224    }
225}
226
227/// Extends a hash map with items from a parallel iterator.
228impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S>
229where
230    K: Eq + Hash + Send,
231    V: Send,
232    S: BuildHasher + Send,
233{
234    fn par_extend<I>(&mut self, par_iter: I)
235    where
236        I: IntoParallelIterator<Item = (K, V)>,
237    {
238        // See the map_collect benchmarks in rayon-demo for different strategies.
239        extend_reserved!(self, par_iter);
240    }
241}
242
243/// Extends a hash map with copied items from a parallel iterator.
244impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap<K, V, S>
245where
246    K: Copy + Eq + Hash + Send + Sync,
247    V: Copy + Send + Sync,
248    S: BuildHasher + Send,
249{
250    fn par_extend<I>(&mut self, par_iter: I)
251    where
252        I: IntoParallelIterator<Item = (&'a K, &'a V)>,
253    {
254        extend_reserved!(self, par_iter);
255    }
256}
257
258/// Extends a hash set with items from a parallel iterator.
259impl<T, S> ParallelExtend<T> for HashSet<T, S>
260where
261    T: Eq + Hash + Send,
262    S: BuildHasher + Send,
263{
264    fn par_extend<I>(&mut self, par_iter: I)
265    where
266        I: IntoParallelIterator<Item = T>,
267    {
268        extend_reserved!(self, par_iter);
269    }
270}
271
272/// Extends a hash set with copied items from a parallel iterator.
273impl<'a, T, S> ParallelExtend<&'a T> for HashSet<T, S>
274where
275    T: 'a + Copy + Eq + Hash + Send + Sync,
276    S: BuildHasher + Send,
277{
278    fn par_extend<I>(&mut self, par_iter: I)
279    where
280        I: IntoParallelIterator<Item = &'a T>,
281    {
282        extend_reserved!(self, par_iter);
283    }
284}
285
286/// Extends a linked list with items from a parallel iterator.
287impl<T> ParallelExtend<T> for LinkedList<T>
288where
289    T: Send,
290{
291    fn par_extend<I>(&mut self, par_iter: I)
292    where
293        I: IntoParallelIterator<Item = T>,
294    {
295        let mut list = par_iter.into_par_iter().drive_unindexed(ListConsumer);
296        self.append(&mut list);
297    }
298}
299
300/// Extends a linked list with copied items from a parallel iterator.
301impl<'a, T> ParallelExtend<&'a T> for LinkedList<T>
302where
303    T: 'a + Copy + Send + Sync,
304{
305    fn par_extend<I>(&mut self, par_iter: I)
306    where
307        I: IntoParallelIterator<Item = &'a T>,
308    {
309        self.par_extend(par_iter.into_par_iter().copied())
310    }
311}
312
313struct ListConsumer;
314
315struct ListFolder<T> {
316    list: LinkedList<T>,
317}
318
319struct ListReducer;
320
321impl<T: Send> Consumer<T> for ListConsumer {
322    type Folder = ListFolder<T>;
323    type Reducer = ListReducer;
324    type Result = LinkedList<T>;
325
326    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
327        (Self, Self, ListReducer)
328    }
329
330    fn into_folder(self) -> Self::Folder {
331        ListFolder {
332            list: LinkedList::new(),
333        }
334    }
335
336    fn full(&self) -> bool {
337        false
338    }
339}
340
341impl<T: Send> UnindexedConsumer<T> for ListConsumer {
342    fn split_off_left(&self) -> Self {
343        Self
344    }
345
346    fn to_reducer(&self) -> Self::Reducer {
347        ListReducer
348    }
349}
350
351impl<T> Folder<T> for ListFolder<T> {
352    type Result = LinkedList<T>;
353
354    fn consume(mut self, item: T) -> Self {
355        self.list.push_back(item);
356        self
357    }
358
359    fn consume_iter<I>(mut self, iter: I) -> Self
360    where
361        I: IntoIterator<Item = T>,
362    {
363        self.list.extend(iter);
364        self
365    }
366
367    fn complete(self) -> Self::Result {
368        self.list
369    }
370
371    fn full(&self) -> bool {
372        false
373    }
374}
375
376impl<T> Reducer<LinkedList<T>> for ListReducer {
377    fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> {
378        left.append(&mut right);
379        left
380    }
381}
382
383/// Extends an OS-string with string slices from a parallel iterator.
384impl<'a> ParallelExtend<&'a OsStr> for OsString {
385    fn par_extend<I>(&mut self, par_iter: I)
386    where
387        I: IntoParallelIterator<Item = &'a OsStr>,
388    {
389        extend_reserved!(self, par_iter, osstring_len);
390    }
391}
392
393/// Extends an OS-string with strings from a parallel iterator.
394impl ParallelExtend<OsString> for OsString {
395    fn par_extend<I>(&mut self, par_iter: I)
396    where
397        I: IntoParallelIterator<Item = OsString>,
398    {
399        extend_reserved!(self, par_iter, osstring_len);
400    }
401}
402
403/// Extends an OS-string with string slices from a parallel iterator.
404impl<'a> ParallelExtend<Cow<'a, OsStr>> for OsString {
405    fn par_extend<I>(&mut self, par_iter: I)
406    where
407        I: IntoParallelIterator<Item = Cow<'a, OsStr>>,
408    {
409        extend_reserved!(self, par_iter, osstring_len);
410    }
411}
412
413/// Extends a string with characters from a parallel iterator.
414impl ParallelExtend<char> for String {
415    fn par_extend<I>(&mut self, par_iter: I)
416    where
417        I: IntoParallelIterator<Item = char>,
418    {
419        // This is like `extend`, but `Vec<char>` is less efficient to deal
420        // with than `String`, so instead collect to `LinkedList<String>`.
421        let list = par_iter.into_par_iter().drive_unindexed(ListStringConsumer);
422        self.reserve(list.iter().map(String::len).sum());
423        self.extend(list);
424    }
425}
426
427/// Extends a string with copied characters from a parallel iterator.
428impl<'a> ParallelExtend<&'a char> for String {
429    fn par_extend<I>(&mut self, par_iter: I)
430    where
431        I: IntoParallelIterator<Item = &'a char>,
432    {
433        self.par_extend(par_iter.into_par_iter().copied())
434    }
435}
436
437struct ListStringConsumer;
438
439struct ListStringFolder {
440    string: String,
441}
442
443impl Consumer<char> for ListStringConsumer {
444    type Folder = ListStringFolder;
445    type Reducer = ListReducer;
446    type Result = LinkedList<String>;
447
448    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
449        (Self, Self, ListReducer)
450    }
451
452    fn into_folder(self) -> Self::Folder {
453        ListStringFolder {
454            string: String::new(),
455        }
456    }
457
458    fn full(&self) -> bool {
459        false
460    }
461}
462
463impl UnindexedConsumer<char> for ListStringConsumer {
464    fn split_off_left(&self) -> Self {
465        Self
466    }
467
468    fn to_reducer(&self) -> Self::Reducer {
469        ListReducer
470    }
471}
472
473impl Folder<char> for ListStringFolder {
474    type Result = LinkedList<String>;
475
476    fn consume(mut self, item: char) -> Self {
477        self.string.push(item);
478        self
479    }
480
481    fn consume_iter<I>(mut self, iter: I) -> Self
482    where
483        I: IntoIterator<Item = char>,
484    {
485        self.string.extend(iter);
486        self
487    }
488
489    fn complete(self) -> Self::Result {
490        let mut list = LinkedList::new();
491        if !self.string.is_empty() {
492            list.push_back(self.string);
493        }
494        list
495    }
496
497    fn full(&self) -> bool {
498        false
499    }
500}
501
502/// Extends a string with string slices from a parallel iterator.
503impl<'a> ParallelExtend<&'a str> for String {
504    fn par_extend<I>(&mut self, par_iter: I)
505    where
506        I: IntoParallelIterator<Item = &'a str>,
507    {
508        extend_reserved!(self, par_iter, string_len);
509    }
510}
511
512/// Extends a string with strings from a parallel iterator.
513impl ParallelExtend<String> for String {
514    fn par_extend<I>(&mut self, par_iter: I)
515    where
516        I: IntoParallelIterator<Item = String>,
517    {
518        extend_reserved!(self, par_iter, string_len);
519    }
520}
521
522/// Extends a string with boxed strings from a parallel iterator.
523impl ParallelExtend<Box<str>> for String {
524    fn par_extend<I>(&mut self, par_iter: I)
525    where
526        I: IntoParallelIterator<Item = Box<str>>,
527    {
528        extend_reserved!(self, par_iter, string_len);
529    }
530}
531
532/// Extends a string with string slices from a parallel iterator.
533impl<'a> ParallelExtend<Cow<'a, str>> for String {
534    fn par_extend<I>(&mut self, par_iter: I)
535    where
536        I: IntoParallelIterator<Item = Cow<'a, str>>,
537    {
538        extend_reserved!(self, par_iter, string_len);
539    }
540}
541
542/// Extends a deque with items from a parallel iterator.
543impl<T> ParallelExtend<T> for VecDeque<T>
544where
545    T: Send,
546{
547    fn par_extend<I>(&mut self, par_iter: I)
548    where
549        I: IntoParallelIterator<Item = T>,
550    {
551        extend_reserved!(self, par_iter);
552    }
553}
554
555/// Extends a deque with copied items from a parallel iterator.
556impl<'a, T> ParallelExtend<&'a T> for VecDeque<T>
557where
558    T: 'a + Copy + Send + Sync,
559{
560    fn par_extend<I>(&mut self, par_iter: I)
561    where
562        I: IntoParallelIterator<Item = &'a T>,
563    {
564        extend_reserved!(self, par_iter);
565    }
566}
567
568/// Extends a vector with items from a parallel iterator.
569impl<T> ParallelExtend<T> for Vec<T>
570where
571    T: Send,
572{
573    fn par_extend<I>(&mut self, par_iter: I)
574    where
575        I: IntoParallelIterator<Item = T>,
576    {
577        // See the vec_collect benchmarks in rayon-demo for different strategies.
578        let par_iter = par_iter.into_par_iter();
579        match par_iter.opt_len() {
580            Some(len) => {
581                // When Rust gets specialization, we can get here for indexed iterators
582                // without relying on `opt_len`.  Until then, `special_extend()` fakes
583                // an unindexed mode on the promise that `opt_len()` is accurate.
584                super::collect::special_extend(par_iter, len, self);
585            }
586            None => {
587                // This works like `extend`, but `Vec::append` is more efficient.
588                let list = par_iter.drive_unindexed(ListVecConsumer);
589                self.reserve(list.iter().map(Vec::len).sum());
590                for mut other in list {
591                    self.append(&mut other);
592                }
593            }
594        }
595    }
596}
597
598/// Extends a vector with copied items from a parallel iterator.
599impl<'a, T> ParallelExtend<&'a T> for Vec<T>
600where
601    T: 'a + Copy + Send + Sync,
602{
603    fn par_extend<I>(&mut self, par_iter: I)
604    where
605        I: IntoParallelIterator<Item = &'a T>,
606    {
607        self.par_extend(par_iter.into_par_iter().copied())
608    }
609}
610
611/// Collapses all unit items from a parallel iterator into one.
612impl ParallelExtend<()> for () {
613    fn par_extend<I>(&mut self, par_iter: I)
614    where
615        I: IntoParallelIterator<Item = ()>,
616    {
617        par_iter.into_par_iter().drive_unindexed(NoopConsumer)
618    }
619}