Skip to main content

strat9_kernel/sync/
waitqueue.rs

1//! Wait queue and condition variable for blocking/waking tasks.
2//!
3//! ## Overview
4//!
5//! A [`WaitQueue`] holds a FIFO list of [`TaskId`]s waiting for an event.
6//! When a task calls [`WaitQueue::wait`], it is blocked (removed from the
7//! scheduler's ready queue) until another task calls [`WaitQueue::wake_one`]
8//! or [`WaitQueue::wake_all`].
9//!
10//! [`WaitQueue::wait_until`] is the preferred primitive: it atomically checks
11//! a caller-supplied condition before blocking, preventing the classical
12//! lost-wakeup race:
13//!
14//! ```text
15//! Incorrect (racy) pattern:
16//!   if !condition() { queue.wait(); }   ← wakeup between check and wait is lost
17//!
18//! Correct pattern (what wait_until does internally):
19//!   loop {
20//!       hold waiters lock
21//!       check condition → return if true
22//!       push self to waiters (lock still held)
23//!       release waiters lock
24//!       block_current_task()            ← wake_pending flag handles late wakeups
25//!       // woken → re-check condition
26//!   }
27//! ```
28//!
29//! ## Lost-wakeup guarantee
30//!
31//! Even the simple [`WaitQueue::wait`] is safe: the scheduler's `wake_pending`
32//! flag (set by `wake_task()` when the target is not yet in `blocked_tasks`)
33//! ensures that a `wake_one()` that races with the transition to Blocked state
34//! is never silently dropped.
35//!
36//! ## [`WaitCondition`]
37//!
38//! A higher-level wrapper that stores the condition closure alongside the
39//! queue, inspired by Theseus's `wait_condition` crate. Useful when the
40//! same condition is shared across multiple call sites.
41
42use crate::{
43    process::{block_current_task, current_task_id, wake_task, TaskId},
44    sync::SpinLock,
45};
46use alloc::collections::VecDeque;
47
48// ── WaitQueue ────────────────────────────────────────────────────────────────
49
50/// A FIFO queue of tasks waiting for an event.
51///
52/// See the [module documentation](self) for usage notes and the lost-wakeup
53/// guarantee.
54pub struct WaitQueue {
55    waiters: SpinLock<VecDeque<TaskId>>,
56}
57
58impl WaitQueue {
59    /// Create a new empty wait queue.
60    pub const fn new() -> Self {
61        WaitQueue {
62            waiters: SpinLock::new(VecDeque::new()),
63        }
64    }
65
66    /// Block the calling task until explicitly woken.
67    ///
68    /// Prefer [`wait_until`](Self::wait_until) when you have a condition to
69    /// test, to avoid spurious-wakeup loops and to benefit from the
70    /// compile-time condition guarantee.
71    ///
72    /// # Panics
73    ///
74    /// Panics if called outside of a task context (no current task).
75    pub fn wait(&self) {
76        let id = current_task_id().expect("WaitQueue::wait called with no current task");
77
78        // Add ourselves to the waiter list *before* blocking.
79        // A concurrent wake_one() that pops our id will call wake_task(id);
80        // the wake_pending flag in the scheduler handles the case where
81        // block_current_task() has not been reached yet.
82        {
83            let mut waiters = self.waiters.lock();
84            waiters.push_back(id);
85        }
86
87        // Block — returns when wake_task(id) is called (or immediately if
88        // wake_pending was already set by a racing wake_one()).
89        block_current_task();
90    }
91
92    /// Block the calling task until `condition()` returns `Some(T)`.
93    ///
94    /// The condition is checked under the waiters lock, then the task is
95    /// inserted into the queue (still under the lock) before blocking.
96    /// This makes the check-then-block sequence atomic with respect to
97    /// concurrent `wake_one()` / `wake_all()` calls.
98    ///
99    /// The task is re-woken for every notification and re-evaluates the
100    /// condition; spurious wakeups are impossible because the condition is
101    /// always re-checked before the next sleep.
102    ///
103    /// # Panics
104    ///
105    /// Panics if called outside of a task context (no current task).
106    pub fn wait_until<F, T>(&self, mut condition: F) -> T
107    where
108        F: FnMut() -> Option<T>,
109    {
110        let id = current_task_id().expect("WaitQueue::wait_until called with no current task");
111
112        loop {
113            // Hold the waiters lock while evaluating the condition so that a
114            // concurrent wake_one() either:
115            //   (a) finds us in the waiter list and calls wake_task() — which
116            //       sets wake_pending if we haven't blocked yet, or unblocks us
117            //       if we already have, or
118            //   (b) runs before we push ourselves — in which case it won't pop
119            //       us, but the condition will be true on the next evaluation.
120            {
121                let mut waiters = self.waiters.lock();
122
123                if let Some(value) = condition() {
124                    return value;
125                }
126
127                // Condition not yet met: register ourselves as a waiter while
128                // the lock is still held, preventing a wakeup from being missed.
129                waiters.push_back(id);
130            } // waiters lock released here
131
132            // Sleep until woken.  If wake_task() already fired (racing with
133            // the lock drop above), the scheduler's wake_pending flag ensures
134            // block_current_task() returns immediately.
135            block_current_task();
136
137            // Woken — re-evaluate the condition at the top of the loop.
138        }
139    }
140
141    /// Wake the first waiting task (FIFO order).
142    ///
143    /// Returns `true` if a task was successfully woken, `false` if the queue
144    /// was empty.
145    pub fn wake_one(&self) -> bool {
146        let id = {
147            let mut waiters = self.waiters.lock();
148            waiters.pop_front()
149        };
150
151        if let Some(id) = id {
152            wake_task(id)
153        } else {
154            false
155        }
156    }
157
158    /// Wake all waiting tasks.
159    ///
160    /// Returns the number of tasks that were woken.
161    pub fn wake_all(&self) -> usize {
162        let ids: VecDeque<TaskId> = {
163            let mut waiters = self.waiters.lock();
164            core::mem::take(&mut *waiters)
165        };
166
167        let mut count = 0;
168        for id in ids {
169            if wake_task(id) {
170                count += 1;
171            }
172        }
173        count
174    }
175
176    /// Returns the number of tasks currently registered in this queue.
177    pub fn waiter_count(&self) -> usize {
178        self.waiters.lock().len()
179    }
180}
181
182// ── WaitCondition ────────────────────────────────────────────────────────────
183
184/// A named condition variable backed by a [`WaitQueue`].
185///
186/// Stores a reusable condition closure alongside the queue. Multiple tasks
187/// can wait on the same `WaitCondition`; they are all woken when
188/// [`notify_all`](Self::notify_all) is called, and each re-checks the stored
189/// condition before returning.
190///
191/// Inspired by Theseus OS `wait_condition` crate. Wrap in an `Arc` to share
192/// across tasks.
193///
194/// # Example
195///
196/// ```rust,ignore
197/// static FLAG: AtomicBool = AtomicBool::new(false);
198/// static COND: WaitCondition<_> = WaitCondition::new(|| FLAG.load(Ordering::Acquire));
199///
200/// // Waiter task:
201/// COND.wait();
202///
203/// // Notifier task:
204/// FLAG.store(true, Ordering::Release);
205/// COND.notify_all();
206/// ```
207pub struct WaitCondition<F>
208where
209    F: Fn() -> bool,
210{
211    condition: F,
212    queue: WaitQueue,
213}
214
215impl<F: Fn() -> bool> WaitCondition<F> {
216    /// Create a new `WaitCondition` with the given condition function.
217    pub fn new(condition: F) -> Self {
218        WaitCondition {
219            condition,
220            queue: WaitQueue::new(),
221        }
222    }
223
224    /// Block the current task until the stored condition returns `true`.
225    ///
226    /// Returns immediately (without blocking) if the condition is already true.
227    /// Spurious wakeups are impossible.
228    ///
229    /// # Panics
230    ///
231    /// Panics if called outside of a task context.
232    pub fn wait(&self) {
233        self.queue
234            .wait_until(|| if (self.condition)() { Some(()) } else { None })
235    }
236
237    /// Wake one task waiting on this condition.
238    ///
239    /// The woken task will re-check the condition before returning from
240    /// [`wait`](Self::wait). Returns `true` if a task was woken.
241    pub fn notify_one(&self) -> bool {
242        self.queue.wake_one()
243    }
244
245    /// Wake all tasks waiting on this condition.
246    ///
247    /// Each woken task will re-check the condition independently. Returns the
248    /// number of tasks that were moved back to the ready queue.
249    pub fn notify_all(&self) -> usize {
250        self.queue.wake_all()
251    }
252}