monoio/io/stream/
stream_ext.rs1use std::future::Future;
2
3use super::{assert_stream, Stream};
4
5pub trait StreamExt: Stream {
7 fn map<T, F>(self, f: F) -> Map<Self, F>
9 where
10 F: FnMut(Self::Item) -> T,
11 Self: Sized,
12 {
13 assert_stream::<T, _>(Map::new(self, f))
14 }
15
16 fn then<Fut, F>(self, f: F) -> Then<Self, F>
19 where
20 F: FnMut(Self::Item) -> Fut,
21 Fut: Future,
22 Self: Sized,
23 {
24 assert_stream::<Fut::Output, _>(Then::new(self, f))
25 }
26
27 fn for_each<Fut, F>(mut self, mut f: F) -> impl Future<Output = ()>
30 where
31 F: FnMut(Self::Item) -> Fut,
32 Fut: Future<Output = ()>,
33 Self: Sized,
34 {
35 async move {
36 while let Some(item) = self.next().await {
37 (f)(item).await;
38 }
39 }
40 }
41}
42
43impl<T> StreamExt for T where T: Stream {}
44
45#[must_use = "streams do nothing unless polled"]
46pub struct Map<St, F> {
47 stream: St,
48 f: F,
49}
50
51impl<St, F> Map<St, F> {
52 pub(crate) fn new(stream: St, f: F) -> Self {
53 Self { stream, f }
54 }
55}
56
57impl<St, F, Item> Stream for Map<St, F>
58where
59 St: Stream,
60 F: FnMut(St::Item) -> Item,
61{
62 type Item = Item;
63
64 async fn next(&mut self) -> Option<Self::Item> {
65 self.stream.next().await.map(&mut self.f)
66 }
67}
68
69#[must_use = "streams do nothing unless polled"]
70pub struct Then<St, F> {
71 stream: St,
72 f: F,
73}
74
75impl<St, F> Then<St, F>
76where
77 St: Stream,
78{
79 pub(super) fn new(stream: St, f: F) -> Self {
80 Self { stream, f }
81 }
82}
83
84impl<St, Fut, F> Stream for Then<St, F>
85where
86 St: Stream,
87 F: FnMut(St::Item) -> Fut,
88 Fut: Future,
89{
90 type Item = Fut::Output;
91
92 async fn next(&mut self) -> Option<Self::Item> {
93 let item = self.stream.next().await?;
94 Some((self.f)(item).await)
95 }
96}