async_dnssd/
timeout_stream.rs

1use futures_core::Stream;
2use std::{
3	future::Future,
4	pin::Pin,
5	task::{
6		Context,
7		Poll,
8	},
9	time::Duration,
10};
11
12/// `Stream` extension to simplify building [`TimeoutStream`]
13pub trait StreamTimeoutExt: Stream + Sized {
14	/// Create new [`TimeoutStream`]
15	fn timeout(self, duration: Duration) -> TimeoutStream<Self>;
16}
17
18impl<S: Stream> StreamTimeoutExt for S {
19	fn timeout(self, duration: Duration) -> TimeoutStream<Self> {
20		TimeoutStream::new(self, duration)
21	}
22}
23
24/// Add a timeout to a stream; each time an item is received the timer
25/// is reset
26///
27/// If the timeout triggers the stream ends (without an error).
28#[must_use = "streams do nothing unless polled"]
29pub struct TimeoutStream<S> {
30	stream: S,
31	duration: Duration,
32	timeout: tokio::time::Sleep,
33}
34
35impl<S: Stream> TimeoutStream<S> {
36	pin_utils::unsafe_pinned!(stream: S);
37
38	pin_utils::unsafe_pinned!(timeout: tokio::time::Sleep);
39
40	/// Create new [`TimeoutStream`].
41	///
42	/// Also see [`StreamTimeoutExt::timeout`].
43	pub fn new(stream: S, duration: Duration) -> Self {
44		Self {
45			stream,
46			duration,
47			timeout: tokio::time::sleep(duration),
48		}
49	}
50}
51
52impl<S: Stream> TimeoutStream<S> {
53	fn reset_timer(self: Pin<&mut Self>) {
54		let next = tokio::time::Instant::now() + self.duration;
55		self.timeout().reset(next);
56	}
57}
58
59impl<S: futures_core::TryStream> Stream for TimeoutStream<S> {
60	type Item = Result<S::Ok, S::Error>;
61
62	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63		match self.as_mut().stream().try_poll_next(cx) {
64			Poll::Ready(None) => Poll::Ready(None), // end of stream
65			Poll::Ready(Some(Ok(item))) => {
66				// not end of stream: reset timeout
67				self.reset_timer();
68				Poll::Ready(Some(Ok(item)))
69			},
70			Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
71			Poll::Pending => {
72				// check timeout
73				match self.timeout().poll(cx) {
74					// timed out?
75					Poll::Ready(()) => {
76						// not an error
77						Poll::Ready(None)
78					},
79					// still time left
80					Poll::Pending => Poll::Pending,
81				}
82			},
83		}
84	}
85}