diff options
Diffstat (limited to 'vendor/hyper/src/ffi/task.rs')
| -rw-r--r-- | vendor/hyper/src/ffi/task.rs | 549 |
1 files changed, 549 insertions, 0 deletions
diff --git a/vendor/hyper/src/ffi/task.rs b/vendor/hyper/src/ffi/task.rs new file mode 100644 index 00000000..5b33d42b --- /dev/null +++ b/vendor/hyper/src/ffi/task.rs @@ -0,0 +1,549 @@ +use std::ffi::{c_int, c_void}; +use std::future::Future; +use std::pin::Pin; +use std::ptr; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, Weak, +}; +use std::task::{Context, Poll}; + +use futures_util::stream::{FuturesUnordered, Stream}; + +use super::error::hyper_code; +use super::UserDataPointer; + +type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>; +type BoxAny = Box<dyn AsTaskType + Send + Sync>; + +/// Return in a poll function to indicate it was ready. +pub const HYPER_POLL_READY: c_int = 0; +/// Return in a poll function to indicate it is still pending. +/// +/// The passed in `hyper_waker` should be registered to wake up the task at +/// some later point. +pub const HYPER_POLL_PENDING: c_int = 1; +/// Return in a poll function indicate an error. +pub const HYPER_POLL_ERROR: c_int = 3; + +/// A task executor for `hyper_task`s. +/// +/// A task is a unit of work that may be blocked on IO, and can be polled to +/// make progress on that work. +/// +/// An executor can hold many tasks, included from unrelated HTTP connections. +/// An executor is single threaded. Typically you might have one executor per +/// thread. Or, for simplicity, you may choose one executor per connection. +/// +/// Progress on tasks happens only when `hyper_executor_poll` is called, and only +/// on tasks whose corresponding `hyper_waker` has been called to indicate they +/// are ready to make progress (for instance, because the OS has indicated there +/// is more data to read or more buffer space available to write). +/// +/// Deadlock potential: `hyper_executor_poll` must not be called from within a task's +/// callback. Doing so will result in a deadlock. +/// +/// Methods: +/// +/// - hyper_executor_new: Creates a new task executor. +/// - hyper_executor_push: Push a task onto the executor. +/// - hyper_executor_poll: Polls the executor, trying to make progress on any tasks that have notified that they are ready again. +/// - hyper_executor_free: Frees an executor and any incomplete tasks still part of it. +pub struct hyper_executor { + /// The executor of all task futures. + /// + /// There should never be contention on the mutex, as it is only locked + /// to drive the futures. However, we cannot guarantee proper usage from + /// `hyper_executor_poll()`, which in C could potentially be called inside + /// one of the stored futures. The mutex isn't re-entrant, so doing so + /// would result in a deadlock, but that's better than data corruption. + driver: Mutex<FuturesUnordered<TaskFuture>>, + + /// The queue of futures that need to be pushed into the `driver`. + /// + /// This is has a separate mutex since `spawn` could be called from inside + /// a future, which would mean the driver's mutex is already locked. + spawn_queue: Mutex<Vec<TaskFuture>>, + + /// This is used to track when a future calls `wake` while we are within + /// `hyper_executor::poll_next`. + is_woken: Arc<ExecWaker>, +} + +#[derive(Clone)] +pub(crate) struct WeakExec(Weak<hyper_executor>); + +struct ExecWaker(AtomicBool); + +/// An async task. +/// +/// A task represents a chunk of work that will eventually yield exactly one +/// `hyper_task_value`. Tasks are pushed onto an executor, and that executor is +/// responsible for calling the necessary private functions on the task to make +/// progress. In most cases those private functions will eventually cause read +/// or write callbacks on a `hyper_io` object to be called. +/// +/// Tasks are created by various functions: +/// +/// - hyper_clientconn_handshake: Creates an HTTP client handshake task. +/// - hyper_clientconn_send: Creates a task to send a request on the client connection. +/// - hyper_body_data: Creates a task that will poll a response body for the next buffer of data. +/// - hyper_body_foreach: Creates a task to execute the callback with each body chunk received. +/// +/// Tasks then have a userdata associated with them using `hyper_task_set_userdata``. This +/// is important, for instance, to associate a request id with a given request. When multiple +/// tasks are running on the same executor, this allows distinguishing tasks for different +/// requests. +/// +/// Tasks are then pushed onto an executor, and eventually yielded from hyper_executor_poll: +/// +/// - hyper_executor_push: Push a task onto the executor. +/// - hyper_executor_poll: Polls the executor, trying to make progress on any tasks that have notified that they are ready again. +/// +/// Once a task is yielded from poll, retrieve its userdata, check its type, +/// and extract its value. This will require a case from void* to the appropriate type. +/// +/// Methods on hyper_task: +/// +/// - hyper_task_type: Query the return type of this task. +/// - hyper_task_value: Takes the output value of this task. +/// - hyper_task_set_userdata: Set a user data pointer to be associated with this task. +/// - hyper_task_userdata: Retrieve the userdata that has been set via hyper_task_set_userdata. +/// - hyper_task_free: Free a task. +pub struct hyper_task { + future: BoxFuture<BoxAny>, + output: Option<BoxAny>, + userdata: UserDataPointer, +} + +struct TaskFuture { + task: Option<Box<hyper_task>>, +} + +/// An async context for a task that contains the related waker. +/// +/// This is provided to `hyper_io`'s read and write callbacks. Currently +/// its only purpose is to provide access to the waker. See `hyper_waker`. +/// +/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Context.html> +pub struct hyper_context<'a>(Context<'a>); + +/// A waker that is saved and used to waken a pending task. +/// +/// This is provided to `hyper_io`'s read and write callbacks via `hyper_context` +/// and `hyper_context_waker`. +/// +/// When nonblocking I/O in one of those callbacks can't make progress (returns +/// `EAGAIN` or `EWOULDBLOCK`), the callback has to return to avoid blocking the +/// executor. But it also has to arrange to get called in the future when more +/// data is available. That's the role of the async context and the waker. The +/// waker can be used to tell the executor "this task is ready to make progress." +/// +/// The read or write callback, upon finding it can't make progress, must get a +/// waker from the context (`hyper_context_waker`), arrange for that waker to be +/// called in the future, and then return `HYPER_POLL_PENDING`. +/// +/// The arrangements for the waker to be called in the future are up to the +/// application, but usually it will involve one big `select(2)` loop that checks which +/// FDs are ready, and a correspondence between FDs and waker objects. For each +/// FD that is ready, the corresponding waker must be called. Then `hyper_executor_poll` +/// must be called. That will cause the executor to attempt to make progress on each +/// woken task. +/// +/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Waker.html> +pub struct hyper_waker { + waker: std::task::Waker, +} + +/// A descriptor for what type a `hyper_task` value is. +#[repr(C)] +pub enum hyper_task_return_type { + /// The value of this task is null (does not imply an error). + HYPER_TASK_EMPTY, + /// The value of this task is `hyper_error *`. + HYPER_TASK_ERROR, + /// The value of this task is `hyper_clientconn *`. + HYPER_TASK_CLIENTCONN, + /// The value of this task is `hyper_response *`. + HYPER_TASK_RESPONSE, + /// The value of this task is `hyper_buf *`. + HYPER_TASK_BUF, +} + +pub(crate) unsafe trait AsTaskType { + fn as_task_type(&self) -> hyper_task_return_type; +} + +pub(crate) trait IntoDynTaskType { + fn into_dyn_task_type(self) -> BoxAny; +} + +// ===== impl hyper_executor ===== + +impl hyper_executor { + fn new() -> Arc<hyper_executor> { + Arc::new(hyper_executor { + driver: Mutex::new(FuturesUnordered::new()), + spawn_queue: Mutex::new(Vec::new()), + is_woken: Arc::new(ExecWaker(AtomicBool::new(false))), + }) + } + + pub(crate) fn downgrade(exec: &Arc<hyper_executor>) -> WeakExec { + WeakExec(Arc::downgrade(exec)) + } + + fn spawn(&self, task: Box<hyper_task>) { + self.spawn_queue + .lock() + .unwrap() + .push(TaskFuture { task: Some(task) }); + } + + fn poll_next(&self) -> Option<Box<hyper_task>> { + // Drain the queue first. + self.drain_queue(); + + let waker = futures_util::task::waker_ref(&self.is_woken); + let mut cx = Context::from_waker(&waker); + + loop { + { + // Scope the lock on the driver to ensure it is dropped before + // calling drain_queue below. + let mut driver = self.driver.lock().unwrap(); + match Pin::new(&mut *driver).poll_next(&mut cx) { + Poll::Ready(val) => return val, + Poll::Pending => {} + }; + } + + // poll_next returned Pending. + // Check if any of the pending tasks tried to spawn + // some new tasks. If so, drain into the driver and loop. + if self.drain_queue() { + continue; + } + + // If the driver called `wake` while we were polling, + // we should poll again immediately! + if self.is_woken.0.swap(false, Ordering::SeqCst) { + continue; + } + + return None; + } + } + + /// drain_queue locks both self.spawn_queue and self.driver, so it requires + /// that neither of them be locked already. + fn drain_queue(&self) -> bool { + let mut queue = self.spawn_queue.lock().unwrap(); + if queue.is_empty() { + return false; + } + + let driver = self.driver.lock().unwrap(); + + for task in queue.drain(..) { + driver.push(task); + } + + true + } +} + +impl futures_util::task::ArcWake for ExecWaker { + fn wake_by_ref(me: &Arc<ExecWaker>) { + me.0.store(true, Ordering::SeqCst); + } +} + +// ===== impl WeakExec ===== + +impl WeakExec { + pub(crate) fn new() -> Self { + WeakExec(Weak::new()) + } +} + +impl<F> crate::rt::Executor<F> for WeakExec +where + F: Future + Send + 'static, + F::Output: Send + Sync + AsTaskType, +{ + fn execute(&self, fut: F) { + if let Some(exec) = self.0.upgrade() { + exec.spawn(hyper_task::boxed(fut)); + } + } +} + +ffi_fn! { + /// Creates a new task executor. + /// + /// To avoid a memory leak, the executor must eventually be consumed by + /// `hyper_executor_free`. + fn hyper_executor_new() -> *const hyper_executor { + Arc::into_raw(hyper_executor::new()) + } ?= ptr::null() +} + +ffi_fn! { + /// Frees an executor and any incomplete tasks still part of it. + /// + /// This should be used for any executor once it is no longer needed. + fn hyper_executor_free(exec: *const hyper_executor) { + drop(non_null!(Arc::from_raw(exec) ?= ())); + } +} + +ffi_fn! { + /// Push a task onto the executor. + /// + /// The executor takes ownership of the task, which must not be accessed + /// again. + /// + /// Ownership of the task will eventually be returned to the user from + /// `hyper_executor_poll`. + /// + /// To distinguish multiple tasks running on the same executor, use + /// hyper_task_set_userdata. + fn hyper_executor_push(exec: *const hyper_executor, task: *mut hyper_task) -> hyper_code { + let exec = non_null!(&*exec ?= hyper_code::HYPERE_INVALID_ARG); + let task = non_null!(Box::from_raw(task) ?= hyper_code::HYPERE_INVALID_ARG); + exec.spawn(task); + hyper_code::HYPERE_OK + } +} + +ffi_fn! { + /// Polls the executor, trying to make progress on any tasks that can do so. + /// + /// If any task from the executor is ready, returns one of them. The way + /// tasks signal being finished is internal to Hyper. The order in which tasks + /// are returned is not guaranteed. Use userdata to distinguish between tasks. + /// + /// To avoid a memory leak, the task must eventually be consumed by + /// `hyper_task_free`. + /// + /// If there are no ready tasks, this returns `NULL`. + fn hyper_executor_poll(exec: *const hyper_executor) -> *mut hyper_task { + let exec = non_null!(&*exec ?= ptr::null_mut()); + match exec.poll_next() { + Some(task) => Box::into_raw(task), + None => ptr::null_mut(), + } + } ?= ptr::null_mut() +} + +// ===== impl hyper_task ===== + +impl hyper_task { + pub(crate) fn boxed<F>(fut: F) -> Box<hyper_task> + where + F: Future + Send + 'static, + F::Output: IntoDynTaskType + Send + Sync + 'static, + { + Box::new(hyper_task { + future: Box::pin(async move { fut.await.into_dyn_task_type() }), + output: None, + userdata: UserDataPointer(ptr::null_mut()), + }) + } + + fn output_type(&self) -> hyper_task_return_type { + match self.output { + None => hyper_task_return_type::HYPER_TASK_EMPTY, + Some(ref val) => val.as_task_type(), + } + } +} + +impl Future for TaskFuture { + type Output = Box<hyper_task>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) { + Poll::Ready(val) => { + let mut task = self.task.take().unwrap(); + task.output = Some(val); + Poll::Ready(task) + } + Poll::Pending => Poll::Pending, + } + } +} + +ffi_fn! { + /// Free a task. + /// + /// This should only be used if the task isn't consumed by + /// `hyper_clientconn_handshake` or taken ownership of by + /// `hyper_executor_push`. + fn hyper_task_free(task: *mut hyper_task) { + drop(non_null!(Box::from_raw(task) ?= ())); + } +} + +ffi_fn! { + /// Takes the output value of this task. + /// + /// This must only be called once polling the task on an executor has finished + /// this task. + /// + /// Use `hyper_task_type` to determine the type of the `void *` return value. + /// + /// To avoid a memory leak, a non-empty return value must eventually be + /// consumed by a function appropriate for its type, one of + /// `hyper_error_free`, `hyper_clientconn_free`, `hyper_response_free`, or + /// `hyper_buf_free`. + fn hyper_task_value(task: *mut hyper_task) -> *mut c_void { + let task = non_null!(&mut *task ?= ptr::null_mut()); + + if let Some(val) = task.output.take() { + let p = Box::into_raw(val) as *mut c_void; + // protect from returning fake pointers to empty types + if p == std::ptr::NonNull::<c_void>::dangling().as_ptr() { + ptr::null_mut() + } else { + p + } + } else { + ptr::null_mut() + } + } ?= ptr::null_mut() +} + +ffi_fn! { + /// Query the return type of this task. + fn hyper_task_type(task: *mut hyper_task) -> hyper_task_return_type { + // instead of blowing up spectacularly, just say this null task + // doesn't have a value to retrieve. + non_null!(&*task ?= hyper_task_return_type::HYPER_TASK_EMPTY).output_type() + } +} + +ffi_fn! { + /// Set a user data pointer to be associated with this task. + /// + /// This value will be passed to task callbacks, and can be checked later + /// with `hyper_task_userdata`. + /// + /// This is useful for telling apart tasks for different requests that are + /// running on the same executor. + fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) { + if task.is_null() { + return; + } + + unsafe { (*task).userdata = UserDataPointer(userdata) }; + } +} + +ffi_fn! { + /// Retrieve the userdata that has been set via `hyper_task_set_userdata`. + fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void { + non_null!(&*task ?= ptr::null_mut()).userdata.0 + } ?= ptr::null_mut() +} + +// ===== impl AsTaskType ===== + +unsafe impl AsTaskType for () { + fn as_task_type(&self) -> hyper_task_return_type { + hyper_task_return_type::HYPER_TASK_EMPTY + } +} + +unsafe impl AsTaskType for crate::Error { + fn as_task_type(&self) -> hyper_task_return_type { + hyper_task_return_type::HYPER_TASK_ERROR + } +} + +impl<T> IntoDynTaskType for T +where + T: AsTaskType + Send + Sync + 'static, +{ + fn into_dyn_task_type(self) -> BoxAny { + Box::new(self) + } +} + +impl<T> IntoDynTaskType for crate::Result<T> +where + T: IntoDynTaskType + Send + Sync + 'static, +{ + fn into_dyn_task_type(self) -> BoxAny { + match self { + Ok(val) => val.into_dyn_task_type(), + Err(err) => Box::new(err), + } + } +} + +impl<T> IntoDynTaskType for Option<T> +where + T: IntoDynTaskType + Send + Sync + 'static, +{ + fn into_dyn_task_type(self) -> BoxAny { + match self { + Some(val) => val.into_dyn_task_type(), + None => ().into_dyn_task_type(), + } + } +} + +// ===== impl hyper_context ===== + +impl hyper_context<'_> { + pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> { + // A struct with only one field has the same layout as that field. + unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) } + } +} + +ffi_fn! { + /// Creates a waker associated with the task context. + /// + /// The waker can be used to inform the task's executor that the task is + /// ready to make progress (using `hyper_waker_wake``). + /// + /// Typically this only needs to be called once, but it can be called + /// multiple times, returning a new waker each time. + /// + /// To avoid a memory leak, the waker must eventually be consumed by + /// `hyper_waker_free` or `hyper_waker_wake`. + fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker { + let waker = non_null!(&mut *cx ?= ptr::null_mut()).0.waker().clone(); + Box::into_raw(Box::new(hyper_waker { waker })) + } ?= ptr::null_mut() +} + +// ===== impl hyper_waker ===== + +ffi_fn! { + /// Free a waker. + /// + /// This should only be used if the request isn't consumed by + /// `hyper_waker_wake`. + fn hyper_waker_free(waker: *mut hyper_waker) { + drop(non_null!(Box::from_raw(waker) ?= ())); + } +} + +ffi_fn! { + /// Wake up the task associated with a waker. + /// + /// This does not do work towards associated task. Instead, it signals + /// to the task's executor that the task is ready to make progress. The + /// application is responsible for calling hyper_executor_poll, which + /// will in turn do work on all tasks that are ready to make progress. + /// + /// NOTE: This consumes the waker. You should not use or free the waker afterwards. + fn hyper_waker_wake(waker: *mut hyper_waker) { + let waker = non_null!(Box::from_raw(waker) ?= ()); + waker.waker.wake(); + } +} |
