Skip to main content

strat9_kernel/ipc/
reply.rs

1//! IPC synchronous call/reply support.
2
3use super::message::IpcMessage;
4use crate::{
5    process::TaskId,
6    sync::{SpinLock, WaitQueue},
7};
8use alloc::{collections::BTreeMap, sync::Arc};
9
10struct ReplySlot {
11    msg: Option<IpcMessage>,
12    waitq: Arc<WaitQueue>,
13}
14
15struct ReplyRegistry {
16    slots: BTreeMap<TaskId, ReplySlot>,
17}
18
19impl ReplyRegistry {
20    /// Creates a new instance.
21    const fn new() -> Self {
22        ReplyRegistry {
23            slots: BTreeMap::new(),
24        }
25    }
26}
27
28static REPLIES: SpinLock<ReplyRegistry> = SpinLock::new(ReplyRegistry::new());
29
30/// Block the current task waiting for a reply message.
31///
32/// Returns an EPIPE error reply if the slot was removed while waiting
33/// (e.g. the server died and cleanup ran).
34pub fn wait_for_reply(task_id: TaskId) -> IpcMessage {
35    let waitq = {
36        let mut registry = REPLIES.lock();
37        let slot = registry.slots.entry(task_id).or_insert_with(|| ReplySlot {
38            msg: None,
39            waitq: Arc::new(WaitQueue::new()),
40        });
41        slot.waitq.clone()
42    };
43
44    let msg = waitq.wait_until(|| {
45        let mut registry = REPLIES.lock();
46        match registry.slots.get_mut(&task_id) {
47            Some(slot) => slot.msg.take(),
48            None => {
49                let mut err = IpcMessage::new(0x80);
50                let epipe: u32 = 32;
51                err.payload[0..4].copy_from_slice(&epipe.to_le_bytes());
52                Some(err)
53            }
54        }
55    });
56
57    let mut registry = REPLIES.lock();
58    registry.slots.remove(&task_id);
59
60    msg
61}
62
63/// Cancel all pending reply slots and wake blocked callers with EPIPE.
64///
65/// Called during task cleanup to unblock any tasks waiting for a reply
66/// that this dying task should have delivered.
67pub fn cancel_replies_waiting_on(dead_task: TaskId) {
68    let registry = REPLIES.lock();
69    let has_slot = registry.slots.contains_key(&dead_task);
70    drop(registry);
71
72    if has_slot {
73        let mut registry = REPLIES.lock();
74        if let Some(slot) = registry.slots.remove(&dead_task) {
75            slot.waitq.wake_all();
76        }
77    }
78}
79
80/// Deliver a reply message to the given task (wakes it if blocked).
81pub fn deliver_reply(target: TaskId, msg: IpcMessage) -> Result<(), ()> {
82    let waitq = {
83        let mut registry = REPLIES.lock();
84        let slot = registry.slots.entry(target).or_insert_with(|| ReplySlot {
85            msg: None,
86            waitq: Arc::new(WaitQueue::new()),
87        });
88        slot.msg = Some(msg);
89        slot.waitq.clone()
90    };
91
92    waitq.wake_one();
93    Ok(())
94}