Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework time-driver contract. #3593

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ci/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cargo test --manifest-path ./embassy-futures/Cargo.toml
cargo test --manifest-path ./embassy-sync/Cargo.toml
cargo test --manifest-path ./embassy-embedded-hal/Cargo.toml
cargo test --manifest-path ./embassy-hal-internal/Cargo.toml
cargo test --manifest-path ./embassy-time/Cargo.toml --features generic-queue,mock-driver
cargo test --manifest-path ./embassy-time/Cargo.toml --features mock-driver
cargo test --manifest-path ./embassy-time-driver/Cargo.toml

cargo test --manifest-path ./embassy-boot/Cargo.toml
Expand Down
4 changes: 3 additions & 1 deletion ci-xtensa.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ cargo batch \
--- build --release --manifest-path embassy-executor/Cargo.toml --target xtensa-esp32-none-elf --features arch-spin,executor-thread \
--- build --release --manifest-path embassy-executor/Cargo.toml --target xtensa-esp32-none-elf --features arch-spin,executor-thread,integrated-timers \
--- build --release --manifest-path embassy-sync/Cargo.toml --target xtensa-esp32s2-none-elf --features defmt \
--- build --release --manifest-path embassy-time/Cargo.toml --target xtensa-esp32s2-none-elf --features defmt,defmt-timestamp-uptime,generic-queue-8,mock-driver \
--- build --release --manifest-path embassy-time/Cargo.toml --target xtensa-esp32s2-none-elf --features defmt,defmt-timestamp-uptime,mock-driver \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target xtensa-esp32s2-none-elf --features integrated-timers \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target xtensa-esp32s2-none-elf --features generic-queue-8 \
--- build --release --manifest-path embassy-net/Cargo.toml --target xtensa-esp32-none-elf --features defmt,tcp,udp,dns,proto-ipv4,medium-ethernet,packet-trace \
--- build --release --manifest-path embassy-net/Cargo.toml --target xtensa-esp32-none-elf --features defmt,tcp,udp,dns,proto-ipv4,multicast,medium-ethernet \
--- build --release --manifest-path embassy-net/Cargo.toml --target xtensa-esp32-none-elf --features defmt,tcp,udp,dns,dhcpv4,medium-ethernet \
Expand Down
4 changes: 3 additions & 1 deletion ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ cargo batch \
--- build --release --manifest-path embassy-executor/Cargo.toml --target riscv32imac-unknown-none-elf --features arch-riscv32,executor-thread \
--- build --release --manifest-path embassy-executor/Cargo.toml --target riscv32imac-unknown-none-elf --features arch-riscv32,executor-thread,integrated-timers \
--- build --release --manifest-path embassy-sync/Cargo.toml --target thumbv6m-none-eabi --features defmt \
--- build --release --manifest-path embassy-time/Cargo.toml --target thumbv6m-none-eabi --features defmt,defmt-timestamp-uptime,generic-queue-8,mock-driver \
--- build --release --manifest-path embassy-time/Cargo.toml --target thumbv6m-none-eabi --features defmt,defmt-timestamp-uptime,mock-driver \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target thumbv6m-none-eabi --features integrated-timers \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target thumbv6m-none-eabi --features generic-queue-8 \
--- build --release --manifest-path embassy-net/Cargo.toml --target thumbv7em-none-eabi --features defmt,tcp,udp,dns,proto-ipv4,medium-ethernet,packet-trace \
--- build --release --manifest-path embassy-net/Cargo.toml --target thumbv7em-none-eabi --features defmt,tcp,udp,dns,proto-ipv4,multicast,medium-ethernet \
--- build --release --manifest-path embassy-net/Cargo.toml --target thumbv7em-none-eabi --features defmt,tcp,udp,dns,dhcpv4,medium-ethernet \
Expand Down
5 changes: 4 additions & 1 deletion embassy-executor/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `raw::Executor` now has an `fn initialize` that must be called once before starting to poll it.
- embassy-executor no longer provides an `embassy-time-queue-driver` implementation
- Added `TaskRef::executor` to obtain a reference to a task's executor
- integrated-timers are no longer processed when polling the executor.
- `raw::timer_queue::TimerQueue` is now public.

## 0.6.3 - 2024-11-12

Expand Down
2 changes: 1 addition & 1 deletion embassy-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ nightly = ["embassy-executor-macros/nightly"]
turbowakers = []

## Use the executor-integrated `embassy-time` timer queue.
integrated-timers = ["dep:embassy-time-driver", "dep:embassy-time-queue-driver"]
integrated-timers = ["dep:embassy-time-driver"]

#! ### Architecture
_arch = [] # some arch was picked
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/avr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
6 changes: 0 additions & 6 deletions embassy-executor/src/arch/cortex_m.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}
init(self.inner.spawner());

loop {
Expand Down Expand Up @@ -210,9 +207,6 @@ mod interrupt {
}

let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
unsafe {
executor.initialize();
}

unsafe { NVIC::unmask(irq) }

Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/riscv32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/spin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ mod thread {
/// - a `static mut` (unsafe)
/// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
pub fn start(&'static mut self, init: impl FnOnce(Spawner)) {
unsafe {
self.inner.initialize();
}

unsafe {
let executor = &self.inner;
let future = Closure::new(move |_| {
Expand Down
177 changes: 67 additions & 110 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod run_queue;
mod state;

#[cfg(feature = "integrated-timers")]
mod timer_queue;
pub mod timer_queue;
#[cfg(feature = "trace")]
mod trace;
pub(crate) mod util;
Expand All @@ -31,9 +31,6 @@ use core::pin::Pin;
use core::ptr::NonNull;
use core::task::{Context, Poll};

#[cfg(feature = "integrated-timers")]
use embassy_time_driver::AlarmHandle;

use self::run_queue::{RunQueue, RunQueueItem};
use self::state::State;
use self::util::{SyncUnsafeCell, UninitCell};
Expand All @@ -47,14 +44,13 @@ pub(crate) struct TaskHeader {
pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

#[cfg(feature = "integrated-timers")]
pub(crate) expires_at: SyncUnsafeCell<u64>,
/// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
#[cfg(feature = "integrated-timers")]
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}

/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, PartialEq)]
pub struct TaskRef {
ptr: NonNull<TaskHeader>,
}
Expand All @@ -76,10 +72,56 @@ impl TaskRef {
}
}

/// # Safety
///
/// The result of this function must only be compared
/// for equality, or stored, but not used.
pub const unsafe fn dangling() -> Self {
Self {
ptr: NonNull::dangling(),
}
}

pub(crate) fn header(self) -> &'static TaskHeader {
unsafe { self.ptr.as_ref() }
}

/// Returns a reference to the executor that the task is currently running on.
#[cfg(feature = "integrated-timers")]
pub unsafe fn executor(self) -> Option<&'static Executor> {
self.header().executor.get().map(|e| Executor::wrap(e))
}

/// Returns a reference to the timer queue item.
#[cfg(feature = "integrated-timers")]
pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
&self.header().timer_queue_item
}

/// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
///
/// Entering this state prevents the task from being respawned while in a timer queue.
///
/// Safety:
///
/// This functions should only be called by the timer queue implementation, before
/// enqueueing the timer item.
#[cfg(feature = "integrated-timers")]
pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation {
self.header().state.timer_enqueue()
}

/// Unmark the task as timer-queued.
///
/// Safety:
///
/// This functions should only be called by the timer queue implementation, after the task has
/// been removed from the timer queue.
#[cfg(feature = "integrated-timers")]
pub unsafe fn timer_dequeue(&self) {
self.header().state.timer_dequeue()
}

/// The returned pointer is valid for the entire TaskStorage.
pub(crate) fn as_ptr(self) -> *const TaskHeader {
self.ptr.as_ptr()
Expand Down Expand Up @@ -120,8 +162,6 @@ impl<F: Future + 'static> TaskStorage<F> {
// Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
poll_fn: SyncUnsafeCell::new(None),

#[cfg(feature = "integrated-timers")]
expires_at: SyncUnsafeCell::new(0),
#[cfg(feature = "integrated-timers")]
timer_queue_item: timer_queue::TimerQueueItem::new(),
},
Expand Down Expand Up @@ -160,9 +200,6 @@ impl<F: Future + 'static> TaskStorage<F> {
Poll::Ready(_) => {
this.future.drop_in_place();
this.raw.state.despawn();

#[cfg(feature = "integrated-timers")]
this.raw.expires_at.set(u64::MAX);
}
Poll::Pending => {}
}
Expand Down Expand Up @@ -316,34 +353,16 @@ impl Pender {
pub(crate) struct SyncExecutor {
run_queue: RunQueue,
pender: Pender,

#[cfg(feature = "integrated-timers")]
pub(crate) timer_queue: timer_queue::TimerQueue,
#[cfg(feature = "integrated-timers")]
alarm: AlarmHandle,
}

impl SyncExecutor {
pub(crate) fn new(pender: Pender) -> Self {
#[cfg(feature = "integrated-timers")]
let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };

Self {
run_queue: RunQueue::new(),
pender,

#[cfg(feature = "integrated-timers")]
timer_queue: timer_queue::TimerQueue::new(),
#[cfg(feature = "integrated-timers")]
alarm,
}
}

pub(crate) unsafe fn initialize(&'static self) {
#[cfg(feature = "integrated-timers")]
embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
}

/// Enqueue a task in the task queue
///
/// # Safety
Expand All @@ -360,12 +379,6 @@ impl SyncExecutor {
}
}

#[cfg(feature = "integrated-timers")]
fn alarm_callback(ctx: *mut ()) {
let this: &Self = unsafe { &*(ctx as *const Self) };
this.pender.pend();
}

pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
task.header().executor.set(Some(self));

Expand All @@ -379,56 +392,27 @@ impl SyncExecutor {
///
/// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
pub(crate) unsafe fn poll(&'static self) {
#[allow(clippy::never_loop)]
loop {
#[cfg(feature = "integrated-timers")]
self.timer_queue
.dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);

self.run_queue.dequeue_all(|p| {
let task = p.header();

#[cfg(feature = "integrated-timers")]
task.expires_at.set(u64::MAX);

if !task.state.run_dequeue() {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}

#[cfg(feature = "trace")]
trace::task_exec_begin(self, &p);
self.run_queue.dequeue_all(|p| {
let task = p.header();

if !task.state.run_dequeue() {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}

// Run the task
task.poll_fn.get().unwrap_unchecked()(p);
#[cfg(feature = "trace")]
trace::task_exec_begin(self, &p);

#[cfg(feature = "trace")]
trace::task_exec_end(self, &p);
// Run the task
task.poll_fn.get().unwrap_unchecked()(p);

// Enqueue or update into timer_queue
#[cfg(feature = "integrated-timers")]
self.timer_queue.update(p);
});

#[cfg(feature = "integrated-timers")]
{
// If this is already in the past, set_alarm might return false
// In that case do another poll loop iteration.
let next_expiration = self.timer_queue.next_expiration();
if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
break;
}
}

#[cfg(not(feature = "integrated-timers"))]
{
break;
}
}
#[cfg(feature = "trace")]
trace::task_exec_end(self, &p);
});

#[cfg(feature = "trace")]
trace::executor_idle(self)
Expand Down Expand Up @@ -494,15 +478,6 @@ impl Executor {
}
}

/// Initializes the executor.
///
/// # Safety
///
/// This function must be called once before any other method is called.
pub unsafe fn initialize(&'static self) {
self.inner.initialize();
}

/// Spawn a task in this executor.
///
/// # Safety
Expand Down Expand Up @@ -575,21 +550,3 @@ pub fn wake_task_no_pend(task: TaskRef) {
}
}
}

#[cfg(feature = "integrated-timers")]
struct TimerQueue;

#[cfg(feature = "integrated-timers")]
impl embassy_time_queue_driver::TimerQueue for TimerQueue {
fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
let task = waker::task_from_waker(waker);
let task = task.header();
unsafe {
let expires_at = task.expires_at.get();
task.expires_at.set(expires_at.min(at));
}
}
}

#[cfg(feature = "integrated-timers")]
embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);
Loading