async_dnssd/
stream.rs

1use futures_channel::mpsc;
2use futures_util::StreamExt;
3use std::{
4	io,
5	os::raw::c_void,
6	pin::Pin,
7	task::{
8		Context,
9		Poll,
10	},
11};
12
13use crate::{
14	error::Error,
15	ffi,
16	inner::EventedService,
17};
18
/// Returns the address of the value owned by `ptr` as an untyped pointer.
///
/// The result points at the boxed `T` itself, not at the `Box` handle, so it keeps referring
/// to the same heap allocation when the `Box` is moved afterwards. It dangles as soon as the
/// `Box` is dropped.
// Taking `&mut Box<T>` rather than `&mut T` restricts callers to heap-allocated values;
// that is why the `borrowed_box` lint is silenced here.
#[allow(clippy::borrowed_box)]
fn box_raw<T>(ptr: &mut Box<T>) -> *mut c_void {
	// Reborrow the boxed value, then erase its type for use as an opaque context pointer.
	let target: *mut T = &mut **ptr;
	target.cast::<c_void>()
}
23
24type CallbackContext<T> = mpsc::UnboundedSender<io::Result<T>>;
25
26#[must_use = "streams do nothing unless polled"]
27pub(crate) struct ServiceStream<S: EventedService, T> {
28	service: S,
29	_sender: Box<CallbackContext<T>>,
30	receiver: mpsc::UnboundedReceiver<io::Result<T>>,
31}
32
33impl<S: EventedService, T> ServiceStream<S, T> {
34	pub(crate) unsafe fn run_callback<F>(
35		context: *mut c_void,
36		error_code: ffi::DNSServiceErrorType,
37		f: F,
38	) where
39		F: FnOnce() -> io::Result<T>,
40		T: ::std::fmt::Debug,
41	{
42		let sender = context as *mut CallbackContext<T>;
43		let sender: &mut CallbackContext<T> = unsafe { &mut *sender };
44
45		let data = Error::from(error_code)
46			.map_err(io::Error::from)
47			.and_then(|()| f());
48
49		if let Err(send_err) = sender.unbounded_send(data) {
50			/* unbounded channel - can't be full, only disconnected (sender gone).
51			 * ignore; might just mean that the service keeps something alive, but the client
52			 * doesn't care about further notifications.
53			 * Public types should only offer where this makes sense (i.e. keeping service
54			 * alive without the stream).
55			 */
56			drop(send_err);
57		}
58	}
59
60	pub(crate) fn new<F>(f: F) -> io::Result<Self>
61	where
62		F: FnOnce(*mut c_void) -> Result<S, Error>,
63	{
64		let (sender, receiver) = mpsc::unbounded::<io::Result<T>>();
65		let mut sender = Box::new(sender);
66
67		let service = f(box_raw(&mut sender))?;
68
69		Ok(Self {
70			service,
71			_sender: sender,
72			receiver,
73		})
74	}
75
76	pub(crate) fn service(&self) -> &S {
77		&self.service
78	}
79}
80
81impl<S: EventedService, T> futures_core::Stream for ServiceStream<S, T> {
82	type Item = io::Result<T>;
83
84	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85		let this = self.get_mut();
86		this.service.poll_service(cx)?;
87		this.receiver.poll_next_unpin(cx)
88	}
89}