async_dnssd/
fused_err_stream.rs

1use futures_core::{
2	Stream,
3	TryStream,
4};
5use std::{
6	pin::Pin,
7	task::{
8		Context,
9		Poll,
10	},
11};
12
/// Internal state of a [`FusedErrorStream`]: either an error that is still
/// waiting to be handed out, or the wrapped stream itself.
enum Inner<E, S> {
	/// A stored error. `Some` until the error has been taken out of the
	/// slot, `None` afterwards.
	Err(Option<E>),
	/// The wrapped stream.
	Stream(S),
}
17
18pub(crate) struct FusedErrorStream<S: TryStream>(Inner<S::Error, S>);
19
20impl<S: TryStream> From<Result<S, S::Error>> for FusedErrorStream<S> {
21	fn from(r: Result<S, S::Error>) -> Self {
22		match r {
23			Ok(s) => Self(Inner::Stream(s)),
24			Err(e) => Self(Inner::Err(Some(e))),
25		}
26	}
27}
28
29impl<S, T, E> Stream for FusedErrorStream<S>
30where
31	S: Stream<Item = Result<T, E>>,
32	E: Unpin,
33{
34	type Item = S::Item;
35
36	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37		match &mut unsafe { self.get_unchecked_mut() }.0 {
38			Inner::Err(e) => {
39				// "error variant" is `Unpin`; extract error and fuse stream
40				match e.take() {
41					Some(e) => Poll::Ready(Some(Err(e))),
42					None => Poll::Ready(None), // error already returned before
43				}
44			},
45			Inner::Stream(s) => unsafe { Pin::new_unchecked(s) }.poll_next(cx),
46		}
47	}
48}