strat9_kernel/ipc/
port.rs1use super::message::IpcMessage;
8use crate::{
9 process::TaskId,
10 sync::{SpinLock, WaitQueue},
11};
12use alloc::{collections::BTreeMap, sync::Arc};
13use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use crossbeam_queue::ArrayQueue;
15
16const PORT_QUEUE_CAPACITY: usize = 16;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
21pub struct PortId(pub u64);
22
23impl PortId {
24 pub fn as_u64(self) -> u64 {
26 self.0
27 }
28
29 pub fn from_u64(raw: u64) -> Self {
31 PortId(raw)
32 }
33}
34
35impl core::fmt::Display for PortId {
36 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
43pub enum IpcError {
44 #[error("port not found")]
45 PortNotFound,
46 #[error("not owner of port")]
47 NotOwner,
48 #[error("port destroyed")]
49 PortDestroyed,
50}
51
52pub struct Port {
54 pub id: PortId,
56 pub owner: TaskId,
58 queue: ArrayQueue<IpcMessage>,
60 destroyed: AtomicBool,
62 send_waitq: WaitQueue,
64 recv_waitq: WaitQueue,
66}
67
68impl Port {
69 fn new(id: PortId, owner: TaskId) -> Self {
71 Port {
72 id,
73 owner,
74 queue: ArrayQueue::new(PORT_QUEUE_CAPACITY),
75 destroyed: AtomicBool::new(false),
76 send_waitq: WaitQueue::new(),
77 recv_waitq: WaitQueue::new(),
78 }
79 }
80
81 pub fn send(&self, msg: IpcMessage) -> Result<(), IpcError> {
87 let result = self.send_waitq.wait_until(|| {
88 if self.destroyed.load(Ordering::Acquire) {
89 return Some(Err(IpcError::PortDestroyed));
90 }
91 match self.queue.push(msg) {
92 Ok(()) => Some(Ok(())),
93 Err(_) => None,
94 }
95 });
96 if result.is_ok() {
97 self.recv_waitq.wake_one();
98 }
99 result
100 }
101
102 pub fn recv(&self) -> Result<IpcMessage, IpcError> {
108 let result = self.recv_waitq.wait_until(|| {
109 if let Some(msg) = self.queue.pop() {
110 return Some(Ok(msg));
111 }
112 if self.destroyed.load(Ordering::Acquire) {
113 return Some(Err(IpcError::PortDestroyed));
114 }
115 None
116 });
117 if result.is_ok() {
118 self.send_waitq.wake_one();
119 }
120 result
121 }
122
123 pub fn try_recv(&self) -> Result<Option<IpcMessage>, IpcError> {
128 if let Some(msg) = self.queue.pop() {
129 self.send_waitq.wake_one();
130 return Ok(Some(msg));
131 }
132 if self.destroyed.load(Ordering::Acquire) {
133 return Err(IpcError::PortDestroyed);
134 }
135 Ok(None)
136 }
137
138 fn destroy(&self) {
140 self.destroyed.store(true, Ordering::Release);
141 self.send_waitq.wake_all();
142 self.recv_waitq.wake_all();
143 }
144
145 pub fn has_messages(&self) -> bool {
147 !self.queue.is_empty()
148 }
149
150 pub fn can_send(&self) -> bool {
152 !self.destroyed.load(Ordering::Acquire) && !self.queue.is_full()
153 }
154
155 pub fn is_destroyed(&self) -> bool {
157 self.destroyed.load(Ordering::Acquire)
158 }
159}
160
161static NEXT_PORT_ID: AtomicU64 = AtomicU64::new(1);
167
168static PORTS: SpinLock<Option<BTreeMap<PortId, Arc<Port>>>> = SpinLock::new(None);
170
171fn ensure_registry(guard: &mut Option<BTreeMap<PortId, Arc<Port>>>) {
173 if guard.is_none() {
174 *guard = Some(BTreeMap::new());
175 }
176}
177
178pub fn create_port(owner: TaskId) -> PortId {
180 let id = PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed));
181 let port = Arc::new(Port::new(id, owner));
182
183 let mut registry = PORTS.lock();
184 ensure_registry(&mut *registry);
185 registry.as_mut().unwrap().insert(id, port);
186
187 log::debug!("IPC: created port {} (owner={})", id, owner);
188 id
189}
190
191pub fn get_port(id: PortId) -> Option<Arc<Port>> {
193 let registry = PORTS.lock();
194 registry.as_ref().and_then(|map| map.get(&id).cloned())
195}
196
197pub fn destroy_port(id: PortId, caller: TaskId) -> Result<(), IpcError> {
201 let port = {
202 let mut registry = PORTS.lock();
203 let map = registry.as_mut().ok_or(IpcError::PortNotFound)?;
204 let port = map.get(&id).ok_or(IpcError::PortNotFound)?;
205 if port.owner != caller {
206 return Err(IpcError::NotOwner);
207 }
208 let port = port.clone();
209 map.remove(&id);
210 port
211 };
212
213 port.destroy();
214 log::debug!("IPC: destroyed port {} (by task {})", id, caller);
215 Ok(())
216}
217
218pub fn cleanup_ports_for_task(owner: TaskId) {
227 super::reply::cancel_replies_waiting_on(owner);
228
229 let owned: alloc::vec::Vec<Arc<Port>> = {
230 let mut registry = PORTS.lock();
231 let Some(map) = registry.as_mut() else { return };
232 let ids: alloc::vec::Vec<PortId> = map
233 .iter()
234 .filter(|(_, p)| p.owner == owner)
235 .map(|(id, _)| *id)
236 .collect();
237 let mut ports = alloc::vec::Vec::with_capacity(ids.len());
238 for id in ids {
239 if let Some(p) = map.remove(&id) {
240 ports.push(p);
241 }
242 }
243 ports
244 };
245
246 for port in owned {
247 port.destroy();
248 while let Some(msg) = port.queue.pop() {
249 let sender = TaskId::from_u64(msg.sender);
250 if sender == owner {
251 continue;
252 }
253 let mut err_reply = IpcMessage::new(0x80);
254 let epipe: u32 = 32;
255 err_reply.payload[0..4].copy_from_slice(&epipe.to_le_bytes());
256 let _ = super::reply::deliver_reply(sender, err_reply);
257 }
258 }
259}