1use std::{
2 cell::UnsafeCell,
3 error::Error,
4 fmt::{self, Debug},
5 future::Future,
6 rc::Rc,
7};
8
9use super::CancelHandle;
10use crate::{
11 io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent},
12 BufResult,
13};
14
/// Owned read half of a split I/O object.
///
/// It holds one of the two reference-counted handles to the shared inner
/// value; the matching [`OwnedWriteHalf`] holds the other one.
#[derive(Debug)]
pub struct OwnedReadHalf<T>(
    /// Shared handle to the underlying I/O object.
    pub Rc<UnsafeCell<T>>,
);
18#[derive(Debug)]
20#[repr(transparent)]
21pub struct OwnedWriteHalf<T>(pub Rc<UnsafeCell<T>>)
22where
23 T: AsyncWriteRent;
24
/// Marker trait for I/O types that may be split into a read half and a write
/// half.
///
/// The trait has no items; implementing it (together with `AsyncWriteRent`)
/// opts a type into the blanket [`Splitable`] implementation.
///
/// # Safety
///
/// Both halves create a `&mut` to the same underlying value through an
/// `UnsafeCell`, without any runtime borrow tracking. Implementors must
/// guarantee that the type's read operations and write operations can be
/// driven through such aliased access without interfering with each other.
pub unsafe trait Split {}
36
/// Types that can be consumed and turned into an owned read half and an owned
/// write half.
pub trait Splitable {
    /// Type of the read half returned by [`Splitable::into_split`].
    type OwnedRead;
    /// Type of the write half returned by [`Splitable::into_split`].
    type OwnedWrite;

    /// Consumes `self` and returns the `(read, write)` pair of halves.
    fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite);
}
47
48impl<T> Splitable for T
49where
50 T: Split + AsyncWriteRent,
51{
52 type OwnedRead = OwnedReadHalf<T>;
53 type OwnedWrite = OwnedWriteHalf<T>;
54
55 #[inline]
56 fn into_split(self) -> (Self::OwnedRead, Self::OwnedWrite) {
57 let shared = Rc::new(UnsafeCell::new(self));
58 (OwnedReadHalf(shared.clone()), OwnedWriteHalf(shared))
59 }
60}
61
62impl<Inner> AsyncReadRent for OwnedReadHalf<Inner>
63where
64 Inner: AsyncReadRent,
65{
66 #[inline]
67 fn read<T: crate::buf::IoBufMut>(
68 &mut self,
69 buf: T,
70 ) -> impl Future<Output = BufResult<usize, T>> {
71 let stream = unsafe { &mut *self.0.get() };
72 stream.read(buf)
73 }
74
75 #[inline]
76 fn readv<T: crate::buf::IoVecBufMut>(
77 &mut self,
78 buf: T,
79 ) -> impl Future<Output = BufResult<usize, T>> {
80 let stream = unsafe { &mut *self.0.get() };
81 stream.readv(buf)
82 }
83}
84
85impl<Inner> CancelableAsyncReadRent for OwnedReadHalf<Inner>
86where
87 Inner: CancelableAsyncReadRent,
88{
89 #[inline]
90 fn cancelable_read<T: crate::buf::IoBufMut>(
91 &mut self,
92 buf: T,
93 c: CancelHandle,
94 ) -> impl Future<Output = crate::BufResult<usize, T>> {
95 let stream = unsafe { &mut *self.0.get() };
96 stream.cancelable_read(buf, c)
97 }
98
99 #[inline]
100 fn cancelable_readv<T: crate::buf::IoVecBufMut>(
101 &mut self,
102 buf: T,
103 c: CancelHandle,
104 ) -> impl Future<Output = crate::BufResult<usize, T>> {
105 let stream = unsafe { &mut *self.0.get() };
106 stream.cancelable_readv(buf, c)
107 }
108}
109
110impl<Inner> AsyncWriteRent for OwnedWriteHalf<Inner>
111where
112 Inner: AsyncWriteRent,
113{
114 #[inline]
115 fn write<T: crate::buf::IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
116 let stream = unsafe { &mut *self.0.get() };
117 stream.write(buf)
118 }
119
120 #[inline]
121 fn writev<T: crate::buf::IoVecBuf>(
122 &mut self,
123 buf_vec: T,
124 ) -> impl Future<Output = BufResult<usize, T>> {
125 let stream = unsafe { &mut *self.0.get() };
126 stream.writev(buf_vec)
127 }
128
129 #[inline]
130 fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
131 let stream = unsafe { &mut *self.0.get() };
132 stream.flush()
133 }
134
135 #[inline]
136 fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
137 let stream = unsafe { &mut *self.0.get() };
138 stream.shutdown()
139 }
140}
141
142impl<Inner> CancelableAsyncWriteRent for OwnedWriteHalf<Inner>
143where
144 Inner: CancelableAsyncWriteRent,
145{
146 #[inline]
147 fn cancelable_write<T: crate::buf::IoBuf>(
148 &mut self,
149 buf: T,
150 c: CancelHandle,
151 ) -> impl Future<Output = crate::BufResult<usize, T>> {
152 let stream = unsafe { &mut *self.0.get() };
153 stream.cancelable_write(buf, c)
154 }
155
156 #[inline]
157 fn cancelable_writev<T: crate::buf::IoVecBuf>(
158 &mut self,
159 buf_vec: T,
160 c: CancelHandle,
161 ) -> impl Future<Output = crate::BufResult<usize, T>> {
162 let stream = unsafe { &mut *self.0.get() };
163 stream.cancelable_writev(buf_vec, c)
164 }
165
166 #[inline]
167 fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future<Output = std::io::Result<()>> {
168 let stream = unsafe { &mut *self.0.get() };
169 stream.cancelable_flush(c)
170 }
171
172 #[inline]
173 fn cancelable_shutdown(
174 &mut self,
175 c: CancelHandle,
176 ) -> impl Future<Output = std::io::Result<()>> {
177 let stream = unsafe { &mut *self.0.get() };
178 stream.cancelable_shutdown(c)
179 }
180}
181
182impl<T> OwnedReadHalf<T>
183where
184 T: AsyncWriteRent,
185{
186 #[inline]
188 pub fn reunite(self, other: OwnedWriteHalf<T>) -> Result<T, ReuniteError<T>> {
189 reunite(self, other)
190 }
191}
192
193impl<T> OwnedWriteHalf<T>
194where
195 T: AsyncWriteRent,
196{
197 #[inline]
199 pub fn reunite(self, other: OwnedReadHalf<T>) -> Result<T, ReuniteError<T>> {
200 reunite(other, self)
201 }
202}
203
204impl<T> Drop for OwnedWriteHalf<T>
205where
206 T: AsyncWriteRent,
207{
208 #[inline]
209 fn drop(&mut self) {
210 let write = unsafe { &mut *self.0.get() };
211 #[allow(unused_must_use)]
214 {
215 write.shutdown();
216 }
217 }
218}
219
220pub(crate) fn reunite<T: AsyncWriteRent>(
221 read: OwnedReadHalf<T>,
222 write: OwnedWriteHalf<T>,
223) -> Result<T, ReuniteError<T>> {
224 if Rc::ptr_eq(&read.0, &write.0) {
225 unsafe {
227 let _inner: Rc<UnsafeCell<T>> = std::mem::transmute(write);
228 }
229 Ok(Rc::try_unwrap(read.0)
232 .expect("try_unwrap failed in reunite")
233 .into_inner())
234 } else {
235 Err(ReuniteError(read, write))
236 }
237}
238
239#[derive(Debug)]
242pub struct ReuniteError<T: AsyncWriteRent>(pub OwnedReadHalf<T>, pub OwnedWriteHalf<T>);
243
244impl<T> fmt::Display for ReuniteError<T>
245where
246 T: AsyncWriteRent,
247{
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 write!(f, "tried to reunite halves")
250 }
251}
252
253impl<T> Error for ReuniteError<T> where T: AsyncWriteRent + Debug {}