diff options
Diffstat (limited to 'vendor/hyper/src/ffi/task.rs')
| -rw-r--r-- | vendor/hyper/src/ffi/task.rs | 549 |
1 files changed, 0 insertions, 549 deletions
diff --git a/vendor/hyper/src/ffi/task.rs b/vendor/hyper/src/ffi/task.rs deleted file mode 100644 index 5b33d42b..00000000 --- a/vendor/hyper/src/ffi/task.rs +++ /dev/null @@ -1,549 +0,0 @@ -use std::ffi::{c_int, c_void}; -use std::future::Future; -use std::pin::Pin; -use std::ptr; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, Weak, -}; -use std::task::{Context, Poll}; - -use futures_util::stream::{FuturesUnordered, Stream}; - -use super::error::hyper_code; -use super::UserDataPointer; - -type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>; -type BoxAny = Box<dyn AsTaskType + Send + Sync>; - -/// Return in a poll function to indicate it was ready. -pub const HYPER_POLL_READY: c_int = 0; -/// Return in a poll function to indicate it is still pending. -/// -/// The passed in `hyper_waker` should be registered to wake up the task at -/// some later point. -pub const HYPER_POLL_PENDING: c_int = 1; -/// Return in a poll function indicate an error. -pub const HYPER_POLL_ERROR: c_int = 3; - -/// A task executor for `hyper_task`s. -/// -/// A task is a unit of work that may be blocked on IO, and can be polled to -/// make progress on that work. -/// -/// An executor can hold many tasks, included from unrelated HTTP connections. -/// An executor is single threaded. Typically you might have one executor per -/// thread. Or, for simplicity, you may choose one executor per connection. -/// -/// Progress on tasks happens only when `hyper_executor_poll` is called, and only -/// on tasks whose corresponding `hyper_waker` has been called to indicate they -/// are ready to make progress (for instance, because the OS has indicated there -/// is more data to read or more buffer space available to write). -/// -/// Deadlock potential: `hyper_executor_poll` must not be called from within a task's -/// callback. Doing so will result in a deadlock. -/// -/// Methods: -/// -/// - hyper_executor_new: Creates a new task executor. -/// - hyper_executor_push: Push a task onto the executor. -/// - hyper_executor_poll: Polls the executor, trying to make progress on any tasks that have notified that they are ready again. -/// - hyper_executor_free: Frees an executor and any incomplete tasks still part of it. -pub struct hyper_executor { - /// The executor of all task futures. - /// - /// There should never be contention on the mutex, as it is only locked - /// to drive the futures. However, we cannot guarantee proper usage from - /// `hyper_executor_poll()`, which in C could potentially be called inside - /// one of the stored futures. The mutex isn't re-entrant, so doing so - /// would result in a deadlock, but that's better than data corruption. - driver: Mutex<FuturesUnordered<TaskFuture>>, - - /// The queue of futures that need to be pushed into the `driver`. - /// - /// This is has a separate mutex since `spawn` could be called from inside - /// a future, which would mean the driver's mutex is already locked. - spawn_queue: Mutex<Vec<TaskFuture>>, - - /// This is used to track when a future calls `wake` while we are within - /// `hyper_executor::poll_next`. - is_woken: Arc<ExecWaker>, -} - -#[derive(Clone)] -pub(crate) struct WeakExec(Weak<hyper_executor>); - -struct ExecWaker(AtomicBool); - -/// An async task. -/// -/// A task represents a chunk of work that will eventually yield exactly one -/// `hyper_task_value`. Tasks are pushed onto an executor, and that executor is -/// responsible for calling the necessary private functions on the task to make -/// progress. In most cases those private functions will eventually cause read -/// or write callbacks on a `hyper_io` object to be called. -/// -/// Tasks are created by various functions: -/// -/// - hyper_clientconn_handshake: Creates an HTTP client handshake task. -/// - hyper_clientconn_send: Creates a task to send a request on the client connection. -/// - hyper_body_data: Creates a task that will poll a response body for the next buffer of data. -/// - hyper_body_foreach: Creates a task to execute the callback with each body chunk received. -/// -/// Tasks then have a userdata associated with them using `hyper_task_set_userdata``. This -/// is important, for instance, to associate a request id with a given request. When multiple -/// tasks are running on the same executor, this allows distinguishing tasks for different -/// requests. -/// -/// Tasks are then pushed onto an executor, and eventually yielded from hyper_executor_poll: -/// -/// - hyper_executor_push: Push a task onto the executor. -/// - hyper_executor_poll: Polls the executor, trying to make progress on any tasks that have notified that they are ready again. -/// -/// Once a task is yielded from poll, retrieve its userdata, check its type, -/// and extract its value. This will require a case from void* to the appropriate type. -/// -/// Methods on hyper_task: -/// -/// - hyper_task_type: Query the return type of this task. -/// - hyper_task_value: Takes the output value of this task. -/// - hyper_task_set_userdata: Set a user data pointer to be associated with this task. -/// - hyper_task_userdata: Retrieve the userdata that has been set via hyper_task_set_userdata. -/// - hyper_task_free: Free a task. -pub struct hyper_task { - future: BoxFuture<BoxAny>, - output: Option<BoxAny>, - userdata: UserDataPointer, -} - -struct TaskFuture { - task: Option<Box<hyper_task>>, -} - -/// An async context for a task that contains the related waker. -/// -/// This is provided to `hyper_io`'s read and write callbacks. Currently -/// its only purpose is to provide access to the waker. See `hyper_waker`. -/// -/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Context.html> -pub struct hyper_context<'a>(Context<'a>); - -/// A waker that is saved and used to waken a pending task. -/// -/// This is provided to `hyper_io`'s read and write callbacks via `hyper_context` -/// and `hyper_context_waker`. -/// -/// When nonblocking I/O in one of those callbacks can't make progress (returns -/// `EAGAIN` or `EWOULDBLOCK`), the callback has to return to avoid blocking the -/// executor. But it also has to arrange to get called in the future when more -/// data is available. That's the role of the async context and the waker. The -/// waker can be used to tell the executor "this task is ready to make progress." -/// -/// The read or write callback, upon finding it can't make progress, must get a -/// waker from the context (`hyper_context_waker`), arrange for that waker to be -/// called in the future, and then return `HYPER_POLL_PENDING`. -/// -/// The arrangements for the waker to be called in the future are up to the -/// application, but usually it will involve one big `select(2)` loop that checks which -/// FDs are ready, and a correspondence between FDs and waker objects. For each -/// FD that is ready, the corresponding waker must be called. Then `hyper_executor_poll` -/// must be called. That will cause the executor to attempt to make progress on each -/// woken task. -/// -/// Corresponding Rust type: <https://doc.rust-lang.org/std/task/struct.Waker.html> -pub struct hyper_waker { - waker: std::task::Waker, -} - -/// A descriptor for what type a `hyper_task` value is. -#[repr(C)] -pub enum hyper_task_return_type { - /// The value of this task is null (does not imply an error). - HYPER_TASK_EMPTY, - /// The value of this task is `hyper_error *`. - HYPER_TASK_ERROR, - /// The value of this task is `hyper_clientconn *`. - HYPER_TASK_CLIENTCONN, - /// The value of this task is `hyper_response *`. - HYPER_TASK_RESPONSE, - /// The value of this task is `hyper_buf *`. - HYPER_TASK_BUF, -} - -pub(crate) unsafe trait AsTaskType { - fn as_task_type(&self) -> hyper_task_return_type; -} - -pub(crate) trait IntoDynTaskType { - fn into_dyn_task_type(self) -> BoxAny; -} - -// ===== impl hyper_executor ===== - -impl hyper_executor { - fn new() -> Arc<hyper_executor> { - Arc::new(hyper_executor { - driver: Mutex::new(FuturesUnordered::new()), - spawn_queue: Mutex::new(Vec::new()), - is_woken: Arc::new(ExecWaker(AtomicBool::new(false))), - }) - } - - pub(crate) fn downgrade(exec: &Arc<hyper_executor>) -> WeakExec { - WeakExec(Arc::downgrade(exec)) - } - - fn spawn(&self, task: Box<hyper_task>) { - self.spawn_queue - .lock() - .unwrap() - .push(TaskFuture { task: Some(task) }); - } - - fn poll_next(&self) -> Option<Box<hyper_task>> { - // Drain the queue first. - self.drain_queue(); - - let waker = futures_util::task::waker_ref(&self.is_woken); - let mut cx = Context::from_waker(&waker); - - loop { - { - // Scope the lock on the driver to ensure it is dropped before - // calling drain_queue below. - let mut driver = self.driver.lock().unwrap(); - match Pin::new(&mut *driver).poll_next(&mut cx) { - Poll::Ready(val) => return val, - Poll::Pending => {} - }; - } - - // poll_next returned Pending. - // Check if any of the pending tasks tried to spawn - // some new tasks. If so, drain into the driver and loop. - if self.drain_queue() { - continue; - } - - // If the driver called `wake` while we were polling, - // we should poll again immediately! - if self.is_woken.0.swap(false, Ordering::SeqCst) { - continue; - } - - return None; - } - } - - /// drain_queue locks both self.spawn_queue and self.driver, so it requires - /// that neither of them be locked already. - fn drain_queue(&self) -> bool { - let mut queue = self.spawn_queue.lock().unwrap(); - if queue.is_empty() { - return false; - } - - let driver = self.driver.lock().unwrap(); - - for task in queue.drain(..) { - driver.push(task); - } - - true - } -} - -impl futures_util::task::ArcWake for ExecWaker { - fn wake_by_ref(me: &Arc<ExecWaker>) { - me.0.store(true, Ordering::SeqCst); - } -} - -// ===== impl WeakExec ===== - -impl WeakExec { - pub(crate) fn new() -> Self { - WeakExec(Weak::new()) - } -} - -impl<F> crate::rt::Executor<F> for WeakExec -where - F: Future + Send + 'static, - F::Output: Send + Sync + AsTaskType, -{ - fn execute(&self, fut: F) { - if let Some(exec) = self.0.upgrade() { - exec.spawn(hyper_task::boxed(fut)); - } - } -} - -ffi_fn! { - /// Creates a new task executor. - /// - /// To avoid a memory leak, the executor must eventually be consumed by - /// `hyper_executor_free`. - fn hyper_executor_new() -> *const hyper_executor { - Arc::into_raw(hyper_executor::new()) - } ?= ptr::null() -} - -ffi_fn! { - /// Frees an executor and any incomplete tasks still part of it. - /// - /// This should be used for any executor once it is no longer needed. - fn hyper_executor_free(exec: *const hyper_executor) { - drop(non_null!(Arc::from_raw(exec) ?= ())); - } -} - -ffi_fn! { - /// Push a task onto the executor. - /// - /// The executor takes ownership of the task, which must not be accessed - /// again. - /// - /// Ownership of the task will eventually be returned to the user from - /// `hyper_executor_poll`. - /// - /// To distinguish multiple tasks running on the same executor, use - /// hyper_task_set_userdata. - fn hyper_executor_push(exec: *const hyper_executor, task: *mut hyper_task) -> hyper_code { - let exec = non_null!(&*exec ?= hyper_code::HYPERE_INVALID_ARG); - let task = non_null!(Box::from_raw(task) ?= hyper_code::HYPERE_INVALID_ARG); - exec.spawn(task); - hyper_code::HYPERE_OK - } -} - -ffi_fn! { - /// Polls the executor, trying to make progress on any tasks that can do so. - /// - /// If any task from the executor is ready, returns one of them. The way - /// tasks signal being finished is internal to Hyper. The order in which tasks - /// are returned is not guaranteed. Use userdata to distinguish between tasks. - /// - /// To avoid a memory leak, the task must eventually be consumed by - /// `hyper_task_free`. - /// - /// If there are no ready tasks, this returns `NULL`. - fn hyper_executor_poll(exec: *const hyper_executor) -> *mut hyper_task { - let exec = non_null!(&*exec ?= ptr::null_mut()); - match exec.poll_next() { - Some(task) => Box::into_raw(task), - None => ptr::null_mut(), - } - } ?= ptr::null_mut() -} - -// ===== impl hyper_task ===== - -impl hyper_task { - pub(crate) fn boxed<F>(fut: F) -> Box<hyper_task> - where - F: Future + Send + 'static, - F::Output: IntoDynTaskType + Send + Sync + 'static, - { - Box::new(hyper_task { - future: Box::pin(async move { fut.await.into_dyn_task_type() }), - output: None, - userdata: UserDataPointer(ptr::null_mut()), - }) - } - - fn output_type(&self) -> hyper_task_return_type { - match self.output { - None => hyper_task_return_type::HYPER_TASK_EMPTY, - Some(ref val) => val.as_task_type(), - } - } -} - -impl Future for TaskFuture { - type Output = Box<hyper_task>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) { - Poll::Ready(val) => { - let mut task = self.task.take().unwrap(); - task.output = Some(val); - Poll::Ready(task) - } - Poll::Pending => Poll::Pending, - } - } -} - -ffi_fn! { - /// Free a task. - /// - /// This should only be used if the task isn't consumed by - /// `hyper_clientconn_handshake` or taken ownership of by - /// `hyper_executor_push`. - fn hyper_task_free(task: *mut hyper_task) { - drop(non_null!(Box::from_raw(task) ?= ())); - } -} - -ffi_fn! { - /// Takes the output value of this task. - /// - /// This must only be called once polling the task on an executor has finished - /// this task. - /// - /// Use `hyper_task_type` to determine the type of the `void *` return value. - /// - /// To avoid a memory leak, a non-empty return value must eventually be - /// consumed by a function appropriate for its type, one of - /// `hyper_error_free`, `hyper_clientconn_free`, `hyper_response_free`, or - /// `hyper_buf_free`. - fn hyper_task_value(task: *mut hyper_task) -> *mut c_void { - let task = non_null!(&mut *task ?= ptr::null_mut()); - - if let Some(val) = task.output.take() { - let p = Box::into_raw(val) as *mut c_void; - // protect from returning fake pointers to empty types - if p == std::ptr::NonNull::<c_void>::dangling().as_ptr() { - ptr::null_mut() - } else { - p - } - } else { - ptr::null_mut() - } - } ?= ptr::null_mut() -} - -ffi_fn! { - /// Query the return type of this task. - fn hyper_task_type(task: *mut hyper_task) -> hyper_task_return_type { - // instead of blowing up spectacularly, just say this null task - // doesn't have a value to retrieve. - non_null!(&*task ?= hyper_task_return_type::HYPER_TASK_EMPTY).output_type() - } -} - -ffi_fn! { - /// Set a user data pointer to be associated with this task. - /// - /// This value will be passed to task callbacks, and can be checked later - /// with `hyper_task_userdata`. - /// - /// This is useful for telling apart tasks for different requests that are - /// running on the same executor. - fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) { - if task.is_null() { - return; - } - - unsafe { (*task).userdata = UserDataPointer(userdata) }; - } -} - -ffi_fn! { - /// Retrieve the userdata that has been set via `hyper_task_set_userdata`. - fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void { - non_null!(&*task ?= ptr::null_mut()).userdata.0 - } ?= ptr::null_mut() -} - -// ===== impl AsTaskType ===== - -unsafe impl AsTaskType for () { - fn as_task_type(&self) -> hyper_task_return_type { - hyper_task_return_type::HYPER_TASK_EMPTY - } -} - -unsafe impl AsTaskType for crate::Error { - fn as_task_type(&self) -> hyper_task_return_type { - hyper_task_return_type::HYPER_TASK_ERROR - } -} - -impl<T> IntoDynTaskType for T -where - T: AsTaskType + Send + Sync + 'static, -{ - fn into_dyn_task_type(self) -> BoxAny { - Box::new(self) - } -} - -impl<T> IntoDynTaskType for crate::Result<T> -where - T: IntoDynTaskType + Send + Sync + 'static, -{ - fn into_dyn_task_type(self) -> BoxAny { - match self { - Ok(val) => val.into_dyn_task_type(), - Err(err) => Box::new(err), - } - } -} - -impl<T> IntoDynTaskType for Option<T> -where - T: IntoDynTaskType + Send + Sync + 'static, -{ - fn into_dyn_task_type(self) -> BoxAny { - match self { - Some(val) => val.into_dyn_task_type(), - None => ().into_dyn_task_type(), - } - } -} - -// ===== impl hyper_context ===== - -impl hyper_context<'_> { - pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> { - // A struct with only one field has the same layout as that field. - unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) } - } -} - -ffi_fn! { - /// Creates a waker associated with the task context. - /// - /// The waker can be used to inform the task's executor that the task is - /// ready to make progress (using `hyper_waker_wake``). - /// - /// Typically this only needs to be called once, but it can be called - /// multiple times, returning a new waker each time. - /// - /// To avoid a memory leak, the waker must eventually be consumed by - /// `hyper_waker_free` or `hyper_waker_wake`. - fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker { - let waker = non_null!(&mut *cx ?= ptr::null_mut()).0.waker().clone(); - Box::into_raw(Box::new(hyper_waker { waker })) - } ?= ptr::null_mut() -} - -// ===== impl hyper_waker ===== - -ffi_fn! { - /// Free a waker. - /// - /// This should only be used if the request isn't consumed by - /// `hyper_waker_wake`. - fn hyper_waker_free(waker: *mut hyper_waker) { - drop(non_null!(Box::from_raw(waker) ?= ())); - } -} - -ffi_fn! { - /// Wake up the task associated with a waker. - /// - /// This does not do work towards associated task. Instead, it signals - /// to the task's executor that the task is ready to make progress. The - /// application is responsible for calling hyper_executor_poll, which - /// will in turn do work on all tasks that are ready to make progress. - /// - /// NOTE: This consumes the waker. You should not use or free the waker afterwards. - fn hyper_waker_wake(waker: *mut hyper_waker) { - let waker = non_null!(Box::from_raw(waker) ?= ()); - waker.waker.wake(); - } -} |
