// strat9_kernel/ipc/channel.rs

//! Typed MPMC blocking channel for IPC between kernel tasks and silos.
//!
//! ## Two levels of abstraction
//!
//! ### 1. Typed MPMC channel — kernel-internal
//!
//! [`channel`]`<T>(capacity)` returns a `(`[`Sender`]`<T>, `[`Receiver`]`<T>)` pair.
//! Both endpoints are cloneable (Multi-Producer / Multi-Consumer).
//! When the last `Sender` is dropped, all waiting `Receiver`s see
//! `Err(`[`ChannelError::Disconnected`]`)`, and vice-versa.
//!
//! ```text
//! let (tx, rx) = channel::<u64>(8);
//! let tx2 = tx.clone(); // second producer
//! let rx2 = rx.clone(); // second consumer
//! ```
//!
//! ### 2. Symmetric channel — userspace IPC (silo-to-silo)
//!
//! [`SyncChan`] is a symmetric [`IpcMessage`] channel: any holder can send
//! *or* receive. It is stored by [`ChanId`] in a global registry and
//! accessed from userspace via `SYS_CHAN_*` syscalls. Destroyed explicitly
//! via [`SyncChan::destroy`] when all userspace handles are closed.
//!
//! ## Blocking guarantee
//!
//! Both levels use [`WaitQueue::wait_until`] — the condition closure is
//! evaluated atomically under the wait-queue's waiter lock, eliminating the
//! classic lost-wakeup race without a polling loop.
//!
//! ## Wake ordering
//!
//! The message buffer itself is a lock-free `ArrayQueue`, so there is no
//! separate buffer lock; the only lock involved is the waiter lock inside
//! `wait_until`. To avoid waking a task while that lock is still held:
//! - `send_waitq.wake_one()` is called **outside** any recv closure.
//! - `recv_waitq.wake_one()` is called **outside** any send closure.

39use super::message::IpcMessage;
40use crate::sync::{SpinLock, WaitQueue};
41use alloc::{collections::BTreeMap, sync::Arc};
42use core::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering};
43use crossbeam_queue::ArrayQueue;
44
/// Channel status: both endpoint kinds are still alive.
const STATUS_CONNECTED: u8 = 0;
/// Channel status: the last `Sender` has been dropped.
const STATUS_SENDER_GONE: u8 = 1;
/// Channel status: the last `Receiver` has been dropped.
const STATUS_RECEIVER_GONE: u8 = 2;
48
/// Errors returned by channel send/receive operations (both the typed
/// MPMC channel and [`SyncChan`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum ChannelError {
    /// A non-blocking operation could not proceed right now
    /// (queue full on send, queue empty on receive).
    #[error("would block")]
    WouldBlock,
    /// The peer side is gone, or the channel was destroyed.
    #[error("channel disconnected")]
    Disconnected,
}
56
// ─────────────────────────────────────────────────────────────────────────────
// Typed MPMC channel — kernel-internal
// ─────────────────────────────────────────────────────────────────────────────
60
/// Shared inner state for the typed MPMC channel.
///
/// Held behind an `Arc` shared by every `Sender`/`Receiver` clone.
struct ChannelInner<T: Send> {
    /// Bounded, lock-free message queue (capacity fixed at creation).
    buffer: ArrayQueue<T>,
    /// Tasks blocked because the buffer is full (waiting to send).
    send_waitq: WaitQueue,
    /// Tasks blocked because the buffer is empty (waiting to receive).
    recv_waitq: WaitQueue,
    /// Channel status: STATUS_CONNECTED / STATUS_SENDER_GONE / STATUS_RECEIVER_GONE.
    status: AtomicU8,
    /// Number of live [`Sender`] endpoints.
    sender_count: AtomicUsize,
    /// Number of live [`Receiver`] endpoints.
    receiver_count: AtomicUsize,
}
76
77impl<T: Send> ChannelInner<T> {
78 /// Creates a new instance.
79 fn new(capacity: usize) -> Self {
80 ChannelInner {
81 buffer: ArrayQueue::new(capacity.max(1)),
82 send_waitq: WaitQueue::new(),
83 recv_waitq: WaitQueue::new(),
84 status: AtomicU8::new(STATUS_CONNECTED),
85 sender_count: AtomicUsize::new(1),
86 receiver_count: AtomicUsize::new(1),
87 }
88 }
89
90 /// Returns whether sender gone.
91 #[inline]
92 fn is_sender_gone(&self) -> bool {
93 self.status.load(Ordering::Acquire) == STATUS_SENDER_GONE
94 }
95
96 /// Returns whether receiver gone.
97 #[inline]
98 fn is_receiver_gone(&self) -> bool {
99 self.status.load(Ordering::Acquire) == STATUS_RECEIVER_GONE
100 }
101}
102
/// The send end of a [`channel`].
///
/// Cloneable (MPMC): each clone shares the same internal channel.
/// When the last `Sender` is dropped, waiting receivers are woken with
/// [`ChannelError::Disconnected`].
pub struct Sender<T: Send> {
    /// Shared channel state; also carries the live-sender count.
    inner: Arc<ChannelInner<T>>,
}
111
112impl<T: Send> Clone for Sender<T> {
113 /// Performs the clone operation.
114 fn clone(&self) -> Self {
115 self.inner.sender_count.fetch_add(1, Ordering::AcqRel);
116 Sender {
117 inner: self.inner.clone(),
118 }
119 }
120}
121
122impl<T: Send> Drop for Sender<T> {
123 /// Performs the drop operation.
124 fn drop(&mut self) {
125 if self.inner.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
126 // Last sender gone — mark and wake blocked receivers.
127 self.inner
128 .status
129 .store(STATUS_SENDER_GONE, Ordering::Release);
130 self.inner.recv_waitq.wake_all();
131 }
132 }
133}
134
135impl<T: Send> Sender<T> {
136 /// Send a message, blocking until buffer space is available.
137 ///
138 /// Returns `Ok(())` on success, or `Err(`[`ChannelError::Disconnected`]`)`
139 /// if all receivers have been dropped before or during the send.
140 pub fn send(&self, msg: T) -> Result<(), ChannelError> {
141 let mut pending = Some(msg);
142
143 let result = self.inner.send_waitq.wait_until(|| {
144 // Receiver gone: discard message and report disconnect.
145 if self.inner.is_receiver_gone() {
146 pending.take();
147 return Some(Err(ChannelError::Disconnected));
148 }
149
150 // SAFETY: `pending` is always `Some` on every invocation of this
151 // closure. It is `take`-n here and either pushed (success) or
152 // replaced (full queue → retry next wakeup).
153 let m = pending.take().unwrap();
154 match self.inner.buffer.push(m) {
155 Ok(()) => Some(Ok(())),
156 Err(m) => {
157 pending = Some(m);
158 None
159 }
160 }
161 // `buf` (queue lock) is released here, before returning from the
162 // closure — never held while wake_one() is called below.
163 });
164
165 // Wake exactly one receiver AFTER releasing the waiters lock.
166 if result.is_ok() {
167 self.inner.recv_waitq.wake_one();
168 }
169 result
170 }
171
172 /// Try to send without blocking.
173 ///
174 /// Returns `Err((msg, WouldBlock))` if the buffer is full, or
175 /// `Err((msg, Disconnected))` if all receivers are gone.
176 pub fn try_send(&self, msg: T) -> Result<(), (T, ChannelError)> {
177 if self.inner.is_receiver_gone() {
178 return Err((msg, ChannelError::Disconnected));
179 }
180 match self.inner.buffer.push(msg) {
181 Ok(()) => {
182 self.inner.recv_waitq.wake_one();
183 Ok(())
184 }
185 Err(m) => {
186 let err = if self.inner.is_receiver_gone() {
187 ChannelError::Disconnected
188 } else {
189 ChannelError::WouldBlock
190 };
191 Err((m, err))
192 }
193 }
194 }
195
196 /// Returns `true` if all receivers have been dropped.
197 pub fn is_disconnected(&self) -> bool {
198 self.inner.is_receiver_gone()
199 }
200
201 /// Create a new [`Receiver`] endpoint connected to the same channel.
202 pub fn receiver(&self) -> Receiver<T> {
203 self.inner.receiver_count.fetch_add(1, Ordering::AcqRel);
204 let _ = self.inner.status.compare_exchange(
205 STATUS_RECEIVER_GONE,
206 STATUS_CONNECTED,
207 Ordering::AcqRel,
208 Ordering::Acquire,
209 );
210 Receiver {
211 inner: self.inner.clone(),
212 }
213 }
214}
215
/// The receive end of a [`channel`].
///
/// Cloneable (MPMC): each clone shares the same internal channel.
/// When the last `Receiver` is dropped, waiting senders are woken with
/// [`ChannelError::Disconnected`].
pub struct Receiver<T: Send> {
    /// Shared channel state; also carries the live-receiver count.
    inner: Arc<ChannelInner<T>>,
}
224
225impl<T: Send> Clone for Receiver<T> {
226 /// Performs the clone operation.
227 fn clone(&self) -> Self {
228 self.inner.receiver_count.fetch_add(1, Ordering::AcqRel);
229 Receiver {
230 inner: self.inner.clone(),
231 }
232 }
233}
234
235impl<T: Send> Drop for Receiver<T> {
236 /// Performs the drop operation.
237 fn drop(&mut self) {
238 if self.inner.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
239 // Last receiver gone — mark and wake blocked senders.
240 self.inner
241 .status
242 .store(STATUS_RECEIVER_GONE, Ordering::Release);
243 self.inner.send_waitq.wake_all();
244 }
245 }
246}
247
248impl<T: Send> Receiver<T> {
249 /// Receive a message, blocking until one is available.
250 ///
251 /// Returns `Ok(msg)` on success. Returns `Err(`[`ChannelError::Disconnected`]`)`
252 /// if all senders have been dropped *and* the buffer is empty.
253 pub fn recv(&self) -> Result<T, ChannelError> {
254 let result = self.inner.recv_waitq.wait_until(|| {
255 // Try to pop under the waiters lock so we don't race with senders.
256 let msg_opt = self.inner.buffer.pop();
257 if let Some(msg) = msg_opt {
258 return Some(Ok(msg));
259 }
260 // Buffer empty: check for disconnect.
261 if self.inner.is_sender_gone() {
262 return Some(Err(ChannelError::Disconnected));
263 }
264 None // keep waiting
265 });
266
267 // Wake exactly one sender AFTER releasing the waiters lock.
268 if result.is_ok() {
269 self.inner.send_waitq.wake_one();
270 }
271 result
272 }
273
274 /// Try to receive without blocking.
275 ///
276 /// Returns `Err(WouldBlock)` if the buffer is empty, or
277 /// `Err(Disconnected)` if all senders are gone and the buffer is empty.
278 pub fn try_recv(&self) -> Result<T, ChannelError> {
279 let msg_opt = self.inner.buffer.pop();
280 if let Some(msg) = msg_opt {
281 self.inner.send_waitq.wake_one();
282 return Ok(msg);
283 }
284 if self.inner.is_sender_gone() {
285 return Err(ChannelError::Disconnected);
286 }
287 Err(ChannelError::WouldBlock)
288 }
289
290 /// Returns `true` if all senders have been dropped.
291 pub fn is_disconnected(&self) -> bool {
292 self.inner.is_sender_gone()
293 }
294
295 /// Create a new [`Sender`] endpoint connected to the same channel.
296 pub fn sender(&self) -> Sender<T> {
297 self.inner.sender_count.fetch_add(1, Ordering::AcqRel);
298 let _ = self.inner.status.compare_exchange(
299 STATUS_SENDER_GONE,
300 STATUS_CONNECTED,
301 Ordering::AcqRel,
302 Ordering::Acquire,
303 );
304 Sender {
305 inner: self.inner.clone(),
306 }
307 }
308}
309
310/// Create a new bounded MPMC channel with the given `capacity`.
311///
312/// Returns `(Sender<T>, Receiver<T>)`. Both endpoints are cloneable to add
313/// more producers or consumers. The capacity is rounded up to at least 1.
314///
315/// # Example (kernel-internal)
316///
317/// ```rust,ignore
318/// let (tx, rx) = channel::<u64>(8);
319/// tx.send(42).unwrap();
320/// assert_eq!(rx.recv().unwrap(), 42);
321/// ```
322pub fn channel<T: Send>(capacity: usize) -> (Sender<T>, Receiver<T>) {
323 let inner = Arc::new(ChannelInner::new(capacity));
324 (
325 Sender {
326 inner: inner.clone(),
327 },
328 Receiver { inner },
329 )
330}
331
// Symmetric channel (SyncChan) — userspace / silo-to-silo IPC
/// A symmetric bounded channel over [`IpcMessage`], used by the global
/// channel registry for silo-to-silo syscall-level IPC.
///
/// Unlike [`Sender`]/[`Receiver`], `SyncChan` has no directional
/// specialisation: any caller with an `Arc<SyncChan>` can both send and
/// receive. Destruction is explicit (via [`SyncChan::destroy`]), triggered
/// when the last userspace handle is closed.
pub struct SyncChan {
    /// Bounded, lock-free message queue (capacity fixed at creation).
    queue: ArrayQueue<IpcMessage>,
    /// Tasks blocked because the queue is full.
    send_waitq: WaitQueue,
    /// Tasks blocked because the queue is empty.
    recv_waitq: WaitQueue,
    /// Set to `true` by [`SyncChan::destroy`]; checked on every operation,
    /// and destruction wakes all blocked tasks so they see `Disconnected`.
    destroyed: AtomicBool,
}
350
351impl SyncChan {
352 /// Creates a new instance.
353 fn new(capacity: usize) -> Self {
354 SyncChan {
355 queue: ArrayQueue::new(capacity.max(1)),
356 send_waitq: WaitQueue::new(),
357 recv_waitq: WaitQueue::new(),
358 destroyed: AtomicBool::new(false),
359 }
360 }
361
362 /// Send a message, blocking until space is available.
363 ///
364 /// Returns `Err(`[`ChannelError::Disconnected`]`)` if the channel has
365 /// been destroyed while the sender was blocked.
366 pub fn send(&self, msg: IpcMessage) -> Result<(), ChannelError> {
367 let mut pending = Some(msg);
368
369 let result = self.send_waitq.wait_until(|| {
370 if self.destroyed.load(Ordering::Acquire) {
371 pending.take();
372 return Some(Err(ChannelError::Disconnected));
373 }
374 // SAFETY: `pending` is always `Some` on every closure invocation.
375 let m = pending.take().unwrap();
376 match self.queue.push(m) {
377 Ok(()) => Some(Ok(())),
378 Err(m) => {
379 pending = Some(m);
380 None
381 }
382 }
383 // queue lock released here
384 });
385
386 if result.is_ok() {
387 self.recv_waitq.wake_one();
388 }
389 result
390 }
391
392 /// Try to send without blocking.
393 ///
394 /// Returns `Err(WouldBlock)` if the queue is full, or
395 /// `Err(Disconnected)` if the channel is destroyed.
396 pub fn try_send(&self, msg: IpcMessage) -> Result<(), ChannelError> {
397 if self.destroyed.load(Ordering::Acquire) {
398 return Err(ChannelError::Disconnected);
399 }
400 match self.queue.push(msg) {
401 Ok(()) => {
402 self.recv_waitq.wake_one();
403 Ok(())
404 }
405 Err(_) => Err(ChannelError::WouldBlock),
406 }
407 }
408
409 /// Receive a message, blocking until one arrives.
410 ///
411 /// Returns `Err(`[`ChannelError::Disconnected`]`)` if the channel was
412 /// destroyed while the receiver was blocked.
413 pub fn recv(&self) -> Result<IpcMessage, ChannelError> {
414 let result = self.recv_waitq.wait_until(|| {
415 let msg_opt = self.queue.pop();
416 if let Some(msg) = msg_opt {
417 return Some(Ok(msg));
418 }
419 if self.destroyed.load(Ordering::Acquire) {
420 return Some(Err(ChannelError::Disconnected));
421 }
422 None
423 });
424
425 if result.is_ok() {
426 self.send_waitq.wake_one();
427 }
428 result
429 }
430
431 /// Try to receive without blocking.
432 ///
433 /// Returns `Err(WouldBlock)` if the queue is empty, or
434 /// `Err(Disconnected)` if the channel is destroyed and empty.
435 pub fn try_recv(&self) -> Result<IpcMessage, ChannelError> {
436 let msg_opt = self.queue.pop();
437 if let Some(msg) = msg_opt {
438 self.send_waitq.wake_one();
439 return Ok(msg);
440 }
441 if self.destroyed.load(Ordering::Acquire) {
442 return Err(ChannelError::Disconnected);
443 }
444 Err(ChannelError::WouldBlock)
445 }
446
447 /// Mark the channel as destroyed and wake all blocked tasks.
448 ///
449 /// Called when the last userspace handle is closed. Subsequent send/recv
450 /// operations on any still-held reference return `Disconnected`.
451 pub fn destroy(&self) {
452 self.destroyed.store(true, Ordering::Release);
453 self.send_waitq.wake_all();
454 self.recv_waitq.wake_all();
455 }
456
457 /// Returns `true` if the channel has been destroyed.
458 pub fn is_destroyed(&self) -> bool {
459 self.destroyed.load(Ordering::Acquire)
460 }
461
462 /// Returns the current number of messages buffered.
463 pub fn len(&self) -> usize {
464 self.queue.len()
465 }
466
467 /// Returns `true` if the queue is empty.
468 pub fn is_empty(&self) -> bool {
469 self.len() == 0
470 }
471
472 /// Returns whether full.
473 pub fn is_full(&self) -> bool {
474 self.queue.is_full()
475 }
476
477 /// Returns whether this can send.
478 pub fn can_send(&self) -> bool {
479 !self.destroyed.load(Ordering::Acquire) && !self.queue.is_full()
480 }
481}
482
// Global channel registry — userspace syscall surface
/// Unique identifier for a [`SyncChan`] in the global registry.
///
/// A thin newtype over `u64` so channel handles cannot be confused with
/// other raw integers at the syscall boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChanId(pub u64);

impl ChanId {
    /// Returns the raw `u64` handle value (as passed to/from userspace).
    pub const fn as_u64(self) -> u64 {
        self.0
    }

    /// Reconstructs a `ChanId` from a raw `u64` handle value.
    ///
    /// No validation is performed; the ID may not name a live channel.
    pub const fn from_u64(raw: u64) -> Self {
        ChanId(raw)
    }
}

impl From<u64> for ChanId {
    /// Idiomatic equivalent of [`ChanId::from_u64`].
    fn from(raw: u64) -> Self {
        ChanId(raw)
    }
}

impl From<ChanId> for u64 {
    /// Idiomatic equivalent of [`ChanId::as_u64`].
    fn from(id: ChanId) -> u64 {
        id.0
    }
}

impl core::fmt::Display for ChanId {
    /// Formats the ID as its decimal handle value.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "{}", self.0)
    }
}
505
/// Monotonically increasing allocator for [`ChanId`]s. Starts at 1, so 0
/// is never handed out — presumably reserved as an invalid/sentinel
/// handle value; confirm against the syscall layer.
static NEXT_CHAN_ID: AtomicU64 = AtomicU64::new(1);

/// Global registry: `ChanId → Arc<SyncChan>`. Starts as `None` (so the
/// static is const-constructible) and is lazily initialised on first use.
static CHANNELS: SpinLock<Option<BTreeMap<ChanId, Arc<SyncChan>>>> = SpinLock::new(None);
511
512/// Performs the ensure registry operation.
513fn ensure_registry(guard: &mut Option<BTreeMap<ChanId, Arc<SyncChan>>>) {
514 if guard.is_none() {
515 *guard = Some(BTreeMap::new());
516 }
517}
518
519/// Create a new [`SyncChan`] with the given capacity and register it.
520///
521/// Returns the [`ChanId`] to be returned to the creating task as a handle.
522pub fn create_channel(capacity: usize) -> ChanId {
523 let id = ChanId(NEXT_CHAN_ID.fetch_add(1, Ordering::Relaxed));
524 let chan = Arc::new(SyncChan::new(capacity));
525 let mut reg = CHANNELS.lock();
526 ensure_registry(&mut *reg);
527 reg.as_mut().unwrap().insert(id, chan);
528 log::debug!("IPC: created sync-channel {} (cap={})", id, capacity);
529 id
530}
531
532/// Look up a channel by ID. Returns a cloned `Arc<SyncChan>` if found.
533pub fn get_channel(id: ChanId) -> Option<Arc<SyncChan>> {
534 let reg = CHANNELS.lock();
535 reg.as_ref().and_then(|map| map.get(&id).cloned())
536}
537
538/// Destroy a channel: remove it from the registry and wake all waiters.
539///
540/// After this call, any thread still holding an `Arc<SyncChan>` to the
541/// same channel will see `Err(Disconnected)` on the next send/recv.
542pub fn destroy_channel(id: ChanId) -> Result<(), ChannelError> {
543 let chan = {
544 let mut reg = CHANNELS.lock();
545 let map = reg.as_mut().ok_or(ChannelError::Disconnected)?;
546 map.remove(&id).ok_or(ChannelError::Disconnected)?
547 };
548 chan.destroy();
549 log::debug!("IPC: destroyed sync-channel {}", id);
550 Ok(())
551}