// strat9_kernel/ipc/channel.rs
//! Typed MPMC blocking channel for IPC between kernel tasks and silos.
//!
//! ## Two levels of abstraction
//!
//! ### 1. Typed MPMC channel — kernel-internal
//!
//! [`channel`]`<T>(capacity)` returns a `(`[`Sender`]`<T>, `[`Receiver`]`<T>)` pair.
//! Both endpoints are cloneable (Multi-Producer / Multi-Consumer).
//! When the last `Sender` is dropped, all waiting `Receiver`s see
//! `Err(`[`ChannelError::Disconnected`]`)`, and vice versa.
//!
//! ```text
//! let (tx, rx) = channel::<u64>(8);
//! let tx2 = tx.clone();           // second producer
//! let rx2 = rx.clone();           // second consumer
//! ```
//!
//! ### 2. Symmetric channel — userspace IPC (silo-to-silo)
//!
//! [`SyncChan`] is a symmetric [`IpcMessage`] channel: any holder can send
//! *or* receive.  It is stored by [`ChanId`] in a global registry and
//! accessed from userspace via `SYS_CHAN_*` syscalls.  Destroyed explicitly
//! via [`SyncChan::destroy`] when all userspace handles are closed.
//!
//! ## Blocking guarantee
//!
//! Both levels use [`WaitQueue::wait_until`] — the condition closure is
//! evaluated atomically under the waiter lock, eliminating the classic
//! lost-wakeup race without a polling loop.
//!
//! ## Lock ordering
//!
//! To avoid deadlock:
//! - The message buffer (`ArrayQueue`) is lock-free and is only touched
//!   *inside* the `wait_until` closure, so no buffer lock can be held
//!   when `wake_one()` is called.
//! - `send_waitq.wake_one()` is called **outside** any recv closure.
//! - `recv_waitq.wake_one()` is called **outside** any send closure.
use super::message::IpcMessage;
use crate::sync::{SpinLock, WaitQueue};
use alloc::{collections::BTreeMap, sync::Arc};
use core::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering};
use crossbeam_queue::ArrayQueue;

45const STATUS_CONNECTED: u8 = 0;
46const STATUS_SENDER_GONE: u8 = 1;
47const STATUS_RECEIVER_GONE: u8 = 2;
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
50pub enum ChannelError {
51    #[error("would block")]
52    WouldBlock,
53    #[error("channel disconnected")]
54    Disconnected,
55}
56
// ─────────────────────────────────────────────────────────────────────────────
// Typed MPMC channel — kernel-internal
// ─────────────────────────────────────────────────────────────────────────────

61/// Shared inner state for the typed MPMC channel.
62struct ChannelInner<T: Send> {
63    /// Bounded message queue.
64    buffer: ArrayQueue<T>,
65    /// Tasks blocked because the buffer is full (waiting to send).
66    send_waitq: WaitQueue,
67    /// Tasks blocked because the buffer is empty (waiting to receive).
68    recv_waitq: WaitQueue,
69    /// Channel status: CONNECTED / SENDER_GONE / RECEIVER_GONE.
70    status: AtomicU8,
71    /// Number of live [`Sender`] endpoints.
72    sender_count: AtomicUsize,
73    /// Number of live [`Receiver`] endpoints.
74    receiver_count: AtomicUsize,
75}
76
77impl<T: Send> ChannelInner<T> {
78    /// Creates a new instance.
79    fn new(capacity: usize) -> Self {
80        ChannelInner {
81            buffer: ArrayQueue::new(capacity.max(1)),
82            send_waitq: WaitQueue::new(),
83            recv_waitq: WaitQueue::new(),
84            status: AtomicU8::new(STATUS_CONNECTED),
85            sender_count: AtomicUsize::new(1),
86            receiver_count: AtomicUsize::new(1),
87        }
88    }
89
90    /// Returns whether sender gone.
91    #[inline]
92    fn is_sender_gone(&self) -> bool {
93        self.status.load(Ordering::Acquire) == STATUS_SENDER_GONE
94    }
95
96    /// Returns whether receiver gone.
97    #[inline]
98    fn is_receiver_gone(&self) -> bool {
99        self.status.load(Ordering::Acquire) == STATUS_RECEIVER_GONE
100    }
101}
102
103/// The send end of a [`channel`].
104///
105/// Cloneable (MPMC): each clone shares the same internal channel.
106/// When the last `Sender` is dropped, waiting receivers are woken with
107/// [`ChannelError::Disconnected`].
108pub struct Sender<T: Send> {
109    inner: Arc<ChannelInner<T>>,
110}
111
112impl<T: Send> Clone for Sender<T> {
113    /// Performs the clone operation.
114    fn clone(&self) -> Self {
115        self.inner.sender_count.fetch_add(1, Ordering::AcqRel);
116        Sender {
117            inner: self.inner.clone(),
118        }
119    }
120}
121
122impl<T: Send> Drop for Sender<T> {
123    /// Performs the drop operation.
124    fn drop(&mut self) {
125        if self.inner.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
126            // Last sender gone — mark and wake blocked receivers.
127            self.inner
128                .status
129                .store(STATUS_SENDER_GONE, Ordering::Release);
130            self.inner.recv_waitq.wake_all();
131        }
132    }
133}
134
135impl<T: Send> Sender<T> {
136    /// Send a message, blocking until buffer space is available.
137    ///
138    /// Returns `Ok(())` on success, or `Err(`[`ChannelError::Disconnected`]`)`
139    /// if all receivers have been dropped before or during the send.
140    pub fn send(&self, msg: T) -> Result<(), ChannelError> {
141        let mut pending = Some(msg);
142
143        let result = self.inner.send_waitq.wait_until(|| {
144            // Receiver gone: discard message and report disconnect.
145            if self.inner.is_receiver_gone() {
146                pending.take();
147                return Some(Err(ChannelError::Disconnected));
148            }
149
150            // SAFETY: `pending` is always `Some` on every invocation of this
151            // closure.  It is `take`-n here and either pushed (success) or
152            // replaced (full queue → retry next wakeup).
153            let m = pending.take().unwrap();
154            match self.inner.buffer.push(m) {
155                Ok(()) => Some(Ok(())),
156                Err(m) => {
157                    pending = Some(m);
158                    None
159                }
160            }
161            // `buf` (queue lock) is released here, before returning from the
162            // closure — never held while wake_one() is called below.
163        });
164
165        // Wake exactly one receiver AFTER releasing the waiters lock.
166        if result.is_ok() {
167            self.inner.recv_waitq.wake_one();
168        }
169        result
170    }
171
172    /// Try to send without blocking.
173    ///
174    /// Returns `Err((msg, WouldBlock))` if the buffer is full, or
175    /// `Err((msg, Disconnected))` if all receivers are gone.
176    pub fn try_send(&self, msg: T) -> Result<(), (T, ChannelError)> {
177        if self.inner.is_receiver_gone() {
178            return Err((msg, ChannelError::Disconnected));
179        }
180        match self.inner.buffer.push(msg) {
181            Ok(()) => {
182                self.inner.recv_waitq.wake_one();
183                Ok(())
184            }
185            Err(m) => {
186                let err = if self.inner.is_receiver_gone() {
187                    ChannelError::Disconnected
188                } else {
189                    ChannelError::WouldBlock
190                };
191                Err((m, err))
192            }
193        }
194    }
195
196    /// Returns `true` if all receivers have been dropped.
197    pub fn is_disconnected(&self) -> bool {
198        self.inner.is_receiver_gone()
199    }
200
201    /// Create a new [`Receiver`] endpoint connected to the same channel.
202    pub fn receiver(&self) -> Receiver<T> {
203        self.inner.receiver_count.fetch_add(1, Ordering::AcqRel);
204        let _ = self.inner.status.compare_exchange(
205            STATUS_RECEIVER_GONE,
206            STATUS_CONNECTED,
207            Ordering::AcqRel,
208            Ordering::Acquire,
209        );
210        Receiver {
211            inner: self.inner.clone(),
212        }
213    }
214}
215
216/// The receive end of a [`channel`].
217///
218/// Cloneable (MPMC): each clone shares the same internal channel.
219/// When the last `Receiver` is dropped, waiting senders are woken with
220/// [`ChannelError::Disconnected`].
221pub struct Receiver<T: Send> {
222    inner: Arc<ChannelInner<T>>,
223}
224
225impl<T: Send> Clone for Receiver<T> {
226    /// Performs the clone operation.
227    fn clone(&self) -> Self {
228        self.inner.receiver_count.fetch_add(1, Ordering::AcqRel);
229        Receiver {
230            inner: self.inner.clone(),
231        }
232    }
233}
234
235impl<T: Send> Drop for Receiver<T> {
236    /// Performs the drop operation.
237    fn drop(&mut self) {
238        if self.inner.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
239            // Last receiver gone — mark and wake blocked senders.
240            self.inner
241                .status
242                .store(STATUS_RECEIVER_GONE, Ordering::Release);
243            self.inner.send_waitq.wake_all();
244        }
245    }
246}
247
248impl<T: Send> Receiver<T> {
249    /// Receive a message, blocking until one is available.
250    ///
251    /// Returns `Ok(msg)` on success.  Returns `Err(`[`ChannelError::Disconnected`]`)`
252    /// if all senders have been dropped *and* the buffer is empty.
253    pub fn recv(&self) -> Result<T, ChannelError> {
254        let result = self.inner.recv_waitq.wait_until(|| {
255            // Try to pop under the waiters lock so we don't race with senders.
256            let msg_opt = self.inner.buffer.pop();
257            if let Some(msg) = msg_opt {
258                return Some(Ok(msg));
259            }
260            // Buffer empty: check for disconnect.
261            if self.inner.is_sender_gone() {
262                return Some(Err(ChannelError::Disconnected));
263            }
264            None // keep waiting
265        });
266
267        // Wake exactly one sender AFTER releasing the waiters lock.
268        if result.is_ok() {
269            self.inner.send_waitq.wake_one();
270        }
271        result
272    }
273
274    /// Try to receive without blocking.
275    ///
276    /// Returns `Err(WouldBlock)` if the buffer is empty, or
277    /// `Err(Disconnected)` if all senders are gone and the buffer is empty.
278    pub fn try_recv(&self) -> Result<T, ChannelError> {
279        let msg_opt = self.inner.buffer.pop();
280        if let Some(msg) = msg_opt {
281            self.inner.send_waitq.wake_one();
282            return Ok(msg);
283        }
284        if self.inner.is_sender_gone() {
285            return Err(ChannelError::Disconnected);
286        }
287        Err(ChannelError::WouldBlock)
288    }
289
290    /// Returns `true` if all senders have been dropped.
291    pub fn is_disconnected(&self) -> bool {
292        self.inner.is_sender_gone()
293    }
294
295    /// Create a new [`Sender`] endpoint connected to the same channel.
296    pub fn sender(&self) -> Sender<T> {
297        self.inner.sender_count.fetch_add(1, Ordering::AcqRel);
298        let _ = self.inner.status.compare_exchange(
299            STATUS_SENDER_GONE,
300            STATUS_CONNECTED,
301            Ordering::AcqRel,
302            Ordering::Acquire,
303        );
304        Sender {
305            inner: self.inner.clone(),
306        }
307    }
308}
309
310/// Create a new bounded MPMC channel with the given `capacity`.
311///
312/// Returns `(Sender<T>, Receiver<T>)`.  Both endpoints are cloneable to add
313/// more producers or consumers.  The capacity is rounded up to at least 1.
314///
315/// # Example (kernel-internal)
316///
317/// ```rust,ignore
318/// let (tx, rx) = channel::<u64>(8);
319/// tx.send(42).unwrap();
320/// assert_eq!(rx.recv().unwrap(), 42);
321/// ```
322pub fn channel<T: Send>(capacity: usize) -> (Sender<T>, Receiver<T>) {
323    let inner = Arc::new(ChannelInner::new(capacity));
324    (
325        Sender {
326            inner: inner.clone(),
327        },
328        Receiver { inner },
329    )
330}
331
332// Symmetric channel (SyncChan) — userspace / silo-to-silo IPC
333/// A symmetric bounded channel over [`IpcMessage`], used by the global
334/// channel registry for silo-to-silo syscall-level IPC.
335///
336/// Unlike [`Sender`]/[`Receiver`], `SyncChan` has no directional
337/// specialisation: any caller with an `Arc<SyncChan>` can both send and
338/// receive.  Destruction is explicit (via [`SyncChan::destroy`]), triggered
339/// when the last userspace handle is closed.
340pub struct SyncChan {
341    /// Bounded message queue.
342    queue: ArrayQueue<IpcMessage>,
343    /// Tasks blocked because the queue is full.
344    send_waitq: WaitQueue,
345    /// Tasks blocked because the queue is empty.
346    recv_waitq: WaitQueue,
347    /// Set to `true` by [`SyncChan::destroy`]; wakes all blocked tasks.
348    destroyed: AtomicBool,
349}
350
351impl SyncChan {
352    /// Creates a new instance.
353    fn new(capacity: usize) -> Self {
354        SyncChan {
355            queue: ArrayQueue::new(capacity.max(1)),
356            send_waitq: WaitQueue::new(),
357            recv_waitq: WaitQueue::new(),
358            destroyed: AtomicBool::new(false),
359        }
360    }
361
362    /// Send a message, blocking until space is available.
363    ///
364    /// Returns `Err(`[`ChannelError::Disconnected`]`)` if the channel has
365    /// been destroyed while the sender was blocked.
366    pub fn send(&self, msg: IpcMessage) -> Result<(), ChannelError> {
367        let mut pending = Some(msg);
368
369        let result = self.send_waitq.wait_until(|| {
370            if self.destroyed.load(Ordering::Acquire) {
371                pending.take();
372                return Some(Err(ChannelError::Disconnected));
373            }
374            // SAFETY: `pending` is always `Some` on every closure invocation.
375            let m = pending.take().unwrap();
376            match self.queue.push(m) {
377                Ok(()) => Some(Ok(())),
378                Err(m) => {
379                    pending = Some(m);
380                    None
381                }
382            }
383            // queue lock released here
384        });
385
386        if result.is_ok() {
387            self.recv_waitq.wake_one();
388        }
389        result
390    }
391
392    /// Try to send without blocking.
393    ///
394    /// Returns `Err(WouldBlock)` if the queue is full, or
395    /// `Err(Disconnected)` if the channel is destroyed.
396    pub fn try_send(&self, msg: IpcMessage) -> Result<(), ChannelError> {
397        if self.destroyed.load(Ordering::Acquire) {
398            return Err(ChannelError::Disconnected);
399        }
400        match self.queue.push(msg) {
401            Ok(()) => {
402                self.recv_waitq.wake_one();
403                Ok(())
404            }
405            Err(_) => Err(ChannelError::WouldBlock),
406        }
407    }
408
409    /// Receive a message, blocking until one arrives.
410    ///
411    /// Returns `Err(`[`ChannelError::Disconnected`]`)` if the channel was
412    /// destroyed while the receiver was blocked.
413    pub fn recv(&self) -> Result<IpcMessage, ChannelError> {
414        let result = self.recv_waitq.wait_until(|| {
415            let msg_opt = self.queue.pop();
416            if let Some(msg) = msg_opt {
417                return Some(Ok(msg));
418            }
419            if self.destroyed.load(Ordering::Acquire) {
420                return Some(Err(ChannelError::Disconnected));
421            }
422            None
423        });
424
425        if result.is_ok() {
426            self.send_waitq.wake_one();
427        }
428        result
429    }
430
431    /// Try to receive without blocking.
432    ///
433    /// Returns `Err(WouldBlock)` if the queue is empty, or
434    /// `Err(Disconnected)` if the channel is destroyed and empty.
435    pub fn try_recv(&self) -> Result<IpcMessage, ChannelError> {
436        let msg_opt = self.queue.pop();
437        if let Some(msg) = msg_opt {
438            self.send_waitq.wake_one();
439            return Ok(msg);
440        }
441        if self.destroyed.load(Ordering::Acquire) {
442            return Err(ChannelError::Disconnected);
443        }
444        Err(ChannelError::WouldBlock)
445    }
446
447    /// Mark the channel as destroyed and wake all blocked tasks.
448    ///
449    /// Called when the last userspace handle is closed.  Subsequent send/recv
450    /// operations on any still-held reference return `Disconnected`.
451    pub fn destroy(&self) {
452        self.destroyed.store(true, Ordering::Release);
453        self.send_waitq.wake_all();
454        self.recv_waitq.wake_all();
455    }
456
457    /// Returns `true` if the channel has been destroyed.
458    pub fn is_destroyed(&self) -> bool {
459        self.destroyed.load(Ordering::Acquire)
460    }
461
462    /// Returns the current number of messages buffered.
463    pub fn len(&self) -> usize {
464        self.queue.len()
465    }
466
467    /// Returns `true` if the queue is empty.
468    pub fn is_empty(&self) -> bool {
469        self.len() == 0
470    }
471
472    /// Returns whether full.
473    pub fn is_full(&self) -> bool {
474        self.queue.is_full()
475    }
476
477    /// Returns whether this can send.
478    pub fn can_send(&self) -> bool {
479        !self.destroyed.load(Ordering::Acquire) && !self.queue.is_full()
480    }
481}
482
483// Global channel registry — userspace syscall surface
484/// Unique identifier for a [`SyncChan`] in the global registry.
485#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
486pub struct ChanId(pub u64);
487
488impl ChanId {
489    /// Returns this as u64.
490    pub fn as_u64(self) -> u64 {
491        self.0
492    }
493    /// Builds this from u64.
494    pub fn from_u64(raw: u64) -> Self {
495        ChanId(raw)
496    }
497}
498
499impl core::fmt::Display for ChanId {
500    /// Performs the fmt operation.
501    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
502        write!(f, "{}", self.0)
503    }
504}
505
506/// Next channel ID to allocate.
507static NEXT_CHAN_ID: AtomicU64 = AtomicU64::new(1);
508
509/// Global registry: `ChanId → Arc<SyncChan>`.
510static CHANNELS: SpinLock<Option<BTreeMap<ChanId, Arc<SyncChan>>>> = SpinLock::new(None);
511
512/// Performs the ensure registry operation.
513fn ensure_registry(guard: &mut Option<BTreeMap<ChanId, Arc<SyncChan>>>) {
514    if guard.is_none() {
515        *guard = Some(BTreeMap::new());
516    }
517}
518
519/// Create a new [`SyncChan`] with the given capacity and register it.
520///
521/// Returns the [`ChanId`] to be returned to the creating task as a handle.
522pub fn create_channel(capacity: usize) -> ChanId {
523    let id = ChanId(NEXT_CHAN_ID.fetch_add(1, Ordering::Relaxed));
524    let chan = Arc::new(SyncChan::new(capacity));
525    let mut reg = CHANNELS.lock();
526    ensure_registry(&mut *reg);
527    reg.as_mut().unwrap().insert(id, chan);
528    log::debug!("IPC: created sync-channel {} (cap={})", id, capacity);
529    id
530}
531
532/// Look up a channel by ID. Returns a cloned `Arc<SyncChan>` if found.
533pub fn get_channel(id: ChanId) -> Option<Arc<SyncChan>> {
534    let reg = CHANNELS.lock();
535    reg.as_ref().and_then(|map| map.get(&id).cloned())
536}
537
538/// Destroy a channel: remove it from the registry and wake all waiters.
539///
540/// After this call, any thread still holding an `Arc<SyncChan>` to the
541/// same channel will see `Err(Disconnected)` on the next send/recv.
542pub fn destroy_channel(id: ChanId) -> Result<(), ChannelError> {
543    let chan = {
544        let mut reg = CHANNELS.lock();
545        let map = reg.as_mut().ok_or(ChannelError::Disconnected)?;
546        map.remove(&id).ok_or(ChannelError::Disconnected)?
547    };
548    chan.destroy();
549    log::debug!("IPC: destroyed sync-channel {}", id);
550    Ok(())
551}