itertools/
groupbylazy.rs

1use alloc::vec::{self, Vec};
2use std::cell::{Cell, RefCell};
3
4/// A trait to unify `FnMut` for `ChunkBy` with the chunk key in `IntoChunks`
5trait KeyFunction<A> {
6    type Key;
7    fn call_mut(&mut self, arg: A) -> Self::Key;
8}
9
10impl<A, K, F> KeyFunction<A> for F
11where
12    F: FnMut(A) -> K + ?Sized,
13{
14    type Key = K;
15    #[inline]
16    fn call_mut(&mut self, arg: A) -> Self::Key {
17        (*self)(arg)
18    }
19}
20
21/// `ChunkIndex` acts like the grouping key function for `IntoChunks`
22#[derive(Debug, Clone)]
23struct ChunkIndex {
24    size: usize,
25    index: usize,
26    key: usize,
27}
28
29impl ChunkIndex {
30    #[inline(always)]
31    fn new(size: usize) -> Self {
32        Self {
33            size,
34            index: 0,
35            key: 0,
36        }
37    }
38}
39
40impl<A> KeyFunction<A> for ChunkIndex {
41    type Key = usize;
42    #[inline(always)]
43    fn call_mut(&mut self, _arg: A) -> Self::Key {
44        if self.index == self.size {
45            self.key += 1;
46            self.index = 0;
47        }
48        self.index += 1;
49        self.key
50    }
51}
52
53#[derive(Clone)]
54struct GroupInner<K, I, F>
55where
56    I: Iterator,
57{
58    key: F,
59    iter: I,
60    current_key: Option<K>,
61    current_elt: Option<I::Item>,
62    /// flag set if iterator is exhausted
63    done: bool,
64    /// Index of group we are currently buffering or visiting
65    top_group: usize,
66    /// Least index for which we still have elements buffered
67    oldest_buffered_group: usize,
68    /// Group index for `buffer[0]` -- the slots
69    /// `bottom_group..oldest_buffered_group` are unused and will be erased when
70    /// that range is large enough.
71    bottom_group: usize,
72    /// Buffered groups, from `bottom_group` (index 0) to `top_group`.
73    buffer: Vec<vec::IntoIter<I::Item>>,
74    /// index of last group iter that was dropped,
75    /// `usize::MAX` initially when no group was dropped
76    dropped_group: usize,
77}
78
79impl<K, I, F> GroupInner<K, I, F>
80where
81    I: Iterator,
82    F: for<'a> KeyFunction<&'a I::Item, Key = K>,
83    K: PartialEq,
84{
85    /// `client`: Index of group that requests next element
86    #[inline(always)]
87    fn step(&mut self, client: usize) -> Option<I::Item> {
88        /*
89        println!("client={}, bottom_group={}, oldest_buffered_group={}, top_group={}, buffers=[{}]",
90                 client, self.bottom_group, self.oldest_buffered_group,
91                 self.top_group,
92                 self.buffer.iter().map(|elt| elt.len()).format(", "));
93        */
94        if client < self.oldest_buffered_group {
95            None
96        } else if client < self.top_group
97            || (client == self.top_group && self.buffer.len() > self.top_group - self.bottom_group)
98        {
99            self.lookup_buffer(client)
100        } else if self.done {
101            None
102        } else if self.top_group == client {
103            self.step_current()
104        } else {
105            self.step_buffering(client)
106        }
107    }
108
109    #[inline(never)]
110    fn lookup_buffer(&mut self, client: usize) -> Option<I::Item> {
111        // if `bufidx` doesn't exist in self.buffer, it might be empty
112        let bufidx = client - self.bottom_group;
113        if client < self.oldest_buffered_group {
114            return None;
115        }
116        let elt = self.buffer.get_mut(bufidx).and_then(|queue| queue.next());
117        if elt.is_none() && client == self.oldest_buffered_group {
118            // FIXME: VecDeque is unfortunately not zero allocation when empty,
119            // so we do this job manually.
120            // `bottom_group..oldest_buffered_group` is unused, and if it's large enough, erase it.
121            self.oldest_buffered_group += 1;
122            // skip forward further empty queues too
123            while self
124                .buffer
125                .get(self.oldest_buffered_group - self.bottom_group)
126                .map_or(false, |buf| buf.len() == 0)
127            {
128                self.oldest_buffered_group += 1;
129            }
130
131            let nclear = self.oldest_buffered_group - self.bottom_group;
132            if nclear > 0 && nclear >= self.buffer.len() / 2 {
133                let mut i = 0;
134                self.buffer.retain(|buf| {
135                    i += 1;
136                    debug_assert!(buf.len() == 0 || i > nclear);
137                    i > nclear
138                });
139                self.bottom_group = self.oldest_buffered_group;
140            }
141        }
142        elt
143    }
144
145    /// Take the next element from the iterator, and set the done
146    /// flag if exhausted. Must not be called after done.
147    #[inline(always)]
148    fn next_element(&mut self) -> Option<I::Item> {
149        debug_assert!(!self.done);
150        match self.iter.next() {
151            None => {
152                self.done = true;
153                None
154            }
155            otherwise => otherwise,
156        }
157    }
158
159    #[inline(never)]
160    fn step_buffering(&mut self, client: usize) -> Option<I::Item> {
161        // requested a later group -- walk through the current group up to
162        // the requested group index, and buffer the elements (unless
163        // the group is marked as dropped).
164        // Because the `Groups` iterator is always the first to request
165        // each group index, client is the next index efter top_group.
166        debug_assert!(self.top_group + 1 == client);
167        let mut group = Vec::new();
168
169        if let Some(elt) = self.current_elt.take() {
170            if self.top_group != self.dropped_group {
171                group.push(elt);
172            }
173        }
174        let mut first_elt = None; // first element of the next group
175
176        while let Some(elt) = self.next_element() {
177            let key = self.key.call_mut(&elt);
178            match self.current_key.take() {
179                None => {}
180                Some(old_key) => {
181                    if old_key != key {
182                        self.current_key = Some(key);
183                        first_elt = Some(elt);
184                        break;
185                    }
186                }
187            }
188            self.current_key = Some(key);
189            if self.top_group != self.dropped_group {
190                group.push(elt);
191            }
192        }
193
194        if self.top_group != self.dropped_group {
195            self.push_next_group(group);
196        }
197        if first_elt.is_some() {
198            self.top_group += 1;
199            debug_assert!(self.top_group == client);
200        }
201        first_elt
202    }
203
204    fn push_next_group(&mut self, group: Vec<I::Item>) {
205        // When we add a new buffered group, fill up slots between oldest_buffered_group and top_group
206        while self.top_group - self.bottom_group > self.buffer.len() {
207            if self.buffer.is_empty() {
208                self.bottom_group += 1;
209                self.oldest_buffered_group += 1;
210            } else {
211                self.buffer.push(Vec::new().into_iter());
212            }
213        }
214        self.buffer.push(group.into_iter());
215        debug_assert!(self.top_group + 1 - self.bottom_group == self.buffer.len());
216    }
217
218    /// This is the immediate case, where we use no buffering
219    #[inline]
220    fn step_current(&mut self) -> Option<I::Item> {
221        debug_assert!(!self.done);
222        if let elt @ Some(..) = self.current_elt.take() {
223            return elt;
224        }
225        match self.next_element() {
226            None => None,
227            Some(elt) => {
228                let key = self.key.call_mut(&elt);
229                match self.current_key.take() {
230                    None => {}
231                    Some(old_key) => {
232                        if old_key != key {
233                            self.current_key = Some(key);
234                            self.current_elt = Some(elt);
235                            self.top_group += 1;
236                            return None;
237                        }
238                    }
239                }
240                self.current_key = Some(key);
241                Some(elt)
242            }
243        }
244    }
245
246    /// Request the just started groups' key.
247    ///
248    /// `client`: Index of group
249    ///
250    /// **Panics** if no group key is available.
251    fn group_key(&mut self, client: usize) -> K {
252        // This can only be called after we have just returned the first
253        // element of a group.
254        // Perform this by simply buffering one more element, grabbing the
255        // next key.
256        debug_assert!(!self.done);
257        debug_assert!(client == self.top_group);
258        debug_assert!(self.current_key.is_some());
259        debug_assert!(self.current_elt.is_none());
260        let old_key = self.current_key.take().unwrap();
261        if let Some(elt) = self.next_element() {
262            let key = self.key.call_mut(&elt);
263            if old_key != key {
264                self.top_group += 1;
265            }
266            self.current_key = Some(key);
267            self.current_elt = Some(elt);
268        }
269        old_key
270    }
271}
272
273impl<K, I, F> GroupInner<K, I, F>
274where
275    I: Iterator,
276{
277    /// Called when a group is dropped
278    fn drop_group(&mut self, client: usize) {
279        // It's only useful to track the maximal index
280        if self.dropped_group == !0 || client > self.dropped_group {
281            self.dropped_group = client;
282        }
283    }
284}
285
286#[deprecated(note = "Use `ChunkBy` instead", since = "0.13.0")]
287/// See [`ChunkBy`](crate::structs::ChunkBy).
288pub type GroupBy<K, I, F> = ChunkBy<K, I, F>;
289
290/// `ChunkBy` is the storage for the lazy grouping operation.
291///
292/// If the groups are consumed in their original order, or if each
293/// group is dropped without keeping it around, then `ChunkBy` uses
294/// no allocations. It needs allocations only if several group iterators
295/// are alive at the same time.
296///
297/// This type implements [`IntoIterator`] (it is **not** an iterator
298/// itself), because the group iterators need to borrow from this
299/// value. It should be stored in a local variable or temporary and
300/// iterated.
301///
302/// See [`.chunk_by()`](crate::Itertools::chunk_by) for more information.
303#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
304pub struct ChunkBy<K, I, F>
305where
306    I: Iterator,
307{
308    inner: RefCell<GroupInner<K, I, F>>,
309    // the group iterator's current index. Keep this in the main value
310    // so that simultaneous iterators all use the same state.
311    index: Cell<usize>,
312}
313
314/// Create a new
315pub fn new<K, J, F>(iter: J, f: F) -> ChunkBy<K, J::IntoIter, F>
316where
317    J: IntoIterator,
318    F: FnMut(&J::Item) -> K,
319{
320    ChunkBy {
321        inner: RefCell::new(GroupInner {
322            key: f,
323            iter: iter.into_iter(),
324            current_key: None,
325            current_elt: None,
326            done: false,
327            top_group: 0,
328            oldest_buffered_group: 0,
329            bottom_group: 0,
330            buffer: Vec::new(),
331            dropped_group: !0,
332        }),
333        index: Cell::new(0),
334    }
335}
336
337impl<K, I, F> ChunkBy<K, I, F>
338where
339    I: Iterator,
340{
341    /// `client`: Index of group that requests next element
342    fn step(&self, client: usize) -> Option<I::Item>
343    where
344        F: FnMut(&I::Item) -> K,
345        K: PartialEq,
346    {
347        self.inner.borrow_mut().step(client)
348    }
349
350    /// `client`: Index of group
351    fn drop_group(&self, client: usize) {
352        self.inner.borrow_mut().drop_group(client);
353    }
354}
355
356impl<'a, K, I, F> IntoIterator for &'a ChunkBy<K, I, F>
357where
358    I: Iterator,
359    I::Item: 'a,
360    F: FnMut(&I::Item) -> K,
361    K: PartialEq,
362{
363    type Item = (K, Group<'a, K, I, F>);
364    type IntoIter = Groups<'a, K, I, F>;
365
366    fn into_iter(self) -> Self::IntoIter {
367        Groups { parent: self }
368    }
369}
370
371/// An iterator that yields the Group iterators.
372///
373/// Iterator element type is `(K, Group)`:
374/// the group's key `K` and the group's iterator.
375///
376/// See [`.chunk_by()`](crate::Itertools::chunk_by) for more information.
377#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
378pub struct Groups<'a, K, I, F>
379where
380    I: Iterator + 'a,
381    I::Item: 'a,
382    K: 'a,
383    F: 'a,
384{
385    parent: &'a ChunkBy<K, I, F>,
386}
387
388impl<'a, K, I, F> Iterator for Groups<'a, K, I, F>
389where
390    I: Iterator,
391    I::Item: 'a,
392    F: FnMut(&I::Item) -> K,
393    K: PartialEq,
394{
395    type Item = (K, Group<'a, K, I, F>);
396
397    #[inline]
398    fn next(&mut self) -> Option<Self::Item> {
399        let index = self.parent.index.get();
400        self.parent.index.set(index + 1);
401        let inner = &mut *self.parent.inner.borrow_mut();
402        inner.step(index).map(|elt| {
403            let key = inner.group_key(index);
404            (
405                key,
406                Group {
407                    parent: self.parent,
408                    index,
409                    first: Some(elt),
410                },
411            )
412        })
413    }
414}
415
416/// An iterator for the elements in a single group.
417///
418/// Iterator element type is `I::Item`.
419pub struct Group<'a, K, I, F>
420where
421    I: Iterator + 'a,
422    I::Item: 'a,
423    K: 'a,
424    F: 'a,
425{
426    parent: &'a ChunkBy<K, I, F>,
427    index: usize,
428    first: Option<I::Item>,
429}
430
431impl<'a, K, I, F> Drop for Group<'a, K, I, F>
432where
433    I: Iterator,
434    I::Item: 'a,
435{
436    fn drop(&mut self) {
437        self.parent.drop_group(self.index);
438    }
439}
440
441impl<'a, K, I, F> Iterator for Group<'a, K, I, F>
442where
443    I: Iterator,
444    I::Item: 'a,
445    F: FnMut(&I::Item) -> K,
446    K: PartialEq,
447{
448    type Item = I::Item;
449    #[inline]
450    fn next(&mut self) -> Option<Self::Item> {
451        if let elt @ Some(..) = self.first.take() {
452            return elt;
453        }
454        self.parent.step(self.index)
455    }
456}
457
458///// IntoChunks /////
459
460/// Create a new
461pub fn new_chunks<J>(iter: J, size: usize) -> IntoChunks<J::IntoIter>
462where
463    J: IntoIterator,
464{
465    IntoChunks {
466        inner: RefCell::new(GroupInner {
467            key: ChunkIndex::new(size),
468            iter: iter.into_iter(),
469            current_key: None,
470            current_elt: None,
471            done: false,
472            top_group: 0,
473            oldest_buffered_group: 0,
474            bottom_group: 0,
475            buffer: Vec::new(),
476            dropped_group: !0,
477        }),
478        index: Cell::new(0),
479    }
480}
481
482/// `ChunkLazy` is the storage for a lazy chunking operation.
483///
484/// `IntoChunks` behaves just like `ChunkBy`: it is iterable, and
485/// it only buffers if several chunk iterators are alive at the same time.
486///
487/// This type implements [`IntoIterator`] (it is **not** an iterator
488/// itself), because the chunk iterators need to borrow from this
489/// value. It should be stored in a local variable or temporary and
490/// iterated.
491///
492/// Iterator element type is `Chunk`, each chunk's iterator.
493///
494/// See [`.chunks()`](crate::Itertools::chunks) for more information.
495#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
496pub struct IntoChunks<I>
497where
498    I: Iterator,
499{
500    inner: RefCell<GroupInner<usize, I, ChunkIndex>>,
501    // the chunk iterator's current index. Keep this in the main value
502    // so that simultaneous iterators all use the same state.
503    index: Cell<usize>,
504}
505
506impl<I> Clone for IntoChunks<I>
507where
508    I: Clone + Iterator,
509    I::Item: Clone,
510{
511    clone_fields!(inner, index);
512}
513
514impl<I> IntoChunks<I>
515where
516    I: Iterator,
517{
518    /// `client`: Index of chunk that requests next element
519    fn step(&self, client: usize) -> Option<I::Item> {
520        self.inner.borrow_mut().step(client)
521    }
522
523    /// `client`: Index of chunk
524    fn drop_group(&self, client: usize) {
525        self.inner.borrow_mut().drop_group(client);
526    }
527}
528
529impl<'a, I> IntoIterator for &'a IntoChunks<I>
530where
531    I: Iterator,
532    I::Item: 'a,
533{
534    type Item = Chunk<'a, I>;
535    type IntoIter = Chunks<'a, I>;
536
537    fn into_iter(self) -> Self::IntoIter {
538        Chunks { parent: self }
539    }
540}
541
542/// An iterator that yields the Chunk iterators.
543///
544/// Iterator element type is `Chunk`.
545///
546/// See [`.chunks()`](crate::Itertools::chunks) for more information.
547#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
548#[derive(Clone)]
549pub struct Chunks<'a, I>
550where
551    I: Iterator + 'a,
552    I::Item: 'a,
553{
554    parent: &'a IntoChunks<I>,
555}
556
557impl<'a, I> Iterator for Chunks<'a, I>
558where
559    I: Iterator,
560    I::Item: 'a,
561{
562    type Item = Chunk<'a, I>;
563
564    #[inline]
565    fn next(&mut self) -> Option<Self::Item> {
566        let index = self.parent.index.get();
567        self.parent.index.set(index + 1);
568        let inner = &mut *self.parent.inner.borrow_mut();
569        inner.step(index).map(|elt| Chunk {
570            parent: self.parent,
571            index,
572            first: Some(elt),
573        })
574    }
575}
576
577/// An iterator for the elements in a single chunk.
578///
579/// Iterator element type is `I::Item`.
580pub struct Chunk<'a, I>
581where
582    I: Iterator + 'a,
583    I::Item: 'a,
584{
585    parent: &'a IntoChunks<I>,
586    index: usize,
587    first: Option<I::Item>,
588}
589
590impl<'a, I> Drop for Chunk<'a, I>
591where
592    I: Iterator,
593    I::Item: 'a,
594{
595    fn drop(&mut self) {
596        self.parent.drop_group(self.index);
597    }
598}
599
600impl<'a, I> Iterator for Chunk<'a, I>
601where
602    I: Iterator,
603    I::Item: 'a,
604{
605    type Item = I::Item;
606    #[inline]
607    fn next(&mut self) -> Option<Self::Item> {
608        if let elt @ Some(..) = self.first.take() {
609            return elt;
610        }
611        self.parent.step(self.index)
612    }
613}