Skip to content

Commit 3e19c20

Browse files
committed
resolver: rewrite as a manually implemented future
1 parent 6ce390b commit 3e19c20

File tree

1 file changed

+131
-88
lines changed

1 file changed

+131
-88
lines changed

src/async_/resolver.rs

Lines changed: 131 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
use alloc::string::{String, ToString};
1111
use core::ffi::c_void;
1212
use core::fmt;
13+
use core::pin::Pin;
1314
use core::ptr::NonNull;
15+
use core::task::{Context, Poll, Waker};
1416

1517
use crate::{
1618
allocator::Box,
@@ -21,7 +23,6 @@ use crate::{
2123
ngx_resolver_t, ngx_str_t,
2224
},
2325
};
24-
use futures_channel::oneshot::{channel, Sender};
2526
use nginx_sys::{
2627
NGX_RESOLVE_FORMERR, NGX_RESOLVE_NOTIMP, NGX_RESOLVE_NXDOMAIN, NGX_RESOLVE_REFUSED,
2728
NGX_RESOLVE_SERVFAIL, NGX_RESOLVE_TIMEDOUT,
@@ -106,52 +107,6 @@ impl TryFrom<isize> for ResolverError {
106107

107108
type Res = Result<Vec<ngx_addr_t>, Error>;
108109

109-
struct ResCtx<'a> {
110-
ctx: Option<*mut ngx_resolver_ctx_t>,
111-
sender: Option<Sender<Res>>,
112-
pool: &'a Pool,
113-
}
114-
115-
impl Drop for ResCtx<'_> {
116-
fn drop(&mut self) {
117-
if let Some(ctx) = self.ctx.take() {
118-
unsafe {
119-
nginx_sys::ngx_resolve_name_done(ctx);
120-
}
121-
}
122-
}
123-
}
124-
125-
fn copy_resolved_addr(
126-
addr: *mut nginx_sys::ngx_resolver_addr_t,
127-
pool: &Pool,
128-
) -> Result<ngx_addr_t, Error> {
129-
let addr = NonNull::new(addr).ok_or(Error::Unexpected(
130-
"null ngx_resolver_addr_t in ngx_resolver_ctx_t.addrs".to_string(),
131-
))?;
132-
let addr = unsafe { addr.as_ref() };
133-
134-
let sockaddr = pool.alloc(addr.socklen as usize) as *mut nginx_sys::sockaddr;
135-
if sockaddr.is_null() {
136-
Err(Error::AllocationFailed)?;
137-
}
138-
unsafe {
139-
addr.sockaddr
140-
.cast::<u8>()
141-
.copy_to_nonoverlapping(sockaddr.cast(), addr.socklen as usize)
142-
};
143-
144-
let name =
145-
unsafe { ngx_str_t::from_bytes(pool.as_ref() as *const _ as *mut _, addr.name.as_bytes()) }
146-
.ok_or(Error::AllocationFailed)?;
147-
148-
Ok(ngx_addr_t {
149-
sockaddr,
150-
socklen: addr.socklen,
151-
name,
152-
})
153-
}
154-
155110
/// A wrapper for an ngx_resolver_t which provides an async Rust API
156111
pub struct Resolver {
157112
resolver: NonNull<ngx_resolver_t>,
@@ -170,68 +125,156 @@ impl Resolver {
170125
/// The set of addresses may not be deterministic, because the
171126
/// implementation of the resolver may race multiple DNS requests.
172127
pub async fn resolve(&self, name: &ngx_str_t, pool: &Pool) -> Res {
173-
unsafe {
174-
let ctx: *mut ngx_resolver_ctx_t =
175-
ngx_resolve_start(self.resolver.as_ptr(), core::ptr::null_mut());
176-
if ctx.is_null() {
177-
Err(Error::AllocationFailed)?
178-
}
179-
if ctx as isize == -1 {
180-
Err(Error::NoResolver)?
181-
}
128+
let ctx = unsafe {
129+
NonNull::new(ngx_resolve_start(
130+
self.resolver.as_ptr(),
131+
core::ptr::null_mut(),
132+
))
133+
.ok_or_else(|| Error::AllocationFailed)?
134+
};
182135

183-
let (sender, receiver) = channel::<Res>();
184-
let rctx = Box::new(ResCtx {
185-
ctx: Some(ctx),
186-
sender: Some(sender),
187-
pool,
188-
});
189-
190-
(*ctx).name = *name;
191-
(*ctx).timeout = self.timeout;
192-
(*ctx).set_cancelable(1);
193-
(*ctx).handler = Some(Self::resolve_handler);
194-
(*ctx).data = Box::into_raw(rctx) as *mut c_void;
195-
196-
let ret = ngx_resolve_name(ctx);
197-
if ret != 0 {
198-
Err(Error::Resolver(
199-
ResolverError::try_from(ret).expect("nonzero, checked above"),
200-
name.to_string(),
201-
))?;
202-
}
136+
let mut resolver = Resolution::new(name, pool, self.timeout, ctx)?;
137+
resolver.as_mut().await
138+
// FIXME how does timeout get caught??
139+
/*
140+
resolver
141+
.await
142+
.map_err(|_| Error::Resolver(ResolverError::TimedOut, name.to_string()))?
143+
*/
144+
}
145+
}
146+
147+
struct Resolution<'a> {
148+
complete: Option<Res>,
149+
waker: Option<Waker>,
150+
pool: &'a Pool,
151+
ctx: Option<NonNull<ngx_resolver_ctx_t>>,
152+
}
153+
154+
impl<'a> Resolution<'a> {
155+
fn new(
156+
name: &ngx_str_t,
157+
pool: &'a Pool,
158+
timeout: ngx_msec_t,
159+
mut ctx: NonNull<ngx_resolver_ctx_t>,
160+
) -> Result<Pin<Box<Self>>, Error> {
161+
let mut this = Pin::new(Box::new(Resolution {
162+
complete: None,
163+
waker: None,
164+
pool,
165+
ctx: Some(ctx),
166+
}));
203167

204-
receiver
205-
.await
206-
.map_err(|_| Error::Resolver(ResolverError::TimedOut, name.to_string()))?
168+
{
169+
let ctx: &mut ngx_resolver_ctx_t = unsafe { ctx.as_mut() };
170+
ctx.name = *name;
171+
ctx.timeout = timeout;
172+
ctx.set_cancelable(1);
173+
ctx.handler = Some(Self::handler);
174+
let ptr: &mut Resolution = unsafe { Pin::into_inner_unchecked(this.as_mut()) };
175+
ctx.data = ptr as *mut Resolution as *mut c_void;
207176
}
177+
178+
let ret = unsafe { ngx_resolve_name(ctx.as_ptr()) };
179+
if ret != 0 {
180+
return Err(Error::Resolver(
181+
ResolverError::try_from(ret).expect("nonzero, checked above"),
182+
name.to_string(),
183+
));
184+
}
185+
186+
Ok(this)
208187
}
209188

210-
unsafe extern "C" fn resolve_handler(ctx: *mut ngx_resolver_ctx_t) {
211-
let mut rctx = Box::into_inner(unsafe { Box::from_raw((*ctx).data as *mut ResCtx) });
212-
rctx.ctx.take();
213-
if let Some(sender) = rctx.sender.take() {
214-
let _ = sender.send(Self::resolve_result(ctx, rctx.pool));
189+
unsafe extern "C" fn handler(ctx: *mut ngx_resolver_ctx_t) {
190+
let mut data = unsafe { NonNull::new_unchecked((*ctx).data as *mut Resolution) };
191+
let this: &mut Resolution = unsafe { data.as_mut() };
192+
this.complete = Some(Self::resolve_result(ctx, this.pool));
193+
this.ctx.take();
194+
if let Some(waker) = this.waker.take() {
195+
// Ensure &mut Resolution no longer exists when wake resumes
196+
// suspended Future of Pin<&mut Resolution>
197+
let _ = this;
198+
waker.wake();
215199
}
216-
unsafe { nginx_sys::ngx_resolve_name_done(ctx) };
217200
}
218201

219202
fn resolve_result(ctx: *mut ngx_resolver_ctx_t, pool: &Pool) -> Res {
220203
let ctx = unsafe { ctx.as_ref().unwrap() };
221204
let s = ctx.state;
222205
if s != 0 {
223-
Err(Error::Resolver(
206+
return Err(Error::Resolver(
224207
ResolverError::try_from(s).expect("nonzero, checked above"),
225208
ctx.name.to_string(),
226-
))?;
209+
));
227210
}
228211
if ctx.addrs.is_null() {
229212
Err(Error::AllocationFailed)?;
230213
}
231214
let mut out = Vec::new();
232215
for i in 0..ctx.naddrs {
233-
out.push(copy_resolved_addr(unsafe { ctx.addrs.add(i) }, pool)?);
216+
out.push(Self::copy_resolved_addr(unsafe { ctx.addrs.add(i) }, pool)?);
234217
}
235218
Ok(out)
236219
}
220+
221+
fn copy_resolved_addr(
222+
addr: *mut nginx_sys::ngx_resolver_addr_t,
223+
pool: &Pool,
224+
) -> Result<ngx_addr_t, Error> {
225+
let addr = NonNull::new(addr).ok_or(Error::Unexpected(
226+
"null ngx_resolver_addr_t in ngx_resolver_ctx_t.addrs".to_string(),
227+
))?;
228+
let addr = unsafe { addr.as_ref() };
229+
230+
let sockaddr = pool.alloc(addr.socklen as usize) as *mut nginx_sys::sockaddr;
231+
if sockaddr.is_null() {
232+
Err(Error::AllocationFailed)?;
233+
}
234+
unsafe {
235+
addr.sockaddr
236+
.cast::<u8>()
237+
.copy_to_nonoverlapping(sockaddr.cast(), addr.socklen as usize)
238+
};
239+
240+
let name = unsafe {
241+
ngx_str_t::from_bytes(pool.as_ref() as *const _ as *mut _, addr.name.as_bytes())
242+
}
243+
.ok_or(Error::AllocationFailed)?;
244+
245+
Ok(ngx_addr_t {
246+
sockaddr,
247+
socklen: addr.socklen,
248+
name,
249+
})
250+
}
251+
}
252+
253+
impl<'a> core::future::Future for Resolution<'a> {
254+
type Output = Result<Vec<ngx_addr_t>, Error>;
255+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256+
let mut this = self.as_mut();
257+
match this.complete.take() {
258+
Some(res) => Poll::Ready(res),
259+
None => {
260+
match &mut self.waker {
261+
None => {
262+
self.waker = Some(cx.waker().clone());
263+
}
264+
Some(w) => w.clone_from(cx.waker()),
265+
}
266+
Poll::Pending
267+
}
268+
}
269+
}
270+
}
271+
272+
impl<'a> Drop for Resolution<'a> {
273+
fn drop(&mut self) {
274+
if let Some(mut ctx) = self.ctx.take() {
275+
unsafe {
276+
nginx_sys::ngx_resolve_name_done(ctx.as_mut());
277+
}
278+
}
279+
}
237280
}

0 commit comments

Comments
 (0)