Add 'stream' and 'future' types #405
base: main
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -17,6 +17,7 @@ summary of the motivation and animated sketch of the design in action. | |||||
* [Current task](#current-task) | ||||||
* [Subtask and Supertask](#subtask-and-supertask) | ||||||
* [Structured concurrency](#structured-concurrency) | ||||||
* [Streams and Futures](#streams-and-futures) | ||||||
* [Waiting](#waiting) | ||||||
* [Backpressure](#backpressure) | ||||||
* [Returning](#returning) | ||||||
|
@@ -106,8 +107,30 @@ Thus, backpressure combined with the partitioning of low-level state provided | |||||
by the Component Model enables sync and async code to interoperate while | ||||||
preserving the expectations of both. | ||||||
|
||||||
[TODO](#todo): `future` and `stream` types that can be used in function | ||||||
signatures will be added next. | ||||||
In addition to being able to define and call whole functions asynchronously, | ||||||
the `stream` and `future` types can be used in function signatures to pass | ||||||
parameters and results incrementally over time, achieving finer-grained | ||||||
concurrency. Streams and futures are thus not defined to be free-standing | ||||||
resources with their own internal memory buffers (like a traditional channel or | ||||||
pipe) but, rather, more-primitive control-flow mechanisms that synchronize the | ||||||
incremental passing of parameters and results during cross-component calls. | ||||||
Higher-level resources like channels and pipes can then be defined in terms | ||||||
of these lower-level `stream` and `future` primitives, e.g.: | ||||||
```wit | ||||||
resource pipe { | ||||||
constructor(buffer-size: u32); | ||||||
write: func(bytes: stream<u8>) -> result; | ||||||
read: func() -> stream<u8>; | ||||||
} | ||||||
``` | ||||||
but also many other domain-specific concurrent resources like WASI HTTP request | ||||||
and response bodies or WASI blobs. Streams and futures are, however, high-level | ||||||
enough to be bound automatically to many source languages' built-in concurrency | ||||||
features like futures, promises, streams, generators and iterators, unlike | ||||||
lower-level concurrency primitives (like callbacks or `wasi:[email protected]` | ||||||
`pollable`s). Thus, the Component Model seeks to provide the lowest-level | ||||||
fine-grained concurrency primitives that are high-level and idiomatic enough to | ||||||
enable automatic generation of usable language-integrated bindings. | ||||||
|
||||||
|
||||||
## Concepts | ||||||
|
@@ -180,18 +203,80 @@ invocation of an export by the host. Moreover, at any one point in time, the | |||||
set of tasks active in a linked component graph form a forest of async call | ||||||
trees which e.g., can be visualized using a traditional flamegraph. | ||||||
|
||||||
The Canonical ABI's Python code enforces Structured Concurrency by maintaining | ||||||
a simple per-[`Task`] `num_async_subtasks` counter that traps if not zero when | ||||||
the `Task` finishes. | ||||||
The Canonical ABI's Python code enforces Structured Concurrency by incrementing | ||||||
a per-[`Task`] counter when a `Subtask` is created, decrementing when a | ||||||
`Subtask` is destroyed, and trapping if the counter is not zero when the `Task` | ||||||
attempts to exit. | ||||||
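
The counting guard described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the names `live_subtasks`, `exit`, `drop` and `Trap` are hypothetical and are not taken from the Canonical ABI's actual Python code.

```python
class Trap(Exception):
    """Stands in for a wasm trap in this sketch."""


class Task:
    def __init__(self):
        # Number of subtasks created by this task and not yet destroyed.
        self.live_subtasks = 0

    def exit(self):
        # Structured concurrency: a task may not exit while it still has children.
        if self.live_subtasks != 0:
            raise Trap("task attempted to exit with live subtasks")


class Subtask:
    def __init__(self, parent):
        self.parent = parent
        parent.live_subtasks += 1  # incremented when the subtask is created

    def drop(self):
        self.parent.live_subtasks -= 1  # decremented when the subtask is destroyed
```

Any bookkeeping that can answer "does this task still have children?" would serve the same purpose; the counter is just the simplest form.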
|
||||||
### Streams and Futures | ||||||
|
||||||
Streams and futures have two "ends": a *readable end* and a *writable end*. When | ||||||
*consuming* a `stream` or `future` value as a parameter (of an export call | ||||||
with a `stream` or `future` somewhere in the parameter types) or result (of an | ||||||
import call with a `stream` or `future` somewhere in the result type), the | ||||||
receiver always gets *unique ownership* of the *readable end* of the `stream` | ||||||
or `future`. When *producing* a `stream` or `future` value as a parameter (of | ||||||
an import call) or result (of an export call), the producer can either | ||||||
*transfer ownership* of a readable end it has already received or it can | ||||||
create a fresh writable end (via `stream.new` or `future.new`) and lift this | ||||||
writable end (maintaining ownership of the writable end, but creating a fresh | ||||||
Review comment: Not sure what the best way to word this is, but let's make it clear that lifting a writable end produces a readable end in the receiving component or host. |
||||||
readable end for the receiver). To maintain the invariant that readable ends | ||||||
are unique, a writable end can be lifted at most once, trapping otherwise. | ||||||
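
A minimal Python sketch of the "lift at most once" rule, assuming hypothetical `WritableEnd`, `ReadableEnd` and `Trap` classes that are not part of the Canonical ABI: lifting keeps the writable end with the producer and hands a fresh readable end to the receiver, and a second lift traps.

```python
class Trap(Exception):
    """Stands in for a wasm trap in this sketch."""


class ReadableEnd:
    def __init__(self, writable):
        self.writable = writable  # the paired writable end


class WritableEnd:
    def __init__(self):
        self.lifted = False

    def lift(self):
        # A second lift would create a second readable end, breaking uniqueness.
        if self.lifted:
            raise Trap("writable end lifted more than once")
        self.lifted = True
        # The producer keeps this writable end; the receiver gets the readable end.
        return ReadableEnd(self)
```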
|
||||||
Based on this, `stream<T>` and `future<T>` values can be passed between | ||||||
functions as if they were synchronous `list<T>` and `T` values, resp. For | ||||||
example, given `f` and `g` with types: | ||||||
```wit | ||||||
f: func(x: whatever) -> stream<T>; | ||||||
g: func(s: stream<T>) -> stuff; | ||||||
``` | ||||||
`g(f(x))` works as you might hope, concurrently streaming `x` into `f` which | ||||||
concurrently streams its results into `g`. (The addition of [`error`](#TODO) | ||||||
will provide a generic answer to the question of what happens if `f` | ||||||
experiences an error: `f` can close its returned writable stream end with an | ||||||
`error` that will be propagated into `g` which should then propagate the error | ||||||
somehow into `stuff`.) | ||||||
|
||||||
If a component instance *would* receive the readable end of a stream for which | ||||||
it already owns the writable end, the readable end disappears and the existing | ||||||
writable end is received instead (since the guest can now handle the whole | ||||||
stream more efficiently wholly from within guest code). E.g., if the same | ||||||
component instance defined `f` and `g` above, the composition `g(f(x))` would | ||||||
just instruct the guest to stream directly from `f` into `g` without crossing a | ||||||
component boundary or performing any extra copies. Thus, strengthening the | ||||||
previously-mentioned invariant, the readable and writable ends of a stream are | ||||||
unique *and never in the same component*. | ||||||
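
The collapsing rule above can be pictured with a small Python sketch. All names here (`Instance`, `receive`, the `owner` and `writable` fields) are hypothetical and chosen for illustration only; they do not mirror the Canonical ABI definitions.

```python
class Instance:
    """A component instance, identified only by name in this sketch."""

    def __init__(self, name):
        self.name = name


class WritableEnd:
    def __init__(self, owner):
        self.owner = owner  # the instance that holds the writable end


class ReadableEnd:
    def __init__(self, writable):
        self.writable = writable  # the paired writable end


def receive(readable, receiver):
    """Return the end that `receiver` observes when handed `readable`."""
    if readable.writable.owner is receiver:
        # Both ends would land in one instance: the readable end disappears
        # and the receiver is handed back its own writable end.
        return readable.writable
    return readable
```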
|
||||||
Given the readable or writable end of a stream, core wasm code can call the | ||||||
imported `stream.read` or `stream.write` canonical built-ins, passing the | ||||||
pointer and length of a linear-memory buffer to write-into or read-from, resp. | ||||||
These built-ins either return immediately, if at least one element could be | ||||||
written or read without blocking, or return a sentinel "blocked" | ||||||
value indicating that the read or write will execute concurrently. The | ||||||
readable and writable ends of streams and futures each have a well-defined | ||||||
parent `Task` that will receive "progress" events on all child streams/futures | ||||||
that have previously blocked. | ||||||
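
The "return a count or a blocked sentinel" behavior can be sketched as a rendezvous between the two ends. This is a simplified model under stated assumptions: `SharedStream`, `BLOCKED` and the method signatures are hypothetical, elements are bytes, and the "progress" event that a real runtime would deliver to the parent `Task` is omitted.

```python
BLOCKED = object()  # hypothetical sentinel for "will complete concurrently"


class SharedStream:
    def __init__(self):
        self.pending_write = None  # elements still held in a blocked writer's buffer
        self.pending_read = None   # (buffer, offset, length) of a blocked reader

    def read(self, buffer, offset, length):
        if self.pending_write:
            # A writer is already waiting: copy straight from its buffer.
            n = min(length, len(self.pending_write))
            buffer[offset:offset + n] = self.pending_write[:n]
            del self.pending_write[:n]
            return n
        self.pending_read = (buffer, offset, length)
        return BLOCKED

    def write(self, elements):
        if self.pending_read is not None:
            # A reader is already waiting: copy straight into its buffer.
            buffer, offset, length = self.pending_read
            n = min(length, len(elements))
            buffer[offset:offset + n] = elements[:n]
            self.pending_read = None
            return n
        self.pending_write = list(elements)
        return BLOCKED
```

Note that the stream itself holds no buffer of its own in this model; it only remembers which side is currently waiting and where that side's caller-supplied buffer is.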
|
||||||
From a [structured-concurrency](#structured-concurrency) perspective, the | ||||||
readable and writable ends of streams and futures are leaves of the async call | ||||||
tree. Unlike subtasks, the parent of the readable ends of streams and futures | ||||||
*can* change over time (when transferred via function call, as mentioned | ||||||
above). However, there is always *some* parent `Task` and this parent `Task` | ||||||
is prevented from orphaning its children using the same reference-counting | ||||||
guard mentioned above for subtasks. | ||||||
|
||||||
### Waiting | ||||||
|
||||||
When a component asynchronously lowers an import, it is explicitly requesting | ||||||
that, if the import blocks, control flow be returned back to the calling task | ||||||
so that it can do something else. Eventually though a task may run out of other | ||||||
so that it can do something else. Similarly, if `stream.read` or `stream.write` | ||||||
would block, they return a "blocked" code so that the caller can continue to | ||||||
make progress on other things. But eventually, a task will run out of other | ||||||
things to do and will need to **wait** for progress on one of the task's | ||||||
subtasks. While a task is waiting, the runtime can switch to other running | ||||||
tasks or start new tasks by invoking exports. | ||||||
subtasks, readable stream ends, writable stream ends, readable future ends or | ||||||
writable future ends, which are collectively called its **waitables**. While a | ||||||
task is waiting on its waitables, the Component Model runtime can switch to | ||||||
other running tasks or start new tasks by invoking exports. | ||||||
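
As a rough picture of the bookkeeping involved, the following Python sketch models a task's waitables and a non-blocking check for progress. The class, its methods and the string names are hypothetical; the real `task.wait` and `task.poll` built-ins have different signatures, and the point where this sketch returns `None` is where a real blocking wait would let the runtime switch to another task.

```python
from collections import deque


class Task:
    def __init__(self):
        self.waitables = set()  # subtasks and stream/future ends owned by this task
        self.events = deque()   # progress events not yet seen by the task

    def add_waitable(self, name):
        self.waitables.add(name)

    def notify(self, name, event):
        # Called (in this sketch) when one of the task's waitables makes progress.
        if name not in self.waitables:
            raise KeyError(name)
        self.events.append((name, event))

    def poll(self):
        # Return the next pending event, or None if nothing has progressed.
        return self.events.popleft() if self.events else None
```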
|
||||||
The Canonical ABI provides two ways for a task to wait: | ||||||
* The task can call the [`task.wait`] built-in to synchronously wait for | ||||||
|
@@ -234,13 +319,23 @@ the "started" state. | |||||
|
||||||
### Returning | ||||||
|
||||||
The way an async Core WebAssembly function returns its value is by calling | ||||||
[`task.return`], passing the core values that are to be lifted. | ||||||
|
||||||
The main reason to have `task.return` is so that a task can continue execution | ||||||
after returning its value. This is useful for various finalization tasks (such | ||||||
as logging, billing or metrics) that don't need to be on the critical path of | ||||||
returning a value to the caller. | ||||||
The way an async function returns its value is by calling [`task.return`], | ||||||
passing the core values that are to be lifted as *parameters*. Additionally, | ||||||
when the `always-task-return` `canonopt` is set, synchronous functions also | ||||||
return their values by calling `task.return` (as a more expressive and | ||||||
general alternative to `post-return`). | ||||||
|
||||||
Returning values by calling `task.return` allows a task to continue executing | ||||||
even after it has passed its initial results to the caller. This can be useful | ||||||
for various finalization tasks (freeing memory or performing logging, billing | ||||||
or metrics operations) that don't need to be on the critical path of returning | ||||||
a value to the caller, but the major use of executing code after `task.return` | ||||||
is to continue to read and write from streams and futures. For example, a | ||||||
stream transformer function of type `func(in: stream<T>) -> stream<U>` will | ||||||
immediately `task.return` a stream created via `stream.new` and then sit in a | ||||||
loop interleaving `stream.read`s (of the readable end passed for `in`) and | ||||||
`stream.write`s (of the writable end it `stream.new`ed) before exiting the | ||||||
task. | ||||||
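
The stream-transformer pattern has a loose analogue in Python's `asyncio`, sketched below. This is an analogy only: `asyncio.Queue` stands in for a stream (and, unlike a Component Model stream, it does have an internal buffer), returning the queue plays the role of `task.return`, and `DONE`, `transform` and `demo` are hypothetical names.

```python
import asyncio

DONE = object()      # hypothetical end-of-stream marker
_background = set()  # keeps strong references to running pump tasks


async def transform(in_stream: asyncio.Queue) -> asyncio.Queue:
    out_stream = asyncio.Queue(maxsize=1)

    async def pump():
        # Interleave reads of the input with writes to the output.
        while True:
            item = await in_stream.get()
            if item is DONE:
                await out_stream.put(DONE)
                return
            await out_stream.put(item * 2)

    # Hand the output back right away; the loop keeps running afterwards.
    task = asyncio.create_task(pump())
    _background.add(task)
    task.add_done_callback(_background.discard)
    return out_stream


async def demo():
    src = asyncio.Queue()
    for x in (1, 2, 3):
        src.put_nowait(x)
    src.put_nowait(DONE)
    out = await transform(src)
    results = []
    while (item := await out.get()) is not DONE:
        results.append(item)
    return results
```

Running `asyncio.run(demo())` drives the transformer to completion; the caller starts consuming `out` while `pump` is still reading from `src`.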
|
||||||
A task may not call `task.return` unless it is in the "started" state. Once | ||||||
`task.return` is called, the task is in the "returned" state. A task can only | ||||||
|
@@ -419,21 +514,24 @@ For now, this remains a [TODO](#todo) and validation will reject `async`-lifted | |||||
|
||||||
## TODO | ||||||
|
||||||
Native async support is being proposed in progressive chunks. The following | ||||||
features will be added in future chunks to complete "async" in Preview 3: | ||||||
* `future`/`stream`/`error`: add for use in function types for finer-grained | ||||||
concurrency | ||||||
* `subtask.cancel`: allow a supertask to signal to a subtask that its result is | ||||||
no longer wanted and to please wrap it up promptly | ||||||
* allow "tail-calling" a subtask so that the current wasm instance can be torn | ||||||
down eagerly | ||||||
* `task.index`+`task.wake`: allow tasks in the same instance to wait on and | ||||||
wake each other (async condvar-style) | ||||||
Native async support is being proposed incrementally. The following features | ||||||
will be added in future chunks roughly in the order listed to complete the full | ||||||
"async" story: | ||||||
* add `error` type that can be included when closing a stream/future | ||||||
* `nonblocking` function type attribute: allow a function to declare in its | ||||||
type that it will not transitively do anything blocking | ||||||
* define what `async` means for `start` functions (top-level await + background | ||||||
tasks), along with cross-task coordination built-ins | ||||||
* `subtask.cancel`: allow a supertask to signal to a subtask that its result is | ||||||
no longer wanted and to please wrap it up promptly | ||||||
* zero-copy forwarding/splicing and built-in way to "tail-call" a subtask so | ||||||
that the current wasm instance can be torn down eagerly while preserving | ||||||
structured concurrency | ||||||
* some way to say "no more elements are coming for a while" | ||||||
* `recursive` function type attribute: allow a function to be reentered | ||||||
recursively (instead of trapping) | ||||||
* enable `async` `start` functions | ||||||
recursively (instead of trapping) and link inner and outer activations | ||||||
* allow pipelining multiple `stream.read`/`write` calls | ||||||
* allow chaining multiple async calls together ("promise pipelining") | ||||||
* integrate with `shared`: define how to lift and lower functions `async` *and* | ||||||
`shared` | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -202,6 +202,8 @@ defvaltype ::= pvt:<primvaltype> => pvt | |
| 0x6a t?:<valtype>? u?:<valtype>? => (result t? (error u)?) | ||
| 0x69 i:<typeidx> => (own i) | ||
| 0x68 i:<typeidx> => (borrow i) | ||
| 0x66 i:<typeidx> => (stream i) | ||
| 0x65 i:<typeidx> => (future i) | ||
labelvaltype ::= l:<label'> t:<valtype> => l t | ||
case ::= l:<label'> t?:<valtype>? 0x00 => (case l t?) | ||
label' ::= len:<u32> l:<label> => l (if len = |l|) | ||
|
@@ -290,7 +292,19 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift | |
| 0x0a m:<core:memidx> => (canon task.wait (memory m) (core func)) 🔀 | ||
| 0x0b m:<core:memidx> => (canon task.poll (memory m) (core func)) 🔀 | ||
| 0x0c => (canon task.yield (core func)) 🔀 | ||
| 0x0d => (canon subtask.drop (core func)) 🔀 | ||
| 0x0d => (canon waitable.drop (core func)) 🔀 | ||
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀 | ||
| 0x0f => (canon stream.read (core func)) 🔀 | ||
Review comment: I would have expected all the
Review comment: BTW, would it be too out-of-scope to add |
||
| 0x10 => (canon stream.write (core func)) 🔀 | ||
| 0x11 async?:<async?> => (canon stream.cancel-read async? (core func)) 🔀 | ||
| 0x12 async?:<async?> => (canon stream.cancel-write async? (core func)) 🔀 | ||
| 0x13 t:<typeidx> => (canon future.new t (core func)) 🔀 | ||
| 0x14 => (canon future.read (core func)) 🔀 | ||
| 0x15 => (canon future.write (core func)) 🔀 | ||
| 0x16 async?:<async?> => (canon future.cancel-read async? (core func)) 🔀 | ||
Review comment: Maybe we talked about this earlier and I forgot, but could we consider leaving the cancel functions as post-WASIp3 (or nice-to-have-for-WASIp3) features? |
||
| 0x17 async?:<async?> => (canon future.cancel-write async? (core func)) 🔀 | ||
async? ::= 0x00 => | ||
| 0x01 => async | ||
opts ::= opt*:vec(<canonopt>) => opt* | ||
canonopt ::= 0x00 => string-encoding=utf8 | ||
| 0x01 => string-encoding=utf16 | ||
|
@@ -300,6 +314,7 @@ canonopt ::= 0x00 => string-encod | |
| 0x05 f:<core:funcidx> => (post-return f) | ||
| 0x06 => async 🔀 | ||
| 0x07 f:<core:funcidx> => (callback f) 🔀 | ||
| 0x08 => always-task-return 🔀 | ||
``` | ||
Notes: | ||
* The second `0x00` byte in `canon` stands for the `func` sort and thus the | ||
|
Review comment: It doesn't literally have to be a counter, right? An implementation might keep track of the subtasks of a given task as a list or hash table, so we just need to make sure that list or hash table is empty before exiting the task.