monoio/macros/
try_join.rs

/// Wait on multiple concurrent branches, returning when **all** branches
/// complete with `Ok(_)` or on the first `Err(_)`.
///
/// The `try_join!` macro must be used inside of async functions, closures, and
/// blocks.
///
/// Similar to [`join!`], the `try_join!` macro takes a list of async
/// expressions and evaluates them concurrently on the same task. Each async
/// expression evaluates to a future and the futures from each expression are
/// multiplexed on the current task. The `try_join!` macro returns when **all**
/// branches return with `Ok` or when the **first** branch returns with `Err`.
///
/// On success the result is `Ok` of a tuple holding each branch's value, in
/// the order the branches were written. Branches are polled in that same
/// order, so if several branches have failed by the time of a poll, the error
/// of the earliest-listed one is returned.
///
/// [`join!`]: macro@join
///
/// # Notes
///
/// The supplied futures are stored inline and do not require allocating a
/// `Vec`.
///
/// ### Runtime characteristics
///
/// By running all async expressions on the current task, the expressions are
/// able to run **concurrently** but not in **parallel**. This means all
/// expressions are run on the same thread and if one branch blocks the thread,
/// all other expressions will be unable to continue. If parallelism is
/// required, spawn each async expression using [`monoio::spawn`] and pass the
/// join handle to `try_join!`.
///
/// [`monoio::spawn`]: crate::spawn
///
/// # Examples
///
/// Basic try_join with two branches.
///
/// ```
/// async fn do_stuff_async() -> Result<(), &'static str> {
///     // async work
/// # Ok(())
/// }
///
/// async fn more_async_work() -> Result<(), &'static str> {
///     // more here
/// # Ok(())
/// }
///
/// #[monoio::main]
/// async fn main() {
///     let res = monoio::try_join!(do_stuff_async(), more_async_work());
///
///     match res {
///         Ok((first, second)) => {
///             // do something with the values
///         }
///         Err(err) => {
///             println!("processing failed; error = {}", err);
///         }
///     }
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
macro_rules! try_join {
    (@ {
        // One `_` for each branch in the `try_join!` macro. This is not used once
        // normalization is complete.
        ( $($count:tt)* )

        // Normalized try_join! branches: each expression is preceded by a
        // group of `_` tokens, one per branch that comes before it. The group
        // is spliced into a tuple pattern below to skip to that branch's slot.
        $( ( $($skip:tt)* ) $e:expr, )*

    }) => {{
        use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
        use $crate::macros::support::Poll::{Ready, Pending};

        // Safety: nothing must be moved out of `futures`. This is to satisfy
        // the requirement of `Pin::new_unchecked` called below.
        let mut futures = ( $( maybe_done($e), )* );

        // Give the `poll_fn` closure a *reference* to the tuple instead of
        // ownership of it. The tuple then stays in this enclosing scope, held across the
        // `.await` below, so the address of each future does not depend on
        // where the closure (or the future wrapping it) is stored, nor on
        // whether that wrapper implements `Unpin`.
        let futures = &mut futures;

        poll_fn(move |cx| {
            let mut is_pending = false;

            $(
                // Extract the future for this branch from the tuple.
                let ( $($skip,)* fut, .. ) = &mut *futures;

                // Safety: `futures` is only ever reached through the mutable
                // reference captured above and nothing is moved out of it.
                let mut fut = unsafe { Pin::new_unchecked(fut) };

                // Try polling
                if fut.as_mut().poll(cx).is_pending() {
                    is_pending = true;
                } else if fut.as_mut().output_mut().expect("expected completed future").is_err() {
                    // `.err().unwrap()` rather than `unwrap_err()`: the latter
                    // would require the branch's `Ok` type to implement `Debug`.
                    return Ready(Err(fut.take_output().expect("expected completed future").err().unwrap()))
                }
            )*

            if is_pending {
                Pending
            } else {
                Ready(Ok(($({
                    // Extract the future for this branch from the tuple.
                    let ( $($skip,)* fut, .. ) = &mut *futures;

                    // Safety: `futures` is only ever reached through the
                    // mutable reference captured above and nothing is moved
                    // out of it.
                    let fut = unsafe { Pin::new_unchecked(fut) };

                    // `.ok().expect(..)` rather than `Result::expect`: the
                    // latter would require the branch's error type to
                    // implement `Debug`.
                    fut
                        .take_output()
                        .expect("expected completed future")
                        .ok()
                        .expect("expected Ok(_)")
                },)*)))
            }
        }).await
    }};

    // ===== Normalize =====

    // Move one expression from the input list into the accumulator, recording
    // the current count as its skip group and then bumping the count by one `_`.
    (@ { ( $($s:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
        $crate::try_join!(@{ ($($s)* _) $($t)* ($($s)*) $e, } $($r)*)
    };

    // ===== Entry point =====

    ( $($e:expr),* $(,)?) => {
        $crate::try_join!(@{ () } $($e,)*)
    };
}