async_dnssd/evented/
unix.rs

1use std::{
2	io,
3	os::raw::c_int,
4	task::{
5		Context,
6		Poll,
7	},
8};
9use tokio::io::unix::AsyncFd;
10
11fn is_readable(fd: c_int) -> io::Result<bool> {
12	let mut fds = libc::pollfd {
13		fd,
14		events: libc::POLLIN | libc::POLLHUP | libc::POLLERR,
15		revents: 0,
16	};
17	loop {
18		let r = unsafe { libc::poll(&mut fds, 1, 0) };
19		if r == 0 {
20			return Ok(false);
21		}
22		if r == 1 {
23			return Ok(true);
24		}
25		let e = io::Error::last_os_error();
26		if e.kind() == io::ErrorKind::Interrupted {
27			continue;
28		}
29		return Ok(false);
30	}
31}
32
33pub(crate) struct ReadProcessor(AsyncFd<c_int>);
34
35impl ReadProcessor {
36	pub(crate) fn new(fd: c_int) -> io::Result<Self> {
37		Ok(Self(AsyncFd::with_interest(
38			fd,
39			tokio::io::Interest::READABLE,
40		)?))
41	}
42
43	/// call "p" until fd is no longer readable
44	pub(crate) fn process<P>(&mut self, cx: &mut Context<'_>, mut p: P) -> io::Result<()>
45	where
46		P: FnMut() -> io::Result<()>,
47	{
48		loop {
49			let mut ready_guard = match self.0.poll_read_ready(cx) {
50				Poll::Pending => return Ok(()),
51				Poll::Ready(r) => r?,
52			};
53			while is_readable(*self.0.get_ref())? {
54				p()?;
55			}
56			ready_guard.clear_ready();
57			// after clear we need to poll again to be registered!
58		}
59	}
60}