strat9_kernel/sync/waitqueue.rs
1//! Wait queue and condition variable for blocking/waking tasks.
2//!
3//! ## Overview
4//!
5//! A [`WaitQueue`] holds a FIFO list of [`TaskId`]s waiting for an event.
6//! When a task calls [`WaitQueue::wait`], it is blocked (removed from the
7//! scheduler's ready queue) until another task calls [`WaitQueue::wake_one`]
8//! or [`WaitQueue::wake_all`].
9//!
10//! [`WaitQueue::wait_until`] is the preferred primitive: it atomically checks
11//! a caller-supplied condition before blocking, preventing the classical
12//! lost-wakeup race:
13//!
14//! ```text
15//! Incorrect (racy) pattern:
16//! if !condition() { queue.wait(); } ← wakeup between check and wait is lost
17//!
18//! Correct pattern (what wait_until does internally):
19//! loop {
20//! hold waiters lock
21//! check condition → return if true
22//! push self to waiters (lock still held)
23//! release waiters lock
24//! block_current_task() ← wake_pending flag handles late wakeups
25//! // woken → re-check condition
26//! }
27//! ```
28//!
29//! ## Lost-wakeup guarantee
30//!
31//! Even the simple [`WaitQueue::wait`] is safe: the scheduler's `wake_pending`
32//! flag (set by `wake_task()` when the target is not yet in `blocked_tasks`)
33//! ensures that a `wake_one()` that races with the transition to Blocked state
34//! is never silently dropped.
35//!
36//! ## [`WaitCondition`]
37//!
38//! A higher-level wrapper that stores the condition closure alongside the
39//! queue, inspired by Theseus's `wait_condition` crate. Useful when the
40//! same condition is shared across multiple call sites.
41
42use crate::{
43 process::{block_current_task, current_task_id, wake_task, TaskId},
44 sync::SpinLock,
45};
46use alloc::collections::VecDeque;
47
48// ── WaitQueue ────────────────────────────────────────────────────────────────
49
50/// A FIFO queue of tasks waiting for an event.
51///
52/// See the [module documentation](self) for usage notes and the lost-wakeup
53/// guarantee.
54pub struct WaitQueue {
55 waiters: SpinLock<VecDeque<TaskId>>,
56}
57
58impl WaitQueue {
59 /// Create a new empty wait queue.
60 pub const fn new() -> Self {
61 WaitQueue {
62 waiters: SpinLock::new(VecDeque::new()),
63 }
64 }
65
66 /// Block the calling task until explicitly woken.
67 ///
68 /// Prefer [`wait_until`](Self::wait_until) when you have a condition to
69 /// test, to avoid spurious-wakeup loops and to benefit from the
70 /// compile-time condition guarantee.
71 ///
72 /// # Panics
73 ///
74 /// Panics if called outside of a task context (no current task).
75 pub fn wait(&self) {
76 let id = current_task_id().expect("WaitQueue::wait called with no current task");
77
78 // Add ourselves to the waiter list *before* blocking.
79 // A concurrent wake_one() that pops our id will call wake_task(id);
80 // the wake_pending flag in the scheduler handles the case where
81 // block_current_task() has not been reached yet.
82 {
83 let mut waiters = self.waiters.lock();
84 waiters.push_back(id);
85 }
86
87 // Block — returns when wake_task(id) is called (or immediately if
88 // wake_pending was already set by a racing wake_one()).
89 block_current_task();
90 }
91
92 /// Block the calling task until `condition()` returns `Some(T)`.
93 ///
94 /// The condition is checked under the waiters lock, then the task is
95 /// inserted into the queue (still under the lock) before blocking.
96 /// This makes the check-then-block sequence atomic with respect to
97 /// concurrent `wake_one()` / `wake_all()` calls.
98 ///
99 /// The task is re-woken for every notification and re-evaluates the
100 /// condition; spurious wakeups are impossible because the condition is
101 /// always re-checked before the next sleep.
102 ///
103 /// # Panics
104 ///
105 /// Panics if called outside of a task context (no current task).
106 pub fn wait_until<F, T>(&self, mut condition: F) -> T
107 where
108 F: FnMut() -> Option<T>,
109 {
110 let id = current_task_id().expect("WaitQueue::wait_until called with no current task");
111
112 loop {
113 // Hold the waiters lock while evaluating the condition so that a
114 // concurrent wake_one() either:
115 // (a) finds us in the waiter list and calls wake_task() — which
116 // sets wake_pending if we haven't blocked yet, or unblocks us
117 // if we already have, or
118 // (b) runs before we push ourselves — in which case it won't pop
119 // us, but the condition will be true on the next evaluation.
120 {
121 let mut waiters = self.waiters.lock();
122
123 if let Some(value) = condition() {
124 return value;
125 }
126
127 // Condition not yet met: register ourselves as a waiter while
128 // the lock is still held, preventing a wakeup from being missed.
129 waiters.push_back(id);
130 } // waiters lock released here
131
132 // Sleep until woken. If wake_task() already fired (racing with
133 // the lock drop above), the scheduler's wake_pending flag ensures
134 // block_current_task() returns immediately.
135 block_current_task();
136
137 // Woken — re-evaluate the condition at the top of the loop.
138 }
139 }
140
141 /// Wake the first waiting task (FIFO order).
142 ///
143 /// Returns `true` if a task was successfully woken, `false` if the queue
144 /// was empty.
145 pub fn wake_one(&self) -> bool {
146 let id = {
147 let mut waiters = self.waiters.lock();
148 waiters.pop_front()
149 };
150
151 if let Some(id) = id {
152 wake_task(id)
153 } else {
154 false
155 }
156 }
157
158 /// Wake all waiting tasks.
159 ///
160 /// Returns the number of tasks that were woken.
161 pub fn wake_all(&self) -> usize {
162 let ids: VecDeque<TaskId> = {
163 let mut waiters = self.waiters.lock();
164 core::mem::take(&mut *waiters)
165 };
166
167 let mut count = 0;
168 for id in ids {
169 if wake_task(id) {
170 count += 1;
171 }
172 }
173 count
174 }
175
176 /// Returns the number of tasks currently registered in this queue.
177 pub fn waiter_count(&self) -> usize {
178 self.waiters.lock().len()
179 }
180}
181
182// ── WaitCondition ────────────────────────────────────────────────────────────
183
184/// A named condition variable backed by a [`WaitQueue`].
185///
186/// Stores a reusable condition closure alongside the queue. Multiple tasks
187/// can wait on the same `WaitCondition`; they are all woken when
188/// [`notify_all`](Self::notify_all) is called, and each re-checks the stored
189/// condition before returning.
190///
191/// Inspired by Theseus OS `wait_condition` crate. Wrap in an `Arc` to share
192/// across tasks.
193///
194/// # Example
195///
196/// ```rust,ignore
197/// static FLAG: AtomicBool = AtomicBool::new(false);
198/// static COND: WaitCondition<_> = WaitCondition::new(|| FLAG.load(Ordering::Acquire));
199///
200/// // Waiter task:
201/// COND.wait();
202///
203/// // Notifier task:
204/// FLAG.store(true, Ordering::Release);
205/// COND.notify_all();
206/// ```
207pub struct WaitCondition<F>
208where
209 F: Fn() -> bool,
210{
211 condition: F,
212 queue: WaitQueue,
213}
214
215impl<F: Fn() -> bool> WaitCondition<F> {
216 /// Create a new `WaitCondition` with the given condition function.
217 pub fn new(condition: F) -> Self {
218 WaitCondition {
219 condition,
220 queue: WaitQueue::new(),
221 }
222 }
223
224 /// Block the current task until the stored condition returns `true`.
225 ///
226 /// Returns immediately (without blocking) if the condition is already true.
227 /// Spurious wakeups are impossible.
228 ///
229 /// # Panics
230 ///
231 /// Panics if called outside of a task context.
232 pub fn wait(&self) {
233 self.queue
234 .wait_until(|| if (self.condition)() { Some(()) } else { None })
235 }
236
237 /// Wake one task waiting on this condition.
238 ///
239 /// The woken task will re-check the condition before returning from
240 /// [`wait`](Self::wait). Returns `true` if a task was woken.
241 pub fn notify_one(&self) -> bool {
242 self.queue.wake_one()
243 }
244
245 /// Wake all tasks waiting on this condition.
246 ///
247 /// Each woken task will re-check the condition independently. Returns the
248 /// number of tasks that were moved back to the ready queue.
249 pub fn notify_all(&self) -> usize {
250 self.queue.wake_all()
251 }
252}