monoio/macros/try_join.rs
1/// Wait on multiple concurrent branches, returning when **all** branches
2/// complete with `Ok(_)` or on the first `Err(_)`.
3///
4/// The `try_join!` macro must be used inside of async functions, closures, and
5/// blocks.
6///
7/// Similar to [`join!`], the `try_join!` macro takes a list of async
8/// expressions and evaluates them concurrently on the same task. Each async
9/// expression evaluates to a future and the futures from each expression are
10/// multiplexed on the current task. The `try_join!` macro returns when **all**
11/// branches return with `Ok` or when the **first** branch returns with `Err`.
12///
13/// [`join!`]: macro@join
14///
15/// # Notes
16///
17/// The supplied futures are stored inline and does not require allocating a
18/// `Vec`.
19///
20/// ### Runtime characteristics
21///
22/// By running all async expressions on the current task, the expressions are
23/// able to run **concurrently** but not in **parallel**. This means all
24/// expressions are run on the same thread and if one branch blocks the thread,
25/// all other expressions will be unable to continue. If parallelism is
26/// required, spawn each async expression using [`monoio::spawn`] and pass the
27/// join handle to `try_join!`.
28///
29/// [`monoio::spawn`]: crate::spawn
30///
31/// # Examples
32///
33/// Basic try_join with two branches.
34///
35/// ```
36/// async fn do_stuff_async() -> Result<(), &'static str> {
37/// // async work
38/// # Ok(())
39/// }
40///
41/// async fn more_async_work() -> Result<(), &'static str> {
42/// // more here
43/// # Ok(())
44/// }
45///
46/// #[monoio::main]
47/// async fn main() {
48/// let res = monoio::try_join!(do_stuff_async(), more_async_work());
49///
50/// match res {
51/// Ok((first, second)) => {
52/// // do something with the values
53/// }
54/// Err(err) => {
55/// println!("processing failed; error = {}", err);
56/// }
57/// }
58/// }
59/// ```
60#[macro_export]
61#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
62macro_rules! try_join {
63 (@ {
64 // One `_` for each branch in the `try_join!` macro. This is not used once
65 // normalization is complete.
66 ( $($count:tt)* )
67
68 // Normalized try_join! branches
69 $( ( $($skip:tt)* ) $e:expr, )*
70
71 }) => {{
72 use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
73 use $crate::macros::support::Poll::{Ready, Pending};
74
75 // Safety: nothing must be moved out of `futures`. This is to satisfy
76 // the requirement of `Pin::new_unchecked` called below.
77 let mut futures = ( $( maybe_done($e), )* );
78
79 poll_fn(move |cx| {
80 let mut is_pending = false;
81
82 $(
83 // Extract the future for this branch from the tuple.
84 let ( $($skip,)* fut, .. ) = &mut futures;
85
86 // Safety: future is stored on the stack above
87 // and never moved.
88 let mut fut = unsafe { Pin::new_unchecked(fut) };
89
90 // Try polling
91 if fut.as_mut().poll(cx).is_pending() {
92 is_pending = true;
93 } else if fut.as_mut().output_mut().expect("expected completed future").is_err() {
94 return Ready(Err(fut.take_output().expect("expected completed future").err().unwrap()))
95 }
96 )*
97
98 if is_pending {
99 Pending
100 } else {
101 Ready(Ok(($({
102 // Extract the future for this branch from the tuple.
103 let ( $($skip,)* fut, .. ) = &mut futures;
104
105 // Safety: future is stored on the stack above
106 // and never moved.
107 let mut fut = unsafe { Pin::new_unchecked(fut) };
108
109 fut
110 .take_output()
111 .expect("expected completed future")
112 .ok()
113 .expect("expected Ok(_)")
114 },)*)))
115 }
116 }).await
117 }};
118
119 // ===== Normalize =====
120
121 (@ { ( $($s:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
122 $crate::try_join!(@{ ($($s)* _) $($t)* ($($s)*) $e, } $($r)*)
123 };
124
125 // ===== Entry point =====
126
127 ( $($e:expr),* $(,)?) => {
128 $crate::try_join!(@{ () } $($e,)*)
129 };
130}