monoio/io/util/
split.rs

1use std::{
2    cell::UnsafeCell,
3    error::Error,
4    fmt::{self, Debug},
5    future::Future,
6    rc::Rc,
7};
8
9use super::CancelHandle;
10use crate::{
11    io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent},
12    BufResult,
13};
14
15/// Owned Read Half Part
16#[derive(Debug)]
17pub struct OwnedReadHalf<T>(pub Rc<UnsafeCell<T>>);
18/// Owned Write Half Part
19#[derive(Debug)]
20#[repr(transparent)]
21pub struct OwnedWriteHalf<T>(pub Rc<UnsafeCell<T>>)
22where
23    T: AsyncWriteRent;
24
25/// This is a dummy unsafe trait to inform monoio,
26/// the object with has this `Split` trait can be safely split
27/// to read/write object in both form of `Owned` or `Borrowed`.
28///
29/// # Safety
30///
31/// monoio cannot guarantee whether the custom object can be
32/// safely split to divided objects. Users should ensure the read
33/// operations are indenpendence from the write ones, the methods
34/// from `AsyncReadRent` and `AsyncWriteRent` can execute concurrently.
35pub unsafe trait Split {}
36
37/// Inner split trait
38pub trait Splitable {
39    /// Owned Read Split
40    type OwnedRead;
41    /// Owned Write Split
42    type OwnedWrite;
43
44    /// Split into owned parts
45    fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite);
46}
47
48impl<T> Splitable for T
49where
50    T: Split + AsyncWriteRent,
51{
52    type OwnedRead = OwnedReadHalf<T>;
53    type OwnedWrite = OwnedWriteHalf<T>;
54
55    #[inline]
56    fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite) {
57        let shared = Rc::new(UnsafeCell::new(self));
58        (OwnedReadHalf(shared.clone()), OwnedWriteHalf(shared))
59    }
60}
61
62impl<Inner> AsyncReadRent for OwnedReadHalf<Inner>
63where
64    Inner: AsyncReadRent,
65{
66    #[inline]
67    fn read<T: crate::buf::IoBufMut>(
68        &mut self,
69        buf: T,
70    ) -> impl Future<Output = BufResult<usize, T>> {
71        let stream = unsafe { &mut *self.0.get() };
72        stream.read(buf)
73    }
74
75    #[inline]
76    fn readv<T: crate::buf::IoVecBufMut>(
77        &mut self,
78        buf: T,
79    ) -> impl Future<Output = BufResult<usize, T>> {
80        let stream = unsafe { &mut *self.0.get() };
81        stream.readv(buf)
82    }
83}
84
85impl<Inner> CancelableAsyncReadRent for OwnedReadHalf<Inner>
86where
87    Inner: CancelableAsyncReadRent,
88{
89    #[inline]
90    fn cancelable_read<T: crate::buf::IoBufMut>(
91        &mut self,
92        buf: T,
93        c: CancelHandle,
94    ) -> impl Future<Output = crate::BufResult<usize, T>> {
95        let stream = unsafe { &mut *self.0.get() };
96        stream.cancelable_read(buf, c)
97    }
98
99    #[inline]
100    fn cancelable_readv<T: crate::buf::IoVecBufMut>(
101        &mut self,
102        buf: T,
103        c: CancelHandle,
104    ) -> impl Future<Output = crate::BufResult<usize, T>> {
105        let stream = unsafe { &mut *self.0.get() };
106        stream.cancelable_readv(buf, c)
107    }
108}
109
110impl<Inner> AsyncWriteRent for OwnedWriteHalf<Inner>
111where
112    Inner: AsyncWriteRent,
113{
114    #[inline]
115    fn write<T: crate::buf::IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
116        let stream = unsafe { &mut *self.0.get() };
117        stream.write(buf)
118    }
119
120    #[inline]
121    fn writev<T: crate::buf::IoVecBuf>(
122        &mut self,
123        buf_vec: T,
124    ) -> impl Future<Output = BufResult<usize, T>> {
125        let stream = unsafe { &mut *self.0.get() };
126        stream.writev(buf_vec)
127    }
128
129    #[inline]
130    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
131        let stream = unsafe { &mut *self.0.get() };
132        stream.flush()
133    }
134
135    #[inline]
136    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
137        let stream = unsafe { &mut *self.0.get() };
138        stream.shutdown()
139    }
140}
141
142impl<Inner> CancelableAsyncWriteRent for OwnedWriteHalf<Inner>
143where
144    Inner: CancelableAsyncWriteRent,
145{
146    #[inline]
147    fn cancelable_write<T: crate::buf::IoBuf>(
148        &mut self,
149        buf: T,
150        c: CancelHandle,
151    ) -> impl Future<Output = crate::BufResult<usize, T>> {
152        let stream = unsafe { &mut *self.0.get() };
153        stream.cancelable_write(buf, c)
154    }
155
156    #[inline]
157    fn cancelable_writev<T: crate::buf::IoVecBuf>(
158        &mut self,
159        buf_vec: T,
160        c: CancelHandle,
161    ) -> impl Future<Output = crate::BufResult<usize, T>> {
162        let stream = unsafe { &mut *self.0.get() };
163        stream.cancelable_writev(buf_vec, c)
164    }
165
166    #[inline]
167    fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future<Output = std::io::Result<()>> {
168        let stream = unsafe { &mut *self.0.get() };
169        stream.cancelable_flush(c)
170    }
171
172    #[inline]
173    fn cancelable_shutdown(
174        &mut self,
175        c: CancelHandle,
176    ) -> impl Future<Output = std::io::Result<()>> {
177        let stream = unsafe { &mut *self.0.get() };
178        stream.cancelable_shutdown(c)
179    }
180}
181
182impl<T> OwnedReadHalf<T>
183where
184    T: AsyncWriteRent,
185{
186    /// reunite write half
187    #[inline]
188    pub fn reunite(self, other: OwnedWriteHalf<T>) -> Result<T, ReuniteError<T>> {
189        reunite(self, other)
190    }
191}
192
193impl<T> OwnedWriteHalf<T>
194where
195    T: AsyncWriteRent,
196{
197    /// reunite read half
198    #[inline]
199    pub fn reunite(self, other: OwnedReadHalf<T>) -> Result<T, ReuniteError<T>> {
200        reunite(other, self)
201    }
202}
203
204impl<T> Drop for OwnedWriteHalf<T>
205where
206    T: AsyncWriteRent,
207{
208    #[inline]
209    fn drop(&mut self) {
210        let write = unsafe { &mut *self.0.get() };
211        // Notes:: shutdown is an async function but rust currently does not support async drop
212        // this drop will only execute sync part of `shutdown` function.
213        #[allow(unused_must_use)]
214        {
215            write.shutdown();
216        }
217    }
218}
219
220pub(crate) fn reunite<T: AsyncWriteRent>(
221    read: OwnedReadHalf<T>,
222    write: OwnedWriteHalf<T>,
223) -> Result<T, ReuniteError<T>> {
224    if Rc::ptr_eq(&read.0, &write.0) {
225        // we cannot execute drop for OwnedWriteHalf.
226        unsafe {
227            let _inner: Rc<UnsafeCell<T>> = std::mem::transmute(write);
228        }
229        // This unwrap cannot fail as the api does not allow creating more than two
230        // Arcs, and we just dropped the other half.
231        Ok(Rc::try_unwrap(read.0)
232            .expect("try_unwrap failed in reunite")
233            .into_inner())
234    } else {
235        Err(ReuniteError(read, write))
236    }
237}
238
239/// Error indicating that two halves were not from the same socket, and thus
240/// could not be reunited.
241#[derive(Debug)]
242pub struct ReuniteError<T: AsyncWriteRent>(pub OwnedReadHalf<T>, pub OwnedWriteHalf<T>);
243
244impl<T> fmt::Display for ReuniteError<T>
245where
246    T: AsyncWriteRent,
247{
248    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249        write!(f, "tried to reunite halves")
250    }
251}
252
253impl<T> Error for ReuniteError<T> where T: AsyncWriteRent + Debug {}