rayon/iter/
take_any.rs

1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5/// `TakeAny` is an iterator that iterates over `n` elements from anywhere in `I`.
6/// This struct is created by the [`take_any()`] method on [`ParallelIterator`]
7///
8/// [`take_any()`]: trait.ParallelIterator.html#method.take_any
9/// [`ParallelIterator`]: trait.ParallelIterator.html
10#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
11#[derive(Clone, Debug)]
12pub struct TakeAny<I: ParallelIterator> {
13    base: I,
14    count: usize,
15}
16
17impl<I> TakeAny<I>
18where
19    I: ParallelIterator,
20{
21    /// Creates a new `TakeAny` iterator.
22    pub(super) fn new(base: I, count: usize) -> Self {
23        TakeAny { base, count }
24    }
25}
26
27impl<I> ParallelIterator for TakeAny<I>
28where
29    I: ParallelIterator,
30{
31    type Item = I::Item;
32
33    fn drive_unindexed<C>(self, consumer: C) -> C::Result
34    where
35        C: UnindexedConsumer<Self::Item>,
36    {
37        let consumer1 = TakeAnyConsumer {
38            base: consumer,
39            count: &AtomicUsize::new(self.count),
40        };
41        self.base.drive_unindexed(consumer1)
42    }
43}
44
45/// ////////////////////////////////////////////////////////////////////////
46/// Consumer implementation
47
48struct TakeAnyConsumer<'f, C> {
49    base: C,
50    count: &'f AtomicUsize,
51}
52
53impl<'f, T, C> Consumer<T> for TakeAnyConsumer<'f, C>
54where
55    C: Consumer<T>,
56    T: Send,
57{
58    type Folder = TakeAnyFolder<'f, C::Folder>;
59    type Reducer = C::Reducer;
60    type Result = C::Result;
61
62    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
63        let (left, right, reducer) = self.base.split_at(index);
64        (
65            TakeAnyConsumer { base: left, ..self },
66            TakeAnyConsumer {
67                base: right,
68                ..self
69            },
70            reducer,
71        )
72    }
73
74    fn into_folder(self) -> Self::Folder {
75        TakeAnyFolder {
76            base: self.base.into_folder(),
77            count: self.count,
78        }
79    }
80
81    fn full(&self) -> bool {
82        self.count.load(Ordering::Relaxed) == 0 || self.base.full()
83    }
84}
85
86impl<'f, T, C> UnindexedConsumer<T> for TakeAnyConsumer<'f, C>
87where
88    C: UnindexedConsumer<T>,
89    T: Send,
90{
91    fn split_off_left(&self) -> Self {
92        TakeAnyConsumer {
93            base: self.base.split_off_left(),
94            ..*self
95        }
96    }
97
98    fn to_reducer(&self) -> Self::Reducer {
99        self.base.to_reducer()
100    }
101}
102
103struct TakeAnyFolder<'f, C> {
104    base: C,
105    count: &'f AtomicUsize,
106}
107
108fn checked_decrement(u: &AtomicUsize) -> bool {
109    u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| u.checked_sub(1))
110        .is_ok()
111}
112
113impl<'f, T, C> Folder<T> for TakeAnyFolder<'f, C>
114where
115    C: Folder<T>,
116{
117    type Result = C::Result;
118
119    fn consume(mut self, item: T) -> Self {
120        if checked_decrement(self.count) {
121            self.base = self.base.consume(item);
122        }
123        self
124    }
125
126    fn consume_iter<I>(mut self, iter: I) -> Self
127    where
128        I: IntoIterator<Item = T>,
129    {
130        self.base = self.base.consume_iter(
131            iter.into_iter()
132                .take_while(move |_| checked_decrement(self.count)),
133        );
134        self
135    }
136
137    fn complete(self) -> C::Result {
138        self.base.complete()
139    }
140
141    fn full(&self) -> bool {
142        self.count.load(Ordering::Relaxed) == 0 || self.base.full()
143    }
144}