rayon/iter/par_bridge.rs

#[cfg(not(feature = "web_spin_lock"))]
use std::sync::Mutex;

#[cfg(feature = "web_spin_lock")]
use wasm_sync::Mutex;

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
use crate::iter::ParallelIterator;
use crate::{current_num_threads, current_thread_index};

/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
///
/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
/// parallelize by other means, like channels or file or network I/O.
///
/// Iterator items are pulled by `next()` one at a time, synchronized from each thread that is
/// ready for work, so this may become a bottleneck if the serial iterator can't keep up with the
/// parallel demand. The items are not buffered by `IterBridge`, so it's fine to use this with
/// large or even unbounded iterators.
///
/// The resulting iterator is not guaranteed to keep the order of the original iterator.
///
/// # Examples
///
/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
/// use any of the `ParallelIterator` methods:
///
/// ```
/// use rayon::iter::ParallelBridge;
/// use rayon::prelude::ParallelIterator;
/// use std::sync::mpsc::channel;
///
/// let rx = {
///     let (tx, rx) = channel();
///
///     tx.send("one!");
///     tx.send("two!");
///     tx.send("three!");
///
///     rx
/// };
///
/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
/// output.sort_unstable();
///
/// assert_eq!(&*output, &["one!", "three!", "two!"]);
/// ```
pub trait ParallelBridge: Sized {
    /// Creates a bridge from this type to a `ParallelIterator`.
    fn par_bridge(self) -> IterBridge<Self>;
}

impl<T: Iterator + Send> ParallelBridge for T
where
    T::Item: Send,
{
    fn par_bridge(self) -> IterBridge<Self> {
        IterBridge { iter: self }
    }
}
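
// Caller-side sketch, not part of this module: `BufRead::lines()` has no
// indexed parallel counterpart, so `par_bridge` is a natural way to fan its
// items out across the pool. The function name and sample data are
// illustrative only; a user would pull the imports in from the `rayon` crate.
fn _demo_par_bridge_lines() {
    use rayon::iter::{ParallelBridge, ParallelIterator};
    use std::io::BufRead;

    let reader = std::io::Cursor::new("alpha\nbeta\ngamma\n");
    let total: usize = reader
        .lines()
        .par_bridge()
        .map(|line| line.expect("in-memory reads cannot fail").len())
        .sum();
    assert_eq!(total, 14); // 5 + 4 + 5 bytes of line content
}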

/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
///
/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
/// [`ParallelBridge`] documentation for details.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[derive(Debug, Clone)]
pub struct IterBridge<Iter> {
    iter: Iter,
}

impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
where
    Iter::Item: Send,
{
    type Item = Iter::Item;

    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let num_threads = current_num_threads();
        let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect();

        bridge_unindexed(
            &IterParallelProducer {
                split_count: AtomicUsize::new(num_threads),
                iter: Mutex::new(self.iter.fuse()),
                threads_started: &threads_started,
            },
            consumer,
        )
    }
}
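
// Caller-side sketch, not part of this module: a rough way to observe the
// distribution that `drive_unindexed` sets up. Each worker thread pulls items
// from the shared, mutex-guarded iterator, so with more than one worker and
// enough items we usually see several distinct `current_thread_index` values,
// though nothing about the assignment is guaranteed. Names are illustrative.
fn _demo_observe_thread_distribution() {
    use rayon::iter::{ParallelBridge, ParallelIterator};
    use std::collections::HashSet;

    let threads: HashSet<Option<usize>> = (0..10_000)
        .par_bridge()
        .map(|_| rayon::current_thread_index())
        .collect();
    println!("items were folded on {} distinct worker threads", threads.len());
}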

struct IterParallelProducer<'a, Iter> {
    split_count: AtomicUsize,
    iter: Mutex<std::iter::Fuse<Iter>>,
    threads_started: &'a [AtomicBool],
}
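
// Standalone sketch, not rayon code: the `split_count` field above acts as a
// splitting budget for `split()` below. `fetch_update` with `checked_sub`
// decrements the budget until it reaches zero, after which every further
// attempt fails and no more splits are offered. The names are illustrative.
fn _demo_split_budget() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let budget = AtomicUsize::new(2);
    let try_claim = |budget: &AtomicUsize| {
        budget
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| c.checked_sub(1))
            .is_ok()
    };
    assert!(try_claim(&budget)); // 2 -> 1
    assert!(try_claim(&budget)); // 1 -> 0
    assert!(!try_claim(&budget)); // checked_sub on 0 is None: no more splits
}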

impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> {
    type Item = Iter::Item;

    fn split(self) -> (Self, Option<Self>) {
        // Try to claim one unit of the split budget; once it is exhausted,
        // stop offering further splits to `bridge_unindexed`.
        let update = self
            .split_count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| c.checked_sub(1));
        (self, update.is_ok().then_some(self))
    }

    fn fold_with<F>(self, mut folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        // Guard against work-stealing-induced recursion, in case `Iter::next()`
        // calls rayon internally, so we don't deadlock our mutex. We might also
        // be recursing via `folder` methods, which doesn't present a mutex hazard,
        // but it's lower overhead for us to just check this once, rather than
        // updating additional shared state on every mutex lock/unlock.
        // (If this isn't a rayon thread, then there's no work-stealing anyway...)
        if let Some(i) = current_thread_index() {
            // Note: If the number of threads in the pool ever grows dynamically, then
            // we'll end up sharing flags and may falsely detect recursion -- that's
            // still fine for overall correctness, just not optimal for parallelism.
            let thread_started = &self.threads_started[i % self.threads_started.len()];
            if thread_started.swap(true, Ordering::Relaxed) {
                // We can't make progress with a nested mutex, so just return and let
                // the outermost loop continue with the rest of the iterator items.
                return folder;
            }
        }

        loop {
            if let Ok(mut iter) = self.iter.lock() {
                if let Some(it) = iter.next() {
                    drop(iter);
                    folder = folder.consume(it);
                    if folder.full() {
                        return folder;
                    }
                } else {
                    return folder;
                }
            } else {
                // any panics from other threads will have been caught by the pool,
                // and will be re-thrown when joined - just exit
                return folder;
            }
        }
    }
}
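
// Standalone sketch, using plain std threads rather than rayon, of the locking
// pattern in `fold_with` above: hold the mutex only long enough to pull one
// item, drop the guard, then process the item outside the critical section so
// other threads can pull concurrently. Rayon's version additionally guards
// against re-entrant locking from work-stealing via the `threads_started`
// flags; this sketch omits that. Names and counts are illustrative only.
fn _demo_shared_iterator_drain() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    let iter = Arc::new(Mutex::new((0..1000).fuse()));
    let processed = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let iter = Arc::clone(&iter);
            let processed = Arc::clone(&processed);
            std::thread::spawn(move || loop {
                // The temporary guard is dropped at the end of this statement.
                let next = iter.lock().unwrap().next();
                match next {
                    // "Consume" the item outside the lock.
                    Some(_item) => {
                        processed.fetch_add(1, Ordering::Relaxed);
                    }
                    // The fused iterator is exhausted; this worker is done.
                    None => return,
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(processed.load(Ordering::Relaxed), 1000);
}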