rayon/iter/collect/
mod.rs

1use super::{IndexedParallelIterator, ParallelIterator};
2
3mod consumer;
4use self::consumer::CollectConsumer;
5use self::consumer::CollectResult;
6use super::unzip::unzip_indexed;
7
8mod test;
9
10/// Collects the results of the exact iterator into the specified vector.
11///
12/// This is called by `IndexedParallelIterator::collect_into_vec`.
13pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>)
14where
15    I: IndexedParallelIterator<Item = T>,
16    T: Send,
17{
18    v.truncate(0); // clear any old data
19    let len = pi.len();
20    collect_with_consumer(v, len, |consumer| pi.drive(consumer));
21}
22
23/// Collects the results of the iterator into the specified vector.
24///
25/// Technically, this only works for `IndexedParallelIterator`, but we're faking a
26/// bit of specialization here until Rust can do that natively.  Callers are
27/// using `opt_len` to find the length before calling this, and only exact
28/// iterators will return anything but `None` there.
29///
30/// Since the type system doesn't understand that contract, we have to allow
31/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
32/// `UnindexedConsumer`.  That implementation panics `unreachable!` in case
33/// there's a bug where we actually do try to use this unindexed.
34pub(super) fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
35where
36    I: ParallelIterator<Item = T>,
37    T: Send,
38{
39    collect_with_consumer(v, len, |consumer| pi.drive_unindexed(consumer));
40}
41
42/// Unzips the results of the exact iterator into the specified vectors.
43///
44/// This is called by `IndexedParallelIterator::unzip_into_vecs`.
45pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
46where
47    I: IndexedParallelIterator<Item = (A, B)>,
48    A: Send,
49    B: Send,
50{
51    // clear any old data
52    left.truncate(0);
53    right.truncate(0);
54
55    let len = pi.len();
56    collect_with_consumer(right, len, |right_consumer| {
57        let mut right_result = None;
58        collect_with_consumer(left, len, |left_consumer| {
59            let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer);
60            right_result = Some(right_r);
61            left_r
62        });
63        right_result.unwrap()
64    });
65}
66
67/// Create a consumer on the slice of memory we are collecting into.
68///
69/// The consumer needs to be used inside the scope function, and the
70/// complete collect result passed back.
71///
72/// This method will verify the collect result, and panic if the slice
73/// was not fully written into. Otherwise, in the successful case,
74/// the vector is complete with the collected result.
75fn collect_with_consumer<T, F>(vec: &mut Vec<T>, len: usize, scope_fn: F)
76where
77    T: Send,
78    F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
79{
80    // Reserve space for `len` more elements in the vector,
81    vec.reserve(len);
82
83    // Create the consumer and run the callback for collection.
84    let result = scope_fn(CollectConsumer::appender(vec, len));
85
86    // The `CollectResult` represents a contiguous part of the slice, that has
87    // been written to. On unwind here, the `CollectResult` will be dropped. If
88    // some producers on the way did not produce enough elements, partial
89    // `CollectResult`s may have been dropped without being reduced to the final
90    // result, and we will see that as the length coming up short.
91    //
92    // Here, we assert that added length is fully initialized. This is checked
93    // by the following assert, which verifies if a complete `CollectResult`
94    // was produced; if the length is correct, it is necessarily covering the
95    // target slice. Since we know that the consumer cannot have escaped from
96    // `drive` (by parametricity, essentially), we know that any stores that
97    // will happen, have happened. Unless some code is buggy, that means we
98    // should have seen `len` total writes.
99    let actual_writes = result.len();
100    assert!(
101        actual_writes == len,
102        "expected {} total writes, but got {}",
103        len,
104        actual_writes
105    );
106
107    // Release the result's mutable borrow and "proxy ownership"
108    // of the elements, before the vector takes it over.
109    result.release_ownership();
110
111    let new_len = vec.len() + len;
112
113    unsafe {
114        vec.set_len(new_len);
115    }
116}