// strat9_kernel/ipc/port.rs

1//! IPC Port — a kernel-managed message queue with blocking send/recv.
2//!
3//! Each port has a bounded FIFO queue of `IpcMessage`s. Senders block if
4//! the queue is full; receivers block if it's empty. The scheduler's
5//! block/wake API (via `WaitQueue`) provides the blocking mechanism.
6
7use super::message::IpcMessage;
8use crate::{
9    process::TaskId,
10    sync::{SpinLock, WaitQueue},
11};
12use alloc::{collections::BTreeMap, sync::Arc};
13use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use crossbeam_queue::ArrayQueue;
15
/// Upper bound on the number of messages a single port buffers at once.
const PORT_QUEUE_CAPACITY: usize = 16;
18
/// Unique identifier for an IPC port.
///
/// A thin newtype over `u64`; ordering, equality and hashing are those of
/// the wrapped integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PortId(pub u64);

impl PortId {
    /// Get the raw u64 value.
    pub const fn as_u64(self) -> u64 {
        self.0
    }

    /// Create a PortId from a raw u64.
    pub const fn from_u64(raw: u64) -> Self {
        Self(raw)
    }
}

impl core::fmt::Display for PortId {
    /// Formats the identifier as its decimal integer value.
    ///
    /// Delegates to the inner `u64` so that formatter options supplied by the
    /// caller (width, fill, alignment, zero-padding) are honoured rather than
    /// silently discarded.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        core::fmt::Display::fmt(&self.0, f)
    }
}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
43pub enum IpcError {
44    #[error("port not found")]
45    PortNotFound,
46    #[error("not owner of port")]
47    NotOwner,
48    #[error("port destroyed")]
49    PortDestroyed,
50}
51
52/// An IPC port: a bounded message queue with blocking semantics.
53pub struct Port {
54    /// Unique port identifier.
55    pub id: PortId,
56    /// TaskId of the port's creator/owner.
57    pub owner: TaskId,
58    /// The message queue (bounded to `PORT_QUEUE_CAPACITY`).
59    queue: ArrayQueue<IpcMessage>,
60    /// Set to true when the port is destroyed; blocked tasks wake with error.
61    destroyed: AtomicBool,
62    /// Tasks blocked because the queue is full (waiting to send).
63    send_waitq: WaitQueue,
64    /// Tasks blocked because the queue is empty (waiting to receive).
65    recv_waitq: WaitQueue,
66}
67
68impl Port {
69    /// Create a new port owned by the given task.
70    fn new(id: PortId, owner: TaskId) -> Self {
71        Port {
72            id,
73            owner,
74            queue: ArrayQueue::new(PORT_QUEUE_CAPACITY),
75            destroyed: AtomicBool::new(false),
76            send_waitq: WaitQueue::new(),
77            recv_waitq: WaitQueue::new(),
78        }
79    }
80
81    /// Send a message to this port.
82    ///
83    /// If the queue is full, the calling task blocks until space is available.
84    /// Returns `Err(IpcError::PortDestroyed)` if the port is destroyed while
85    /// the sender is blocked.
86    pub fn send(&self, msg: IpcMessage) -> Result<(), IpcError> {
87        let result = self.send_waitq.wait_until(|| {
88            if self.destroyed.load(Ordering::Acquire) {
89                return Some(Err(IpcError::PortDestroyed));
90            }
91            match self.queue.push(msg) {
92                Ok(()) => Some(Ok(())),
93                Err(_) => None,
94            }
95        });
96        if result.is_ok() {
97            self.recv_waitq.wake_one();
98        }
99        result
100    }
101
102    /// Receive a message from this port.
103    ///
104    /// If the queue is empty, the calling task blocks until a message arrives.
105    /// Returns `Err(IpcError::PortDestroyed)` if the port is destroyed while
106    /// the receiver is blocked.
107    pub fn recv(&self) -> Result<IpcMessage, IpcError> {
108        let result = self.recv_waitq.wait_until(|| {
109            if let Some(msg) = self.queue.pop() {
110                return Some(Ok(msg));
111            }
112            if self.destroyed.load(Ordering::Acquire) {
113                return Some(Err(IpcError::PortDestroyed));
114            }
115            None
116        });
117        if result.is_ok() {
118            self.send_waitq.wake_one();
119        }
120        result
121    }
122
123    /// Try to receive a message from this port without blocking.
124    ///
125    /// Returns `Ok(Some(msg))` if a message was available, `Ok(None)` if empty,
126    /// or `Err(IpcError::PortDestroyed)` if the port is destroyed.
127    pub fn try_recv(&self) -> Result<Option<IpcMessage>, IpcError> {
128        if let Some(msg) = self.queue.pop() {
129            self.send_waitq.wake_one();
130            return Ok(Some(msg));
131        }
132        if self.destroyed.load(Ordering::Acquire) {
133            return Err(IpcError::PortDestroyed);
134        }
135        Ok(None)
136    }
137
138    /// Mark the port as destroyed and wake all blocked tasks.
139    fn destroy(&self) {
140        self.destroyed.store(true, Ordering::Release);
141        self.send_waitq.wake_all();
142        self.recv_waitq.wake_all();
143    }
144
145    /// Returns whether messages is available.
146    pub fn has_messages(&self) -> bool {
147        !self.queue.is_empty()
148    }
149
150    /// Returns whether this can send.
151    pub fn can_send(&self) -> bool {
152        !self.destroyed.load(Ordering::Acquire) && !self.queue.is_full()
153    }
154
155    /// Returns whether destroyed.
156    pub fn is_destroyed(&self) -> bool {
157        self.destroyed.load(Ordering::Acquire)
158    }
159}
160
161// ===========================================================================
162// Global port registry
163// ===========================================================================
164
165/// Next port ID to assign.
166static NEXT_PORT_ID: AtomicU64 = AtomicU64::new(1);
167
168/// Global registry of all live ports.
169static PORTS: SpinLock<Option<BTreeMap<PortId, Arc<Port>>>> = SpinLock::new(None);
170
171/// Ensure the registry is initialized (called lazily).
172fn ensure_registry(guard: &mut Option<BTreeMap<PortId, Arc<Port>>>) {
173    if guard.is_none() {
174        *guard = Some(BTreeMap::new());
175    }
176}
177
178/// Create a new port owned by `owner`. Returns the new port's ID.
179pub fn create_port(owner: TaskId) -> PortId {
180    let id = PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed));
181    let port = Arc::new(Port::new(id, owner));
182
183    let mut registry = PORTS.lock();
184    ensure_registry(&mut *registry);
185    registry.as_mut().unwrap().insert(id, port);
186
187    log::debug!("IPC: created port {} (owner={})", id, owner);
188    id
189}
190
191/// Look up a port by ID. Returns a cloned `Arc<Port>` if found.
192pub fn get_port(id: PortId) -> Option<Arc<Port>> {
193    let registry = PORTS.lock();
194    registry.as_ref().and_then(|map| map.get(&id).cloned())
195}
196
197/// Destroy a port, removing it from the registry and waking all waiters.
198///
199/// Returns `Ok(())` if destroyed, `Err` if not found or not owned by caller.
200pub fn destroy_port(id: PortId, caller: TaskId) -> Result<(), IpcError> {
201    let port = {
202        let mut registry = PORTS.lock();
203        let map = registry.as_mut().ok_or(IpcError::PortNotFound)?;
204        let port = map.get(&id).ok_or(IpcError::PortNotFound)?;
205        if port.owner != caller {
206            return Err(IpcError::NotOwner);
207        }
208        let port = port.clone();
209        map.remove(&id);
210        port
211    };
212
213    port.destroy();
214    log::debug!("IPC: destroyed port {} (by task {})", id, caller);
215    Ok(())
216}
217
218/// Clean up all ports owned by a dying task.
219///
220/// For each port: drain queued messages and deliver error replies to any
221/// callers blocked in `wait_for_reply`, then destroy the port.
222///
223/// NOTE: Only covers messages still in the queue. If the owner already
224/// recv'd a message but died before calling reply(), the caller task
225/// remains stuck. Servers should handle their own graceful shutdown.
226pub fn cleanup_ports_for_task(owner: TaskId) {
227    super::reply::cancel_replies_waiting_on(owner);
228
229    let owned: alloc::vec::Vec<Arc<Port>> = {
230        let mut registry = PORTS.lock();
231        let Some(map) = registry.as_mut() else { return };
232        let ids: alloc::vec::Vec<PortId> = map
233            .iter()
234            .filter(|(_, p)| p.owner == owner)
235            .map(|(id, _)| *id)
236            .collect();
237        let mut ports = alloc::vec::Vec::with_capacity(ids.len());
238        for id in ids {
239            if let Some(p) = map.remove(&id) {
240                ports.push(p);
241            }
242        }
243        ports
244    };
245
246    for port in owned {
247        port.destroy();
248        while let Some(msg) = port.queue.pop() {
249            let sender = TaskId::from_u64(msg.sender);
250            if sender == owner {
251                continue;
252            }
253            let mut err_reply = IpcMessage::new(0x80);
254            let epipe: u32 = 32;
255            err_reply.payload[0..4].copy_from_slice(&epipe.to_le_bytes());
256            let _ = super::reply::deliver_reply(sender, err_reply);
257        }
258    }
259}