//! Split a single value implementing `AsyncRead + AsyncWrite` into separate //! `AsyncRead` and `AsyncWrite` handles. //! //! To restore this read/write object from its `split::ReadHalf` and //! `split::WriteHalf` use `unsplit`. use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use std::fmt; use std::io; use std::pin::Pin; use std::sync::Arc; use std::sync::Mutex; use std::task::{Context, Poll}; cfg_io_util! { /// The readable half of a value returned from [`split`](split()). pub struct ReadHalf { inner: Arc>, } /// The writable half of a value returned from [`split`](split()). pub struct WriteHalf { inner: Arc>, } /// Splits a single value implementing `AsyncRead + AsyncWrite` into separate /// `AsyncRead` and `AsyncWrite` handles. /// /// To restore this read/write object from its `ReadHalf` and /// `WriteHalf` use [`unsplit`](ReadHalf::unsplit()). pub fn split(stream: T) -> (ReadHalf, WriteHalf) where T: AsyncRead + AsyncWrite, { let is_write_vectored = stream.is_write_vectored(); let inner = Arc::new(Inner { stream: Mutex::new(stream), is_write_vectored, }); let rd = ReadHalf { inner: inner.clone(), }; let wr = WriteHalf { inner }; (rd, wr) } } struct Inner { stream: Mutex, is_write_vectored: bool, } impl Inner { fn with_lock(&self, f: impl FnOnce(Pin<&mut T>) -> R) -> R { let mut guard = self.stream.lock().unwrap(); // safety: we do not move the stream. let stream = unsafe { Pin::new_unchecked(&mut *guard) }; f(stream) } } impl ReadHalf { /// Checks if this `ReadHalf` and some `WriteHalf` were split from the same /// stream. pub fn is_pair_of(&self, other: &WriteHalf) -> bool { other.is_pair_of(self) } /// Reunites with a previously split `WriteHalf`. /// /// # Panics /// /// If this `ReadHalf` and the given `WriteHalf` do not originate from the /// same `split` operation this method will panic. /// This can be checked ahead of time by calling [`is_pair_of()`](Self::is_pair_of). #[track_caller] pub fn unsplit(self, wr: WriteHalf) -> T where T: Unpin, { if self.is_pair_of(&wr) { drop(wr); let inner = Arc::try_unwrap(self.inner) .ok() .expect("`Arc::try_unwrap` failed"); inner.stream.into_inner().unwrap() } else { panic!("Unrelated `split::Write` passed to `split::Read::unsplit`.") } } } impl WriteHalf { /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same /// stream. pub fn is_pair_of(&self, other: &ReadHalf) -> bool { Arc::ptr_eq(&self.inner, &other.inner) } } impl AsyncRead for ReadHalf { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { self.inner.with_lock(|stream| stream.poll_read(cx, buf)) } } impl AsyncWrite for WriteHalf { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.inner.with_lock(|stream| stream.poll_write(cx, buf)) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.with_lock(|stream| stream.poll_flush(cx)) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.with_lock(|stream| stream.poll_shutdown(cx)) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { self.inner .with_lock(|stream| stream.poll_write_vectored(cx, bufs)) } fn is_write_vectored(&self) -> bool { self.inner.is_write_vectored } } unsafe impl Send for ReadHalf {} unsafe impl Send for WriteHalf {} unsafe impl Sync for ReadHalf {} unsafe impl Sync for WriteHalf {} impl fmt::Debug for ReadHalf { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("split::ReadHalf").finish() } } impl fmt::Debug for WriteHalf { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("split::WriteHalf").finish() } }