async_dnssd/evented/
unix.rs1use std::{
2 io,
3 os::raw::c_int,
4 task::{
5 Context,
6 Poll,
7 },
8};
9use tokio::io::unix::AsyncFd;
10
11fn is_readable(fd: c_int) -> io::Result<bool> {
12 let mut fds = libc::pollfd {
13 fd,
14 events: libc::POLLIN | libc::POLLHUP | libc::POLLERR,
15 revents: 0,
16 };
17 loop {
18 let r = unsafe { libc::poll(&mut fds, 1, 0) };
19 if r == 0 {
20 return Ok(false);
21 }
22 if r == 1 {
23 return Ok(true);
24 }
25 let e = io::Error::last_os_error();
26 if e.kind() == io::ErrorKind::Interrupted {
27 continue;
28 }
29 return Ok(false);
30 }
31}
32
33pub(crate) struct ReadProcessor(AsyncFd<c_int>);
34
35impl ReadProcessor {
36 pub(crate) fn new(fd: c_int) -> io::Result<Self> {
37 Ok(Self(AsyncFd::with_interest(
38 fd,
39 tokio::io::Interest::READABLE,
40 )?))
41 }
42
43 pub(crate) fn process<P>(&mut self, cx: &mut Context<'_>, mut p: P) -> io::Result<()>
45 where
46 P: FnMut() -> io::Result<()>,
47 {
48 loop {
49 let mut ready_guard = match self.0.poll_read_ready(cx) {
50 Poll::Pending => return Ok(()),
51 Poll::Ready(r) => r?,
52 };
53 while is_readable(*self.0.get_ref())? {
54 p()?;
55 }
56 ready_guard.clear_ready();
57 }
59 }
60}