1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use futures_channel::mpsc;
use futures_util::StreamExt;
use std::{
	io,
	os::raw::c_void,
	pin::Pin,
	task::{
		Context,
		Poll,
	},
};

use crate::{
	error::Error,
	ffi,
	inner::EventedService,
};

/// Returns the address of the value stored inside `ptr` as an untyped pointer.
///
/// The result points at the heap allocation owned by the `Box`, not at the
/// `Box` handle itself, so it keeps designating the same value when the `Box`
/// is subsequently moved.
// The lint is silenced because the `&mut Box<T>` parameter is deliberate here
// (presumably to make explicit at call sites that the pointee is boxed).
#[allow(clippy::borrowed_box)]
fn box_raw<T>(ptr: &mut Box<T>) -> *mut c_void {
	let inner: &mut T = &mut **ptr;
	(inner as *mut T).cast::<c_void>()
}

/// Sending half of the channel that carries callback results.
///
/// `ServiceStream::run_callback` reinterprets its opaque `context` pointer as
/// a pointer to a value of this type.
type CallbackContext<T> = mpsc::UnboundedSender<io::Result<T>>;

/// Stream that pairs an [`EventedService`] with the channel its callback
/// reports results into.
///
/// Field order matters: Rust drops struct fields in declaration order, so
/// `service` is dropped before `_sender`.
/// NOTE(review): presumably dropping the service stops further callbacks, which
/// would keep the raw context pointer from outliving the boxed sender — confirm
/// before reordering fields.
#[must_use = "streams do nothing unless polled"]
pub(crate) struct ServiceStream<S: EventedService, T> {
	/// The underlying service; polled on every `poll_next` call.
	service: S,
	/// Boxed sender whose heap address is handed out as the raw callback
	/// context in `new`; never used directly, only kept alive.
	_sender: Box<CallbackContext<T>>,
	/// Receiving half of the channel; the source of the stream's items.
	receiver: mpsc::UnboundedReceiver<io::Result<T>>,
}

impl<S: EventedService, T> ServiceStream<S, T> {
	/// Forwards the outcome of one callback invocation into the channel behind
	/// `context`.
	///
	/// `error_code` is converted with [`Error::from`]; if that yields an error
	/// it is sent as an [`io::Error`] and `f` is never called. Otherwise the
	/// result of `f()` is sent.
	///
	/// # Safety
	///
	/// `context` must point to a live `CallbackContext<T>` for the same `T`,
	/// such as the pointer [`ServiceStream::new`] passes to its constructor
	/// closure, and that value must not be freed while this function runs.
	///
	/// # Panics
	///
	/// Panics if the result cannot be queued; the `expect` asserts that the
	/// receiving half of the channel is still alive.
	pub(crate) unsafe fn run_callback<F>(
		context: *mut c_void,
		error_code: ffi::DNSServiceErrorType,
		f: F,
	) where
		F: FnOnce() -> io::Result<T>,
		T: ::std::fmt::Debug,
	{
		// SAFETY: the caller guarantees `context` points to a live sender (see
		// `# Safety`). Only a shared reference is created: `unbounded_send`
		// takes `&self`, so there is no reason to assert exclusive access to
		// memory that is also owned by the stream's `Box`.
		let sender: &CallbackContext<T> = &*context.cast::<CallbackContext<T>>();

		let data = Error::from(error_code)
			.map_err(io::Error::from)
			.and_then(|()| f());

		sender
			.unbounded_send(data)
			.expect("receiver must still be alive");
	}

	/// Creates the result channel, then builds the service by calling `f` with
	/// the raw context pointer that callbacks must hand to
	/// [`ServiceStream::run_callback`].
	///
	/// # Errors
	///
	/// Returns the error produced by `f`, converted into an [`io::Error`].
	pub(crate) fn new<F>(f: F) -> io::Result<Self>
	where
		F: FnOnce(*mut c_void) -> Result<S, Error>,
	{
		let (sender, receiver) = mpsc::unbounded::<io::Result<T>>();
		// Boxed so the sender lives at a fixed heap address: the pointer given
		// to `f` must stay valid after the box is moved into `Self` below.
		let mut sender = Box::new(sender);

		let service = f(box_raw(&mut sender))?;

		Ok(Self {
			service,
			_sender: sender,
			receiver,
		})
	}
}

impl<S: EventedService, T> futures_core::Stream for ServiceStream<S, T> {
	type Item = io::Result<T>;

	/// Polls the underlying service first, then reports whatever the receiving
	/// half of the channel has available.
	///
	/// An error from polling the service is propagated by `?` and nothing is
	/// read from the channel in that call.
	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		let this = self.get_mut();
		this.service.poll_service(cx)?;
		futures_core::Stream::poll_next(Pin::new(&mut this.receiver), cx)
	}
}