strat9_kernel/sync/waitqueue.rs
1//! Wait queue and condition variable for blocking/waking tasks.
2//!
3//! ## Overview
4//!
5//! A [`WaitQueue`] holds a FIFO list of [`TaskId`]s waiting for an event.
6//! When a task calls [`WaitQueue::wait`], it is blocked (removed from the
7//! scheduler's ready queue) until another task calls [`WaitQueue::wake_one`]
8//! or [`WaitQueue::wake_all`].
9//!
10//! [`WaitQueue::wait_until`] is the preferred primitive: it atomically checks
11//! a caller-supplied condition before blocking, preventing the classical
12//! lost-wakeup race:
13//!
//! ```text
//! Incorrect (racy) pattern:
//!     if !condition() { queue.wait(); }   ← wakeup between check and wait is lost
//!
//! Correct pattern (what wait_until does internally):
//!     loop {
//!         hold waiters lock
//!         check condition → return if true
//!         push self to waiters (lock still held)
//!         release waiters lock
//!         block_current_task()   ← wake_pending flag handles late wakeups
//!         // woken → re-check condition
//!     }
//! ```
28//!
29//! ## Lost-wakeup guarantee
30//!
31//! Even the simple [`WaitQueue::wait`] is safe: the scheduler's `wake_pending`
32//! flag (set by `wake_task()` when the target is not yet in `blocked_tasks`)
33//! ensures that a `wake_one()` that races with the transition to Blocked state
34//! is never silently dropped.
35//!
36//! ## [`WaitCondition`]
37//!
38//! A higher-level wrapper that stores the condition closure alongside the
39//! queue, inspired by Theseus's `wait_condition` crate. Useful when the
40//! same condition is shared across multiple call sites.
41
42use crate::{
43 process::{block_current_task, current_task_id, wake_task, TaskId},
44 sync::{FixedQueue, SpinLock},
45};
46
/// Capacity parameter of the `FixedQueue` that backs every [`WaitQueue`].
///
/// Registering a waiter whose `push_back` fails (reported by the queue as an
/// overflow) makes [`WaitQueue::wait`] / [`WaitQueue::wait_until`] panic, so
/// this is effectively the maximum number of ids one queue can hold at once.
const WAITQUEUE_CAPACITY: usize = 256;
48
49// WaitQueue ========================================
50
51/// A FIFO queue of tasks waiting for an event.
52///
53/// See the [module documentation](self) for usage notes and the lost-wakeup
54/// guarantee.
55pub struct WaitQueue {
56 waiters: SpinLock<FixedQueue<TaskId, WAITQUEUE_CAPACITY>>,
57}
58
59impl WaitQueue {
60 /// Create a new empty wait queue.
61 pub const fn new() -> Self {
62 WaitQueue {
63 waiters: SpinLock::new(FixedQueue::new()),
64 }
65 }
66
67 /// Block the calling task until explicitly woken.
68 ///
69 /// Prefer [`wait_until`](Self::wait_until) when you have a condition to
70 /// test, to avoid spurious-wakeup loops and to benefit from the
71 /// compile-time condition guarantee.
72 ///
73 /// # Panics
74 ///
75 /// Panics if called outside of a task context (no current task).
76 pub fn wait(&self) {
77 let id = current_task_id().expect("WaitQueue::wait called with no current task");
78
79 // Add ourselves to the waiter list *before* blocking.
80 // A concurrent wake_one() that pops our id will call wake_task(id);
81 // the wake_pending flag in the scheduler handles the case where
82 // block_current_task() has not been reached yet.
83 {
84 let mut waiters = self.waiters.lock();
85 waiters
86 .push_back(id)
87 .expect("WaitQueue waiter queue overflow");
88 }
89
90 // Block : returns when wake_task(id) is called (or immediately if
91 // wake_pending was already set by a racing wake_one()).
92 block_current_task();
93 }
94
95 /// Block the calling task until `condition()` returns `Some(T)`.
96 ///
97 /// The condition is checked under the waiters lock, then the task is
98 /// inserted into the queue (still under the lock) before blocking.
99 /// This makes the check-then-block sequence atomic with respect to
100 /// concurrent `wake_one()` / `wake_all()` calls.
101 ///
102 /// The task is re-woken for every notification and re-evaluates the
103 /// condition; spurious wakeups are impossible because the condition is
104 /// always re-checked before the next sleep.
105 ///
106 /// # Panics
107 ///
108 /// Panics if called outside of a task context (no current task).
109 pub fn wait_until<F, T>(&self, mut condition: F) -> T
110 where
111 F: FnMut() -> Option<T>,
112 {
113 let id = current_task_id().expect("WaitQueue::wait_until called with no current task");
114
115 loop {
116 // Hold the waiters lock while evaluating the condition so that a
117 // concurrent wake_one() either:
118 // (a) finds us in the waiter list and calls wake_task() : which
119 // sets wake_pending if we haven't blocked yet, or unblocks us
120 // if we already have, or
121 // (b) runs before we push ourselves : in which case it won't pop
122 // us, but the condition will be true on the next evaluation.
123 {
124 let mut waiters = self.waiters.lock();
125
126 if let Some(value) = condition() {
127 return value;
128 }
129
130 // Condition not yet met: register ourselves as a waiter while
131 // the lock is still held, preventing a wakeup from being missed.
132 waiters
133 .push_back(id)
134 .expect("WaitQueue waiter queue overflow");
135 } // waiters lock released here
136
137 // Sleep until woken. If wake_task() already fired (racing with
138 // the lock drop above), the scheduler's wake_pending flag ensures
139 // block_current_task() returns immediately.
140 block_current_task();
141
142 // Woken : re-evaluate the condition at the top of the loop.
143 }
144 }
145
146 /// Wake the first waiting task (FIFO order).
147 ///
148 /// Returns `true` if a task was successfully woken, `false` if the queue
149 /// was empty.
150 pub fn wake_one(&self) -> bool {
151 let id = {
152 let mut waiters = self.waiters.lock();
153 waiters.pop_front()
154 };
155
156 if let Some(id) = id {
157 wake_task(id)
158 } else {
159 false
160 }
161 }
162
163 /// Wake all waiting tasks.
164 ///
165 /// Returns the number of tasks that were woken.
166 pub fn wake_all(&self) -> usize {
167 let mut count = 0;
168 loop {
169 let id = {
170 let mut waiters = self.waiters.lock();
171 waiters.pop_front()
172 };
173
174 let Some(id) = id else {
175 break;
176 };
177
178 if wake_task(id) {
179 count += 1;
180 }
181 }
182 count
183 }
184
185 /// Returns the number of tasks currently registered in this queue.
186 pub fn waiter_count(&self) -> usize {
187 self.waiters.lock().len()
188 }
189}
190
191// WaitCondition ====================
192
193/// A named condition variable backed by a [`WaitQueue`].
194///
195/// Stores a reusable condition closure alongside the queue. Multiple tasks
196/// can wait on the same `WaitCondition`; they are all woken when
197/// [`notify_all`](Self::notify_all) is called, and each re-checks the stored
198/// condition before returning.
199///
200/// Inspired by Theseus OS `wait_condition` crate. Wrap in an `Arc` to share
201/// across tasks.
202///
203/// # Example
204///
205/// ```rust,ignore
206/// static FLAG: AtomicBool = AtomicBool::new(false);
207/// static COND: WaitCondition<_> = WaitCondition::new(|| FLAG.load(Ordering::Acquire));
208///
209/// // Waiter task:
210/// COND.wait();
211///
212/// // Notifier task:
213/// FLAG.store(true, Ordering::Release);
214/// COND.notify_all();
215/// ```
216pub struct WaitCondition<F>
217where
218 F: Fn() -> bool,
219{
220 condition: F,
221 queue: WaitQueue,
222}
223
224impl<F: Fn() -> bool> WaitCondition<F> {
225 /// Create a new `WaitCondition` with the given condition function.
226 pub fn new(condition: F) -> Self {
227 WaitCondition {
228 condition,
229 queue: WaitQueue::new(),
230 }
231 }
232
233 /// Block the current task until the stored condition returns `true`.
234 ///
235 /// Returns immediately (without blocking) if the condition is already true.
236 /// Spurious wakeups are impossible.
237 ///
238 /// # Panics
239 ///
240 /// Panics if called outside of a task context.
241 pub fn wait(&self) {
242 self.queue
243 .wait_until(|| if (self.condition)() { Some(()) } else { None })
244 }
245
246 /// Wake one task waiting on this condition.
247 ///
248 /// The woken task will re-check the condition before returning from
249 /// [`wait`](Self::wait). Returns `true` if a task was woken.
250 pub fn notify_one(&self) -> bool {
251 self.queue.wake_one()
252 }
253
254 /// Wake all tasks waiting on this condition.
255 ///
256 /// Each woken task will re-check the condition independently. Returns the
257 /// number of tasks that were moved back to the ready queue.
258 pub fn notify_all(&self) -> usize {
259 self.queue.wake_all()
260 }
261}