rayon/iter/
blocks.rs

1use super::plumbing::*;
2use super::*;
3
/// Producer callback that hands the base producer to a consumer one block at a time.
///
/// The length of each successive block is pulled from `sizes`, and `len` is the
/// total number of items that remain to be distributed when the callback starts.
struct BlocksCallback<S, C> {
    /// Iterator yielding the requested length of each successive block.
    sizes: S,
    /// Consumer that receives the items of every block.
    consumer: C,
    /// Total number of items to process across all blocks.
    len: usize,
}
9
10impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
11where
12    C: UnindexedConsumer<T>,
13    S: Iterator<Item = usize>,
14{
15    type Output = C::Result;
16
17    fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
18        let mut remaining_len = self.len;
19        let mut consumer = self.consumer;
20
21        // we need a local variable for the accumulated results
22        // we call the reducer's identity by splitting at 0
23        let (left_consumer, right_consumer, _) = consumer.split_at(0);
24        let mut leftmost_res = left_consumer.into_folder().complete();
25        consumer = right_consumer;
26
27        // now we loop on each block size
28        while remaining_len > 0 && !consumer.full() {
29            // we compute the next block's size
30            let size = self.sizes.next().unwrap_or(std::usize::MAX);
31            let capped_size = remaining_len.min(size);
32            remaining_len -= capped_size;
33
34            // split the producer
35            let (left_producer, right_producer) = producer.split_at(capped_size);
36            producer = right_producer;
37
38            // split the consumer
39            let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
40            consumer = right_consumer;
41
42            leftmost_res = consumer.to_reducer().reduce(
43                leftmost_res,
44                bridge_producer_consumer(capped_size, left_producer, left_consumer),
45            );
46        }
47        leftmost_res
48    }
49}
50
/// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of increasing sizes (exponentially).
///
/// The first block's size is the value returned by `current_num_threads()`, and
/// each following block is requested at twice the size of the previous one
/// (saturating at `usize::MAX`).
///
/// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`]
///
/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct ExponentialBlocks<I> {
    /// The underlying iterator whose items are processed block by block.
    base: I,
}
63
64impl<I> ExponentialBlocks<I> {
65    pub(super) fn new(base: I) -> Self {
66        Self { base }
67    }
68}
69
70impl<I> ParallelIterator for ExponentialBlocks<I>
71where
72    I: IndexedParallelIterator,
73{
74    type Item = I::Item;
75
76    fn drive_unindexed<C>(self, consumer: C) -> C::Result
77    where
78        C: UnindexedConsumer<Self::Item>,
79    {
80        let first = crate::current_num_threads();
81        let callback = BlocksCallback {
82            consumer,
83            sizes: std::iter::successors(Some(first), exponential_size),
84            len: self.base.len(),
85        };
86        self.base.with_producer(callback)
87    }
88}
89
/// Returns the block size that follows `size`: twice as large, clamped to
/// `usize::MAX` instead of overflowing. Always yields `Some`, so a sequence
/// built from it never terminates on its own.
fn exponential_size(size: &usize) -> Option<usize> {
    let current = *size;
    // Adding a value to itself with saturation is the same as a saturating doubling.
    let doubled = current.saturating_add(current);
    Some(doubled)
}
93
/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of constant sizes.
///
/// Every block is requested with the same `block_size`; the final block is
/// shorter when fewer items than that remain.
///
/// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`]
///
/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct UniformBlocks<I> {
    /// The underlying iterator whose items are processed block by block.
    base: I,
    /// Requested number of items in each block.
    block_size: usize,
}
107
108impl<I> UniformBlocks<I> {
109    pub(super) fn new(base: I, block_size: usize) -> Self {
110        Self { base, block_size }
111    }
112}
113
114impl<I> ParallelIterator for UniformBlocks<I>
115where
116    I: IndexedParallelIterator,
117{
118    type Item = I::Item;
119
120    fn drive_unindexed<C>(self, consumer: C) -> C::Result
121    where
122        C: UnindexedConsumer<Self::Item>,
123    {
124        let callback = BlocksCallback {
125            consumer,
126            sizes: std::iter::repeat(self.block_size),
127            len: self.base.len(),
128        };
129        self.base.with_producer(callback)
130    }
131}