Skip to content

Commit

Permalink
Add 'stream' and 'future' types
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Oct 14, 2024
1 parent 42e78f3 commit e89ebdf
Show file tree
Hide file tree
Showing 6 changed files with 2,275 additions and 228 deletions.
152 changes: 125 additions & 27 deletions design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ summary of the motivation and animated sketch of the design in action.
* [Current task](#current-task)
* [Subtask and Supertask](#subtask-and-supertask)
* [Structured concurrency](#structured-concurrency)
* [Streams and Futures](#streams-and-futures)
* [Waiting](#waiting)
* [Backpressure](#backpressure)
* [Returning](#returning)
Expand Down Expand Up @@ -106,8 +107,30 @@ Thus, backpressure combined with the partitioning of low-level state provided
by the Component Model enables sync and async code to interoperate while
preserving the expectations of both.

[TODO](#todo): `future` and `stream` types that can be used in function
signatures will be added next.
In addition to being able to define and call whole functions asynchronously,
the `stream` and `future` types can be used in function signatures to pass
parameters and results incrementally over time, achieving finer-grained
concurrency. Streams and futures are thus not defined to be free-standing
resources with their own internal memory buffers (like a traditional channel or
pipe) but, rather, more-primitive control-flow mechanisms that synchronize the
incremental passing of parameters and results during cross-component calls.
Higher-level resources like channels and pipes can then be defined in terms
of these lower-level `stream` and `future` primitives, e.g.:
```wit
resource pipe {
constructor(buffer-size: u32);
write: func(bytes: stream<u8>) -> result;
read: func() -> stream<u8>;
}
```
but also many other domain-specific concurrent resources like WASI HTTP request
and response bodies or WASI blobs. Streams and futures are however high-level
enough to be bound automatically to many source languages' built-in concurrency
features like futures, promises, streams, generators and iterators, unlike
lower-level concurrency primitives (like callbacks or `wasi:[email protected]`
`pollable`s). Thus, the Component Model seeks to provide the lowest-level
fine-grained concurrency primitives that are high-level and idiomatic enough to
enable automatic generation of usable language-integrated bindings.


## Concepts
Expand Down Expand Up @@ -180,18 +203,80 @@ invocation of an export by the host. Moreover, at any one point in time, the
set of tasks active in a linked component graph form a forest of async call
trees which e.g., can be visualized using a traditional flamegraph.

The Canonical ABI's Python code enforces Structured Concurrency by maintaining
a simple per-[`Task`] `num_async_subtasks` counter that traps if not zero when
the `Task` finishes.
The Canonical ABI's Python code enforces Structured Concurrency by incrementing
a per-[`Task`] counter when a `Subtask` is created, decrementing when a
`Subtask` is destroyed, and trapping if the counter is not zero when the `Task`
attempts to exit.

### Streams and Futures

Streams and Futures have two "ends": a *readable end* and *writable end*. When
*consuming* a `stream` or `future` value as a parameter (of an export call
with a `stream` or `future` somewhere in the parameter types) or result (of an
import call with a `stream` or `future` somewhere in the result type), the
receiver always gets *unique ownership* of the *readable end* of the `stream`
or `future`. When *producing* a `stream` or `future` value as a parameter (of
an import call) or result (of an export call), the producer can either
*transfer ownership* of a readable end it has already received or it can
create a fresh writable end (via `stream.new` or `future.new`) and lift this
writable end (maintaining ownership of the writable end, but creating a fresh
readable end for the receiver). To maintain the invariant that readable ends
are unique, a writable end can be lifted at most once, trapping otherwise.

Based on this, `stream<T>` and `future<T>` values can be passed between
functions as if they were synchronous `list<T>` and `T` values, resp. For
example, given `f` and `g` with types:
```wit
f: func(x: whatever) -> stream<T>;
g: func(s: stream<T>) -> stuff;
```
`g(f(x))` works as you might hope, concurrently streaming `x` into `f` which
concurrently streams its results into `g`. (The addition of [`error`](#TODO)
will provide a generic answer to the question of what happens if `f`
experiences an error: `f` can close its returned writable stream end with an
`error` that will be propagated into `g` which should then propagate the error
somehow into `stuff`.)

If a component instance *would* receive the readable end of a stream for which
it already owns the writable end, the readable end disappears and the existing
writable end is received instead (since the guest can now handle the whole
stream more efficiently wholly from within guest code). E.g., if the same
component instance defined `f` and `g` above, the composition `g(f(x))` would
just instruct the guest to stream directly from `f` into `g` without crossing a
component boundary or performing any extra copies. Thus, strengthening the
previously-mentioned invariant, the readable and writable ends of a stream are
unique *and never in the same component*.

Given the readable or writable end of a stream, core wasm code can call the
imported `stream.read` or `stream.write` canonical built-ins, passing the
pointer and length of a linear-memory buffer to write-into or read-from, resp.
These built-ins can either return immediately if >0 elements were able to be
written or read immediately (without blocking) or return a sentinel "blocked"
value indicating that the read or write will execute concurrently. The
readable and writable ends of streams and futures each have a well-defined
parent `Task` that will receive "progress" events on all child streams/futures
that have previously blocked.

From a [structured-concurrency](#structured-concurrency) perspective, the
readable and writable ends of streams and futures are leaves of the async call
tree. Unlike subtasks, the parent of the readable ends of streams and future
*can* change over time (when transferred via function call, as mentioned
above). However, there is always *some* parent `Task` and this parent `Task`
is prevented from orphaning its children using the same reference-counting
guard mentioned above for subtasks.

### Waiting

When a component asynchronously lowers an import, it is explicitly requesting
that, if the import blocks, control flow be returned back to the calling task
so that it can do something else. Eventually though a task may run out of other
so that it can do something else. Similarly, if `stream.read` or `stream.write`
would block, they return a "blocked" code so that the caller can continue to
make progress on other things. But eventually, a task will run out of other
things to do and will need to **wait** for progress on one of the task's
subtasks. While a task is waiting, the runtime can switch to other running
tasks or start new tasks by invoking exports.
subtasks, readable stream ends, writable stream ends, readable future ends or
writable future ends, which are collectively called its **waitables**. While a
task is waiting on its waitables, the Component Model runtime can switch to
other running tasks or start new tasks by invoking exports.

The Canonical ABI provides two ways for a task to wait:
* The task can call the [`task.wait`] built-in to synchronously wait for
Expand Down Expand Up @@ -234,13 +319,23 @@ the "started" state.

### Returning

The way an async Core WebAssembly function returns its value is by calling
[`task.return`], passing the core values that are to be lifted.

The main reason to have `task.return` is so that a task can continue execution
after returning its value. This is useful for various finalization tasks (such
as logging, billing or metrics) that don't need to be on the critical path of
returning a value to the caller.
The way an async function returns its value is by calling [`task.return`],
passing the core values that are to be lifted as *parameters*. Additionally,
when the `always-task-return` `canonopt` is set, synchronous functions also
return their values by calling `task.return` (as a more expressive and
general alternative to `post-return`).

Returning values by calling `task.return` allows a task to continue executing
even after it has passed its initial results to the caller. This can be useful
for various finalization tasks (freeing memory or performing logging, billing
or metrics operations) that don't need to be on the critical path of returning
a value to the caller, but the major use of executing code after `task.return`
is to continue to read and write from streams and futures. For example, a
stream transformer function of type `func(in: stream<T>) -> stream<U>` will
immediately `task.return` a stream created via `stream.new` and then sit in a
loop interleaving `stream.read`s (of the readable end passed for `in`) and
`stream.write`s (of the writable end it `stream.new`ed) before exiting the
task.

A task may not call `task.return` unless it is in the "started" state. Once
`task.return` is called, the task is in the "returned" state. A task can only
Expand Down Expand Up @@ -419,21 +514,24 @@ For now, this remains a [TODO](#todo) and validation will reject `async`-lifted

## TODO

Native async support is being proposed in progressive chunks. The following
features will be added in future chunks to complete "async" in Preview 3:
* `future`/`stream`/`error`: add for use in function types for finer-grained
concurrency
* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
no longer wanted and to please wrap it up promptly
* allow "tail-calling" a subtask so that the current wasm instance can be torn
down eagerly
* `task.index`+`task.wake`: allow tasks in the same instance to wait on and
wake each other (async condvar-style)
Native async support is being proposed incrementally. The following features
will be added in future chunks roughly in the order list to complete the full
"async" story:
* add `error` type that can be included when closing a stream/future
* `nonblocking` function type attribute: allow a function to declare in its
type that it will not transitively do anything blocking
* define what `async` means for `start` functions (top-level await + background
tasks), along with cross-task coordination built-ins
* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
no longer wanted and to please wrap it up promptly
* zero-copy forwarding/splicing and built-in way to "tail-call" a subtask so
that the current wasm instance can be torn down eagerly while preserving
structured concurrency
* some way to say "no more elements are coming for a while"
* `recursive` function type attribute: allow a function to be reentered
recursively (instead of trapping)
* enable `async` `start` functions
recursively (instead of trapping) and link inner and outer activations
* allow pipelining multiple `stream.read`/`write` calls
* allow chaining multiple async calls together ("promise pipelining")
* integrate with `shared`: define how to lift and lower functions `async` *and*
`shared`

Expand Down
17 changes: 16 additions & 1 deletion design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ defvaltype ::= pvt:<primvaltype> => pvt
| 0x6a t?:<valtype>? u?:<valtype>? => (result t? (error u)?)
| 0x69 i:<typeidx> => (own i)
| 0x68 i:<typeidx> => (borrow i)
| 0x66 i:<typeidx> => (stream i)
| 0x65 i:<typeidx> => (future i)
labelvaltype ::= l:<label'> t:<valtype> => l t
case ::= l:<label'> t?:<valtype>? 0x00 => (case l t?)
label' ::= len:<u32> l:<label> => l (if len = |l|)
Expand Down Expand Up @@ -290,7 +292,19 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x0a m:<core:memdix> => (canon task.wait (memory m) (core func)) 🔀
| 0x0b m:<core:memidx> => (canon task.poll (memory m) (core func)) 🔀
| 0x0c => (canon task.yield (core func)) 🔀
| 0x0d => (canon subtask.drop (core func)) 🔀
| 0x0d => (canon waitable.drop (core func)) 🔀
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
| 0x0f => (canon stream.read (core func)) 🔀
| 0x10 => (canon stream.write (core func)) 🔀
| 0x11 async?:<async?> => (canon stream.cancel-read async? (core func)) 🔀
| 0x12 async?:<async?> => (canon stream.cancel-write async? (core func)) 🔀
| 0x13 t:<typeidx> => (canon future.new t (core func)) 🔀
| 0x14 => (canon future.read (core func)) 🔀
| 0x15 => (canon future.write (core func)) 🔀
| 0x16 async?:<async?> => (canon future.cancel-read async? (core func)) 🔀
| 0x17 async?:<async?> => (canon future.cancel-write async? (core func)) 🔀
async? ::= 0x00 =>
| 0x01 => async
opts ::= opt*:vec(<canonopt>) => opt*
canonopt ::= 0x00 => string-encoding=utf8
| 0x01 => string-encoding=utf16
Expand All @@ -300,6 +314,7 @@ canonopt ::= 0x00 => string-encod
| 0x05 f:<core:funcidx> => (post-return f)
| 0x06 => async 🔀
| 0x07 f:<core:funcidx> => (callback f) 🔀
| 0x08 => always-task-return 🔀
```
Notes:
* The second `0x00` byte in `canon` stands for the `func` sort and thus the
Expand Down
Loading

0 comments on commit e89ebdf

Please sign in to comment.