Skip to main content

strat9_kernel/sync/
waitqueue.rs

1//! Wait queue and condition variable for blocking/waking tasks.
2//!
3//! ## Overview
4//!
5//! A [`WaitQueue`] holds a FIFO list of [`TaskId`]s waiting for an event.
6//! When a task calls [`WaitQueue::wait`], it is blocked (removed from the
7//! scheduler's ready queue) until another task calls [`WaitQueue::wake_one`]
8//! or [`WaitQueue::wake_all`].
9//!
10//! [`WaitQueue::wait_until`] is the preferred primitive: it atomically checks
11//! a caller-supplied condition before blocking, preventing the classical
12//! lost-wakeup race:
13//!
14//! ```text
15//! Incorrect (racy) pattern:
16//!   if !condition() { queue.wait(); }   ← wakeup between check and wait is lost
17//!
18//! Correct pattern (what wait_until does internally):
19//!   loop {
20//!       hold waiters lock
21//!       check condition → return if true
22//!       push self to waiters (lock still held)
23//!       release waiters lock
24//!       block_current_task()            ← wake_pending flag handles late wakeups
25//!       // woken → re-check condition
26//!   }
27//! ```
28//!
29//! ## Lost-wakeup guarantee
30//!
31//! Even the simple [`WaitQueue::wait`] is safe: the scheduler's `wake_pending`
32//! flag (set by `wake_task()` when the target is not yet in `blocked_tasks`)
33//! ensures that a `wake_one()` that races with the transition to Blocked state
34//! is never silently dropped.
35//!
36//! ## [`WaitCondition`]
37//!
38//! A higher-level wrapper that stores the condition closure alongside the
39//! queue, inspired by Theseus's `wait_condition` crate. Useful when the
40//! same condition is shared across multiple call sites.
41
42use crate::{
43    process::{block_current_task, current_task_id, wake_task, TaskId},
44    sync::{FixedQueue, SpinLock},
45};
46
47const WAITQUEUE_CAPACITY: usize = 256;
48
49//  WaitQueue ========================================
50
51/// A FIFO queue of tasks waiting for an event.
52///
53/// See the [module documentation](self) for usage notes and the lost-wakeup
54/// guarantee.
55pub struct WaitQueue {
56    waiters: SpinLock<FixedQueue<TaskId, WAITQUEUE_CAPACITY>>,
57}
58
59impl WaitQueue {
60    /// Create a new empty wait queue.
61    pub const fn new() -> Self {
62        WaitQueue {
63            waiters: SpinLock::new(FixedQueue::new()),
64        }
65    }
66
67    /// Block the calling task until explicitly woken.
68    ///
69    /// Prefer [`wait_until`](Self::wait_until) when you have a condition to
70    /// test, to avoid spurious-wakeup loops and to benefit from the
71    /// compile-time condition guarantee.
72    ///
73    /// # Panics
74    ///
75    /// Panics if called outside of a task context (no current task).
76    pub fn wait(&self) {
77        let id = current_task_id().expect("WaitQueue::wait called with no current task");
78
79        // Add ourselves to the waiter list *before* blocking.
80        // A concurrent wake_one() that pops our id will call wake_task(id);
81        // the wake_pending flag in the scheduler handles the case where
82        // block_current_task() has not been reached yet.
83        {
84            let mut waiters = self.waiters.lock();
85            waiters
86                .push_back(id)
87                .expect("WaitQueue waiter queue overflow");
88        }
89
90        // Block : returns when wake_task(id) is called (or immediately if
91        // wake_pending was already set by a racing wake_one()).
92        block_current_task();
93    }
94
95    /// Block the calling task until `condition()` returns `Some(T)`.
96    ///
97    /// The condition is checked under the waiters lock, then the task is
98    /// inserted into the queue (still under the lock) before blocking.
99    /// This makes the check-then-block sequence atomic with respect to
100    /// concurrent `wake_one()` / `wake_all()` calls.
101    ///
102    /// The task is re-woken for every notification and re-evaluates the
103    /// condition; spurious wakeups are impossible because the condition is
104    /// always re-checked before the next sleep.
105    ///
106    /// # Panics
107    ///
108    /// Panics if called outside of a task context (no current task).
109    pub fn wait_until<F, T>(&self, mut condition: F) -> T
110    where
111        F: FnMut() -> Option<T>,
112    {
113        let id = current_task_id().expect("WaitQueue::wait_until called with no current task");
114
115        loop {
116            // Hold the waiters lock while evaluating the condition so that a
117            // concurrent wake_one() either:
118            //   (a) finds us in the waiter list and calls wake_task() : which
119            //       sets wake_pending if we haven't blocked yet, or unblocks us
120            //       if we already have, or
121            //   (b) runs before we push ourselves : in which case it won't pop
122            //       us, but the condition will be true on the next evaluation.
123            {
124                let mut waiters = self.waiters.lock();
125
126                if let Some(value) = condition() {
127                    return value;
128                }
129
130                // Condition not yet met: register ourselves as a waiter while
131                // the lock is still held, preventing a wakeup from being missed.
132                waiters
133                    .push_back(id)
134                    .expect("WaitQueue waiter queue overflow");
135            } // waiters lock released here
136
137            // Sleep until woken.  If wake_task() already fired (racing with
138            // the lock drop above), the scheduler's wake_pending flag ensures
139            // block_current_task() returns immediately.
140            block_current_task();
141
142            // Woken : re-evaluate the condition at the top of the loop.
143        }
144    }
145
146    /// Wake the first waiting task (FIFO order).
147    ///
148    /// Returns `true` if a task was successfully woken, `false` if the queue
149    /// was empty.
150    pub fn wake_one(&self) -> bool {
151        let id = {
152            let mut waiters = self.waiters.lock();
153            waiters.pop_front()
154        };
155
156        if let Some(id) = id {
157            wake_task(id)
158        } else {
159            false
160        }
161    }
162
163    /// Wake all waiting tasks.
164    ///
165    /// Returns the number of tasks that were woken.
166    pub fn wake_all(&self) -> usize {
167        let mut count = 0;
168        loop {
169            let id = {
170                let mut waiters = self.waiters.lock();
171                waiters.pop_front()
172            };
173
174            let Some(id) = id else {
175                break;
176            };
177
178            if wake_task(id) {
179                count += 1;
180            }
181        }
182        count
183    }
184
185    /// Returns the number of tasks currently registered in this queue.
186    pub fn waiter_count(&self) -> usize {
187        self.waiters.lock().len()
188    }
189}
190
191//  WaitCondition ====================
192
193/// A named condition variable backed by a [`WaitQueue`].
194///
195/// Stores a reusable condition closure alongside the queue. Multiple tasks
196/// can wait on the same `WaitCondition`; they are all woken when
197/// [`notify_all`](Self::notify_all) is called, and each re-checks the stored
198/// condition before returning.
199///
200/// Inspired by Theseus OS `wait_condition` crate. Wrap in an `Arc` to share
201/// across tasks.
202///
203/// # Example
204///
205/// ```rust,ignore
206/// static FLAG: AtomicBool = AtomicBool::new(false);
207/// static COND: WaitCondition<_> = WaitCondition::new(|| FLAG.load(Ordering::Acquire));
208///
209/// // Waiter task:
210/// COND.wait();
211///
212/// // Notifier task:
213/// FLAG.store(true, Ordering::Release);
214/// COND.notify_all();
215/// ```
216pub struct WaitCondition<F>
217where
218    F: Fn() -> bool,
219{
220    condition: F,
221    queue: WaitQueue,
222}
223
224impl<F: Fn() -> bool> WaitCondition<F> {
225    /// Create a new `WaitCondition` with the given condition function.
226    pub fn new(condition: F) -> Self {
227        WaitCondition {
228            condition,
229            queue: WaitQueue::new(),
230        }
231    }
232
233    /// Block the current task until the stored condition returns `true`.
234    ///
235    /// Returns immediately (without blocking) if the condition is already true.
236    /// Spurious wakeups are impossible.
237    ///
238    /// # Panics
239    ///
240    /// Panics if called outside of a task context.
241    pub fn wait(&self) {
242        self.queue
243            .wait_until(|| if (self.condition)() { Some(()) } else { None })
244    }
245
246    /// Wake one task waiting on this condition.
247    ///
248    /// The woken task will re-check the condition before returning from
249    /// [`wait`](Self::wait). Returns `true` if a task was woken.
250    pub fn notify_one(&self) -> bool {
251        self.queue.wake_one()
252    }
253
254    /// Wake all tasks waiting on this condition.
255    ///
256    /// Each woken task will re-check the condition independently. Returns the
257    /// number of tasks that were moved back to the ready queue.
258    pub fn notify_all(&self) -> usize {
259        self.queue.wake_all()
260    }
261}