monoio/io/stream/
stream_ext.rs

1use std::future::Future;
2
3use super::{assert_stream, Stream};
4
5/// Stream extensions.
6pub trait StreamExt: Stream {
7    /// Maps a stream to a stream of its items.
8    fn map<T, F>(self, f: F) -> Map<Self, F>
9    where
10        F: FnMut(Self::Item) -> T,
11        Self: Sized,
12    {
13        assert_stream::<T, _>(Map::new(self, f))
14    }
15
16    /// Computes from this stream's items new items of a different type using
17    /// an asynchronous closure.
18    fn then<Fut, F>(self, f: F) -> Then<Self, F>
19    where
20        F: FnMut(Self::Item) -> Fut,
21        Fut: Future,
22        Self: Sized,
23    {
24        assert_stream::<Fut::Output, _>(Then::new(self, f))
25    }
26
27    /// Runs this stream to completion, executing the provided asynchronous
28    /// closure for each element on the stream.
29    fn for_each<Fut, F>(mut self, mut f: F) -> impl Future<Output = ()>
30    where
31        F: FnMut(Self::Item) -> Fut,
32        Fut: Future<Output = ()>,
33        Self: Sized,
34    {
35        async move {
36            while let Some(item) = self.next().await {
37                (f)(item).await;
38            }
39        }
40    }
41}
42
43impl<T> StreamExt for T where T: Stream {}
44
45#[must_use = "streams do nothing unless polled"]
46pub struct Map<St, F> {
47    stream: St,
48    f: F,
49}
50
51impl<St, F> Map<St, F> {
52    pub(crate) fn new(stream: St, f: F) -> Self {
53        Self { stream, f }
54    }
55}
56
57impl<St, F, Item> Stream for Map<St, F>
58where
59    St: Stream,
60    F: FnMut(St::Item) -> Item,
61{
62    type Item = Item;
63
64    async fn next(&mut self) -> Option<Self::Item> {
65        self.stream.next().await.map(&mut self.f)
66    }
67}
68
69#[must_use = "streams do nothing unless polled"]
70pub struct Then<St, F> {
71    stream: St,
72    f: F,
73}
74
75impl<St, F> Then<St, F>
76where
77    St: Stream,
78{
79    pub(super) fn new(stream: St, f: F) -> Self {
80        Self { stream, f }
81    }
82}
83
84impl<St, Fut, F> Stream for Then<St, F>
85where
86    St: Stream,
87    F: FnMut(St::Item) -> Fut,
88    Fut: Future,
89{
90    type Item = Fut::Output;
91
92    async fn next(&mut self) -> Option<Self::Item> {
93        let item = self.stream.next().await?;
94        Some((self.f)(item).await)
95    }
96}