1use futures_channel::mpsc;
2use futures_util::StreamExt;
3use std::{
4 io,
5 os::raw::c_void,
6 pin::Pin,
7 task::{
8 Context,
9 Poll,
10 },
11};
12
13use crate::{
14 error::Error,
15 ffi,
16 inner::EventedService,
17};
18
19#[allow(clippy::borrowed_box)]
20fn box_raw<T>(ptr: &mut Box<T>) -> *mut c_void {
21 ptr.as_mut() as *mut T as *mut c_void
22}
23
24type CallbackContext<T> = mpsc::UnboundedSender<io::Result<T>>;
25
26#[must_use = "streams do nothing unless polled"]
27pub(crate) struct ServiceStream<S: EventedService, T> {
28 service: S,
29 _sender: Box<CallbackContext<T>>,
30 receiver: mpsc::UnboundedReceiver<io::Result<T>>,
31}
32
33impl<S: EventedService, T> ServiceStream<S, T> {
34 pub(crate) unsafe fn run_callback<F>(
35 context: *mut c_void,
36 error_code: ffi::DNSServiceErrorType,
37 f: F,
38 ) where
39 F: FnOnce() -> io::Result<T>,
40 T: ::std::fmt::Debug,
41 {
42 let sender = context as *mut CallbackContext<T>;
43 let sender: &mut CallbackContext<T> = unsafe { &mut *sender };
44
45 let data = Error::from(error_code)
46 .map_err(io::Error::from)
47 .and_then(|()| f());
48
49 if let Err(send_err) = sender.unbounded_send(data) {
50 drop(send_err);
57 }
58 }
59
60 pub(crate) fn new<F>(f: F) -> io::Result<Self>
61 where
62 F: FnOnce(*mut c_void) -> Result<S, Error>,
63 {
64 let (sender, receiver) = mpsc::unbounded::<io::Result<T>>();
65 let mut sender = Box::new(sender);
66
67 let service = f(box_raw(&mut sender))?;
68
69 Ok(Self {
70 service,
71 _sender: sender,
72 receiver,
73 })
74 }
75
76 pub(crate) fn service(&self) -> &S {
77 &self.service
78 }
79}
80
81impl<S: EventedService, T> futures_core::Stream for ServiceStream<S, T> {
82 type Item = io::Result<T>;
83
84 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85 let this = self.get_mut();
86 this.service.poll_service(cx)?;
87 this.receiver.poll_next_unpin(cx)
88 }
89}