#![deny(missing_docs)] #![allow(static_mut_refs)] extern crate std; use std::alloc::{self, Layout}; use std::any::Any; use std::boxed::Box; use std::collections::{hash_map, HashMap}; use std::fmt::{self, Debug, Display}; use std::future::Future; use std::pin::Pin; use std::ptr; use std::string::String; use std::sync::Arc; use std::task::{Context, Poll, Wake, Waker}; use std::vec::Vec; use futures::channel::oneshot; use futures::future::FutureExt; use futures::stream::{FuturesUnordered, StreamExt}; use once_cell::sync::Lazy; mod future_support; mod stream_support; pub use { future_support::{FutureReader, FutureVtable, FutureWriter}, stream_support::{StreamReader, StreamVtable, StreamWriter}, }; pub use futures; type BoxFuture = Pin + 'static>>; /// Represents a task created by either a call to an async-lifted export or a /// future run using `block_on` or `poll_future`. struct FutureState { /// Number of in-progress async-lowered import calls and/or stream/future reads/writes. todo: usize, /// Remaining work to do (if any) before this task can be considered "done". /// /// Note that we won't tell the host the task is done until this is drained /// and `todo` is zero. tasks: Option>, } /// Represents the state of a stream or future. #[doc(hidden)] pub enum Handle { LocalOpen, LocalReady(Box, Waker), LocalWaiting(oneshot::Sender>), LocalClosed, Read, Write, } /// The current task being polled (or null if none). static mut CURRENT: *mut FutureState = ptr::null_mut(); /// Map of any in-progress calls to async-lowered imports, keyed by the /// identifiers issued by the host. static mut CALLS: Lazy>> = Lazy::new(HashMap::new); /// Any newly-deferred work queued by calls to the `spawn` function while /// polling the current task. static mut SPAWNED: Vec = Vec::new(); /// The states of all currently-open streams and futures. 
static mut HANDLES: Lazy> = Lazy::new(HashMap::new); #[doc(hidden)] pub fn with_entry(handle: u32, fun: impl FnOnce(hash_map::Entry<'_, u32, Handle>) -> T) -> T { fun(unsafe { HANDLES.entry(handle) }) } fn dummy_waker() -> Waker { struct DummyWaker; impl Wake for DummyWaker { fn wake(self: Arc) {} } static WAKER: Lazy> = Lazy::new(|| Arc::new(DummyWaker)); WAKER.clone().into() } /// Poll the specified task until it either completes or can't make immediate /// progress. unsafe fn poll(state: *mut FutureState) -> Poll<()> { loop { if let Some(futures) = (*state).tasks.as_mut() { CURRENT = state; let poll = futures.poll_next_unpin(&mut Context::from_waker(&dummy_waker())); CURRENT = ptr::null_mut(); if SPAWNED.is_empty() { match poll { Poll::Ready(Some(())) => (), Poll::Ready(None) => { (*state).tasks = None; break Poll::Ready(()); } Poll::Pending => break Poll::Pending, } } else { futures.extend(SPAWNED.drain(..)); } } else { break Poll::Ready(()); } } } /// Poll the future generated by a call to an async-lifted export once, calling /// the specified closure (presumably backed by a call to `task.return`) when it /// generates a value. /// /// This will return a non-null pointer representing the task if it hasn't /// completed immediately; otherwise it returns null. #[doc(hidden)] pub fn first_poll( future: impl Future + 'static, fun: impl FnOnce(&T) + 'static, ) -> *mut u8 { let state = Box::into_raw(Box::new(FutureState { todo: 0, tasks: Some( [Box::pin(future.map(|v| fun(&v))) as BoxFuture] .into_iter() .collect(), ), })); match unsafe { poll(state) } { Poll::Ready(()) => ptr::null_mut(), Poll::Pending => state as _, } } /// Await the completion of a call to an async-lowered import. 
///
/// The import's return value packs a status into its top two bits and a call
/// identifier into the remaining 30 bits. Unless the status says the call is
/// already done, the current task's `todo` count is incremented and, for the
/// "starting" and "started" statuses, this future waits for a notification
/// sent through `CALLS` under the call identifier.
///
/// # Safety
///
/// * `import` is called with `params` and `results`, which must be valid for
///   whatever that function does with them.
/// * `params` is released with `alloc::dealloc(params, params_layout)`, so it
///   must have been allocated by the global allocator with that layout.
/// * Unless the call completes immediately, this must run while a task is
///   being polled (`CURRENT` is asserted to be non-null).
#[doc(hidden)]
pub async unsafe fn await_result(
    import: unsafe extern "C" fn(*mut u8, *mut u8) -> i32,
    params_layout: Layout,
    params: *mut u8,
    results: *mut u8,
) {
    const STATUS_STARTING: u32 = 0;
    const STATUS_STARTED: u32 = 1;
    const STATUS_RETURNED: u32 = 2;
    const STATUS_DONE: u32 = 3;

    let result = import(params, results) as u32;
    let status = result >> 30;
    let call = (result & !(0b11 << 30)) as i32;

    if status != STATUS_DONE {
        assert!(!CURRENT.is_null());
        (*CURRENT).todo += 1;
    }

    match status {
        STATUS_STARTING => {
            // The parameters are only released after the notification arrives.
            let (tx, rx) = oneshot::channel();
            CALLS.insert(call, tx);
            rx.await.unwrap();
            alloc::dealloc(params, params_layout);
        }
        STATUS_STARTED => {
            // Here the parameters are released before waiting.
            alloc::dealloc(params, params_layout);
            let (tx, rx) = oneshot::channel();
            CALLS.insert(call, tx);
            rx.await.unwrap();
        }
        STATUS_RETURNED | STATUS_DONE => {
            alloc::dealloc(params, params_layout);
        }
        // `status` is a two-bit value, so 0..=3 covers every case.
        _ => unreachable!(),
    }
}

/// Stream/future read/write results defined by the Component Model ABI.
mod results {
    pub const BLOCKED: u32 = 0xffff_ffff;
    pub const CLOSED: u32 = 0x8000_0000;
    pub const CANCELED: u32 = 0;
}

/// Await the completion of a future read or write.
///
/// Returns `true` when the operation reports the value `1`, either directly
/// or through the notification received after blocking. Returns `false` when
/// `import` reports "closed" or "canceled", or when the notification carries
/// anything other than `1`.
///
/// # Safety
///
/// `import` is called with `future` and `address`, which must be valid for
/// that function. If the operation blocks, this must run while a task is being
/// polled (`CURRENT` is asserted to be non-null).
#[doc(hidden)]
pub async unsafe fn await_future_result(
    import: unsafe extern "C" fn(u32, *mut u8) -> u32,
    future: u32,
    address: *mut u8,
) -> bool {
    let result = import(future, address);
    match result {
        results::BLOCKED => {
            assert!(!CURRENT.is_null());
            (*CURRENT).todo += 1;
            // The notification is keyed by the future's own handle.
            let (tx, rx) = oneshot::channel();
            CALLS.insert(future as _, tx);
            let v = rx.await.unwrap();
            v == 1
        }
        results::CLOSED | results::CANCELED => false,
        1 => true,
        _ => unreachable!(),
    }
}

/// Await the completion of a stream read or write.
///
/// Returns `None` when the operation reports "closed" or "canceled", either
/// directly or through the notification received after blocking. Otherwise
/// returns the reported value converted to `usize`.
///
/// # Safety
///
/// `import` is called with `stream`, `address` and `count`, which must be
/// valid for that function. If the operation blocks, this must run while a
/// task is being polled (`CURRENT` is asserted to be non-null).
#[doc(hidden)]
pub async unsafe fn await_stream_result(
    import: unsafe extern "C" fn(u32, *mut u8, u32) -> u32,
    stream: u32,
    address: *mut u8,
    count: u32,
) -> Option<usize> {
    let result = import(stream, address, count);
    match result {
        results::BLOCKED => {
            assert!(!CURRENT.is_null());
            (*CURRENT).todo += 1;
            // The notification is keyed by the stream's own handle.
            let (tx, rx) = oneshot::channel();
            CALLS.insert(stream as _, tx);
            let v = rx.await.unwrap();
            if let results::CLOSED | results::CANCELED = v {
                None
            } else {
                Some(usize::try_from(v).unwrap())
            }
        }
        results::CLOSED | results::CANCELED => None,
        v => Some(usize::try_from(v).unwrap()),
    }
}

/// Call the `subtask.drop` canonical built-in function.
///
/// Only available on `wasm32`; on any other target this is `unreachable!()`.
fn subtask_drop(subtask: u32) {
    #[cfg(not(target_arch = "wasm32"))]
    {
        _ = subtask;
        unreachable!();
    }

    #[cfg(target_arch = "wasm32")]
    {
        #[link(wasm_import_module = "$root")]
        extern "C" {
            #[link_name = "[subtask-drop]"]
            fn subtask_drop(_: u32);
        }
        unsafe {
            subtask_drop(subtask);
        }
    }
}

/// Handle a progress notification from the host regarding either a call to an
/// async-lowered import or a stream/future read/write operation.
#[doc(hidden)] pub unsafe fn callback(ctx: *mut u8, event0: i32, event1: i32, event2: i32) -> i32 { const _EVENT_CALL_STARTING: i32 = 0; const EVENT_CALL_STARTED: i32 = 1; const EVENT_CALL_RETURNED: i32 = 2; const EVENT_CALL_DONE: i32 = 3; const _EVENT_YIELDED: i32 = 4; const EVENT_STREAM_READ: i32 = 5; const EVENT_STREAM_WRITE: i32 = 6; const EVENT_FUTURE_READ: i32 = 7; const EVENT_FUTURE_WRITE: i32 = 8; match event0 { EVENT_CALL_STARTED => 0, EVENT_CALL_RETURNED | EVENT_CALL_DONE | EVENT_STREAM_READ | EVENT_STREAM_WRITE | EVENT_FUTURE_READ | EVENT_FUTURE_WRITE => { if let Some(call) = CALLS.remove(&event1) { _ = call.send(event2 as _); } let state = ctx as *mut FutureState; let done = poll(state).is_ready(); if event0 == EVENT_CALL_DONE { subtask_drop(event1 as u32); } if matches!( event0, EVENT_CALL_DONE | EVENT_STREAM_READ | EVENT_STREAM_WRITE | EVENT_FUTURE_READ | EVENT_FUTURE_WRITE ) { (*state).todo -= 1; } if done && (*state).todo == 0 { drop(Box::from_raw(state)); 1 } else { 0 } } _ => unreachable!(), } } /// Represents the Component Model `error-context` type. 
pub struct ErrorContext { handle: u32, } impl ErrorContext { #[doc(hidden)] pub fn from_handle(handle: u32) -> Self { Self { handle } } #[doc(hidden)] pub fn handle(&self) -> u32 { self.handle } /// Extract the debug message from a given [`ErrorContext`] pub fn debug_message(&self) -> String { #[cfg(not(target_arch = "wasm32"))] { _ = self; unreachable!(); } #[cfg(target_arch = "wasm32")] { #[link(wasm_import_module = "$root")] extern "C" { #[link_name = "[error-context-debug-message;encoding=utf8;realloc=cabi_realloc]"] fn error_context_debug_message(_: u32, _: *mut u8); } unsafe { let mut ret = [0u32; 2]; error_context_debug_message(self.handle, ret.as_mut_ptr() as *mut _); let len = usize::try_from(ret[1]).unwrap(); String::from_raw_parts(usize::try_from(ret[0]).unwrap() as *mut _, len, len) } } } } impl Debug for ErrorContext { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ErrorContext").finish() } } impl Display for ErrorContext { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Error") } } impl std::error::Error for ErrorContext {} impl Drop for ErrorContext { fn drop(&mut self) { #[cfg(not(target_arch = "wasm32"))] { unreachable!(); } #[cfg(target_arch = "wasm32")] { #[link(wasm_import_module = "$root")] extern "C" { #[link_name = "[error-context-drop]"] fn error_drop(_: u32); } if self.handle != 0 { unsafe { error_drop(self.handle) } } } } } /// Defer the specified future to be run after the current async-lifted export /// task has returned a value. /// /// The task will remain in a running state until all spawned futures have /// completed. 
pub fn spawn(future: impl Future<Output = ()> + 'static) {
    unsafe { SPAWNED.push(Box::pin(future)) }
}

/// Block in `task.wait` until the host reports an event, then feed that event
/// to `callback` for the given task.
///
/// Returns `true` when `callback` reported the task as finished. In that case
/// `callback` has already freed `state` and it must not be touched again.
///
/// `state` must point to a live `FutureState` obtained from `Box::into_raw`.
/// Only available on `wasm32`; on any other target this is `unreachable!()`.
fn task_wait(state: *mut FutureState) -> bool {
    #[cfg(not(target_arch = "wasm32"))]
    {
        _ = state;
        unreachable!();
    }

    #[cfg(target_arch = "wasm32")]
    {
        #[link(wasm_import_module = "$root")]
        extern "C" {
            #[link_name = "[task-wait]"]
            fn wait(_: *mut i32) -> i32;
        }
        // `wait` returns the event kind and fills in its two payload words.
        let mut payload = [0i32; 2];
        unsafe {
            let event0 = wait(payload.as_mut_ptr());
            callback(state.cast::<u8>(), event0, payload[0], payload[1]) != 0
        }
    }
}

/// Run the specified future to completion, returning the result.
///
/// This uses `task.wait` to poll for progress on any in-progress calls to
/// async-lowered imports as necessary.
///
/// # Panics
///
/// Panics if the task finishes without the future having produced a value.
// TODO: refactor so `'static` bounds aren't necessary
pub fn block_on<T: 'static>(future: impl Future<Output = T> + 'static) -> T {
    let (tx, mut rx) = oneshot::channel();
    // The state is heap-allocated because `callback` reclaims a finished task
    // with `Box::from_raw`; a stack-allocated state would be freed as if it
    // were a `Box` and then polled again.
    let state = Box::into_raw(Box::new(FutureState {
        todo: 0,
        tasks: Some(
            [Box::pin(future.map(move |v| drop(tx.send(v)))) as BoxFuture]
                .into_iter()
                .collect(),
        ),
    }));
    loop {
        let finished = match unsafe { poll(state) } {
            Poll::Ready(()) => {
                // SAFETY: `state` came from `Box::into_raw` above, and every
                // earlier `task_wait` returned `false`, so `callback` has not
                // freed it.
                drop(unsafe { Box::from_raw(state) });
                true
            }
            // `true` here means `callback` already freed `state`.
            Poll::Pending => task_wait(state),
        };
        if finished {
            break rx.try_recv().unwrap().unwrap();
        }
    }
}

/// Call the `task.yield` canonical built-in function.
///
/// This yields control to the host temporarily, allowing other tasks to make
/// progress. It's a good idea to call this inside a busy loop which does not
/// otherwise ever yield control to the host.
pub fn task_yield() {
    #[cfg(not(target_arch = "wasm32"))]
    {
        unreachable!();
    }

    #[cfg(target_arch = "wasm32")]
    {
        #[link(wasm_import_module = "$root")]
        extern "C" {
            #[link_name = "[task-yield]"]
            fn yield_();
        }
        unsafe {
            yield_();
        }
    }
}

/// Call the `task.backpressure` canonical built-in function.
///
/// When `enabled` is `true`, this tells the host to defer any new calls to this
/// component instance until further notice (i.e. until `task.backpressure` is
/// called again with `enabled` set to `false`).
pub fn task_backpressure(enabled: bool) { #[cfg(not(target_arch = "wasm32"))] { _ = enabled; unreachable!(); } #[cfg(target_arch = "wasm32")] { #[link(wasm_import_module = "$root")] extern "C" { #[link_name = "[task-backpressure]"] fn backpressure(_: i32); } unsafe { backpressure(if enabled { 1 } else { 0 }); } } } /// Call the `error-context.new` canonical built-in function. pub fn error_context_new(debug_message: &str) -> ErrorContext { #[cfg(not(target_arch = "wasm32"))] { _ = debug_message; unreachable!(); } #[cfg(target_arch = "wasm32")] { #[link(wasm_import_module = "$root")] extern "C" { #[link_name = "[error-context-new;encoding=utf8]"] fn context_new(_: *const u8, _: usize) -> i32; } unsafe { let handle = context_new(debug_message.as_ptr(), debug_message.len()); // SAFETY: Handles (including error context handles are guaranteed to // fit inside u32 by the Component Model ABI ErrorContext::from_handle(u32::try_from(handle).unwrap()) } } }