Skip to main content

strat9_kernel/ipc/
reply.rs

1//! IPC synchronous call/reply support.
2
3use super::message::IpcMessage;
4use crate::{
5    process::TaskId,
6    sync::{SpinLock, WaitQueue},
7};
8use alloc::{collections::BTreeMap, sync::Arc};
9
10struct ReplySlot {
11    msg: Option<IpcMessage>,
12    waitq: Arc<WaitQueue>,
13    waiting_on: Option<TaskId>,
14}
15
16struct ReplyRegistry {
17    slots: BTreeMap<TaskId, ReplySlot>,
18}
19
20impl ReplyRegistry {
21    /// Creates a new instance.
22    const fn new() -> Self {
23        ReplyRegistry {
24            slots: BTreeMap::new(),
25        }
26    }
27}
28
29static REPLIES: SpinLock<ReplyRegistry> = SpinLock::new(ReplyRegistry::new());
30
31// TODO(migration): Reply slots are still keyed only by caller TaskId, which
32// means the kernel implicitly supports at most one outstanding synchronous
33// ipc_call per task. If nested or concurrent synchronous calls are ever needed,
34// introduce a per-call correlation token, thread it through wait_for_reply /
35// deliver_reply / cleanup_ports_for_task, and make cancellation target the
36// exact call instance instead of the whole task slot.
37
38fn epipe_reply() -> IpcMessage {
39    let mut err = IpcMessage::new(0x80);
40    let epipe: u32 = 32;
41    err.payload[0..4].copy_from_slice(&epipe.to_le_bytes());
42    err
43}
44
45/// Block the current task waiting for a reply message.
46///
47/// Returns an EPIPE error reply if the slot was removed while waiting
48/// (e.g. the server died and cleanup ran).
49pub fn wait_for_reply(task_id: TaskId, waiting_on: TaskId) -> IpcMessage {
50    let waitq = {
51        let mut registry = REPLIES.lock();
52        let slot = registry.slots.entry(task_id).or_insert_with(|| ReplySlot {
53            msg: None,
54            waitq: Arc::new(WaitQueue::new()),
55            waiting_on: Some(waiting_on),
56        });
57        slot.waiting_on = Some(waiting_on);
58        slot.waitq.clone()
59    };
60
61    let msg = waitq.wait_until(|| {
62        let mut registry = REPLIES.lock();
63        match registry.slots.get_mut(&task_id) {
64            Some(slot) => slot.msg.take(),
65            None => Some(epipe_reply()),
66        }
67    });
68
69    let mut registry = REPLIES.lock();
70    registry.slots.remove(&task_id);
71
72    msg
73}
74
75/// Cancel all pending reply slots and wake blocked callers with EPIPE.
76///
77/// Called during task cleanup to unblock any tasks waiting for a reply
78/// that this dying task should have delivered.
79pub fn cancel_replies_waiting_on(dead_task: TaskId) {
80    let waiters = {
81        let mut registry = REPLIES.lock();
82        let mut waitqs = alloc::vec::Vec::new();
83        for slot in registry.slots.values_mut() {
84            if slot.waiting_on != Some(dead_task) {
85                continue;
86            }
87            if slot.msg.is_none() {
88                slot.msg = Some(epipe_reply());
89            }
90            waitqs.push(slot.waitq.clone());
91        }
92        waitqs
93    };
94
95    for waitq in waiters {
96        waitq.wake_all();
97    }
98}
99
100/// Deliver a reply message to the given task (wakes it if blocked).
101pub fn deliver_reply(target: TaskId, msg: IpcMessage) -> Result<(), ()> {
102    let waitq = {
103        let mut registry = REPLIES.lock();
104        let slot = registry.slots.entry(target).or_insert_with(|| ReplySlot {
105            msg: None,
106            waitq: Arc::new(WaitQueue::new()),
107            waiting_on: None,
108        });
109        slot.msg = Some(msg);
110        slot.waitq.clone()
111    };
112
113    waitq.wake_one();
114    Ok(())
115}