1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use futures_channel::oneshot;
use futures_util::FutureExt;
use std::{
	future::Future,
	io,
	os::raw::c_void,
	pin::Pin,
	task::{
		Context,
		Poll,
	},
};

use crate::{
	error::Error,
	ffi,
	inner::EventedService,
};

/// Returns the address of the value owned by `ptr` as an untyped pointer.
///
/// The pointer refers to the boxed value itself (the heap allocation), not to
/// the `Box` handle, so it stays the same when the `Box` is moved afterwards.
/// It is only valid for as long as the `Box` is kept alive.
// The lint is silenced because the parameter is deliberately the `Box`
// itself: callers want the address of the boxed allocation.
#[allow(clippy::borrowed_box)]
fn box_raw<T>(ptr: &mut Box<T>) -> *mut c_void {
	let target: *mut T = &mut **ptr;
	target.cast()
}

/// Value behind the opaque `*mut c_void` context pointer.
///
/// It stores the sending half of the oneshot channel that feeds a
/// `ServiceFuture`. The `Option` wrapper lets `ServiceFuture::run_callback`
/// move the sender out through a `&mut` reference (sending consumes the
/// sender), leaving `None` behind so that a second invocation is detected.
type CallbackContext<T> = Option<oneshot::Sender<io::Result<T>>>;

/// State of a `ServiceFuture` that has not yielded its value yet.
struct Inner<S: EventedService, T> {
	/// Service that is polled to make progress; handed to the caller once the
	/// future resolves successfully.
	service: S,
	/// Owns the heap allocation the raw context pointer refers to. It is never
	/// read through this field (hence the underscore); the sender inside is
	/// only reached through the raw pointer in `ServiceFuture::run_callback`.
	// NOTE(review): fields are dropped in declaration order, so `service` is
	// dropped before this allocation is freed. Presumably that ordering is
	// relied upon so the context pointer never dangles while the service is
	// alive - confirm before reordering fields.
	_sender: Box<CallbackContext<T>>,
	/// Receiving half of the oneshot channel; yields whatever
	/// `ServiceFuture::run_callback` sent.
	receiver: oneshot::Receiver<io::Result<T>>,
}

/// Future that resolves to a service together with the single result
/// delivered through `ServiceFuture::run_callback`.
///
/// The wrapped `Option` holds the pending state; it becomes `None` once the
/// future has resolved successfully and the service has been moved out.
#[must_use = "futures do nothing unless polled"]
pub(crate) struct ServiceFuture<S: EventedService, T>(Option<Inner<S, T>>);

impl<S: EventedService, T> ServiceFuture<S, T> {
	pub(crate) unsafe fn run_callback<F>(
		context: *mut c_void,
		error_code: ffi::DNSServiceErrorType,
		f: F,
	) where
		F: FnOnce() -> io::Result<T>,
		T: ::std::fmt::Debug,
	{
		let sender = context as *mut CallbackContext<T>;
		let sender: &mut CallbackContext<T> = &mut *sender;
		let sender = sender.take().expect("callback must be run only once");

		let data = Error::from(error_code)
			.map_err(io::Error::from)
			.and_then(|()| f());

		sender.send(data).expect("receiver must still be alive");
	}

	pub(crate) fn new<F>(f: F) -> io::Result<Self>
	where
		F: FnOnce(*mut c_void) -> Result<S, Error>,
	{
		let (sender, receiver) = oneshot::channel::<io::Result<T>>();
		let mut sender = Box::new(Some(sender));

		let service = f(box_raw(&mut sender))?;

		Ok(Self(Some(Inner {
			service,
			_sender: sender,
			receiver,
		})))
	}

	pub(crate) fn new_with<R, F>(service: S, f: F) -> io::Result<(Self, R)>
	where
		F: FnOnce(*mut c_void) -> Result<R, Error>,
	{
		let (sender, receiver) = oneshot::channel::<io::Result<T>>();
		let mut sender = Box::new(Some(sender));

		let res = f(box_raw(&mut sender))?;

		Ok((
			Self(Some(Inner {
				service,
				_sender: sender,
				receiver,
			})),
			res,
		))
	}

	fn inner(&self) -> &Inner<S, T> {
		self.0.as_ref().expect("can only get ready once")
	}

	fn inner_mut(&mut self) -> &mut Inner<S, T> {
		self.0.as_mut().expect("can only get ready once")
	}

	pub(crate) fn service(&self) -> &S {
		&self.inner().service
	}
}

impl<S: EventedService, T> Future for ServiceFuture<S, T> {
	type Output = io::Result<(S, T)>;

	/// Drives the service and resolves once a result has been delivered.
	///
	/// Each call first polls the service, then checks the channel for a
	/// result. On success the service is moved out and returned together with
	/// the received value. If an error is returned (from polling the service
	/// or as the delivered result) the pending state stays in place.
	///
	/// # Panics
	///
	/// Panics with "can only get ready once" when polled again after it
	/// resolved successfully. Returning `Poll::Pending` there would park the
	/// task without any waker registered, i.e. hang it forever without a
	/// trace; failing loudly matches what `service()` does in that state.
	///
	/// Also panics if the sending half of the channel was dropped without a
	/// value having been sent.
	// NOTE(review): on success the boxed callback context is dropped here
	// while the returned service lives on. Presumably the service never
	// triggers the callback for that context again - confirm with the users
	// of this future.
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		// `inner_mut` enforces the "poll after completion" invariant.
		let inner = self.inner_mut();
		inner.service.poll_service(cx)?;
		let item = futures_core::ready!(inner.receiver.poll_unpin(cx))
			.expect("send can't die")?;

		let finished = self
			.0
			.take()
			.expect("pending state was present at the start of this poll");
		Poll::Ready(Ok((finished.service, item)))
	}
}