Skip to main content

strat9_kernel/vfs/
pipe.rs

1//! Kernel pipe implementation.
2//!
3//! A pipe is a unidirectional byte stream between two file descriptors.
4//! The read end blocks when empty; the write end returns EPIPE when
5//! the read end is closed.
6
7use crate::{
8    sync::{waitqueue::WaitQueue, SpinLock},
9    syscall::error::SyscallError,
10};
11use alloc::sync::Arc;
12
/// Capacity, in bytes, of the ring buffer backing each pipe (4 KiB).
const PIPE_BUF_SIZE: usize = 4 * 1024;
14
15/// Shared state for one pipe instance.
16struct PipeInner {
17    buf: [u8; PIPE_BUF_SIZE],
18    read_pos: usize,
19    write_pos: usize,
20    /// Number of bytes currently buffered.
21    len: usize,
22    read_closed: bool,
23    write_closed: bool,
24    /// Number of open file-descriptions referencing the read end.
25    /// The end is marked closed only when this reaches zero.
26    read_refs: usize,
27    /// Number of open file-descriptions referencing the write end.
28    write_refs: usize,
29}
30
31impl PipeInner {
32    /// Creates a new instance.
33    fn new() -> Self {
34        PipeInner {
35            buf: [0u8; PIPE_BUF_SIZE],
36            read_pos: 0,
37            write_pos: 0,
38            len: 0,
39            read_closed: false,
40            write_closed: false,
41            read_refs: 1,
42            write_refs: 1,
43        }
44    }
45
46    /// Returns whether empty.
47    #[allow(dead_code)]
48    fn is_empty(&self) -> bool {
49        self.len == 0
50    }
51
52    /// Returns whether full.
53    #[allow(dead_code)]
54    fn is_full(&self) -> bool {
55        self.len >= PIPE_BUF_SIZE
56    }
57
58    /// Performs the available read operation.
59    fn available_read(&self) -> usize {
60        self.len
61    }
62
63    /// Performs the available write operation.
64    fn available_write(&self) -> usize {
65        PIPE_BUF_SIZE - self.len
66    }
67}
68
69/// Shared pipe handle.
70pub struct Pipe {
71    inner: SpinLock<PipeInner>,
72    /// Woken when data becomes available (or write end is closed).
73    readers: WaitQueue,
74    /// Woken when space becomes available (or read end is closed).
75    writers: WaitQueue,
76}
77
78impl Pipe {
79    /// Creates a new instance.
80    pub fn new() -> Arc<Self> {
81        Arc::new(Pipe {
82            inner: SpinLock::new(PipeInner::new()),
83            readers: WaitQueue::new(),
84            writers: WaitQueue::new(),
85        })
86    }
87
88    /// Read from the pipe. Returns 0 on EOF (write end closed + empty).
89    pub fn read(&self, buf: &mut [u8]) -> Result<usize, SyscallError> {
90        let result = self.readers.wait_until(|| {
91            let mut inner = self.inner.lock();
92
93            if inner.available_read() > 0 {
94                let to_read = core::cmp::min(buf.len(), inner.available_read());
95                for i in 0..to_read {
96                    buf[i] = inner.buf[inner.read_pos];
97                    inner.read_pos = (inner.read_pos + 1) % PIPE_BUF_SIZE;
98                }
99                inner.len -= to_read;
100                return Some(Ok(to_read));
101            }
102
103            if inner.write_closed {
104                return Some(Ok(0)); // EOF
105            }
106
107            None // block
108        });
109        self.writers.wake_one();
110        result
111    }
112
113    /// Write to the pipe. Returns EPIPE if read end is closed.
114    pub fn write(&self, buf: &[u8]) -> Result<usize, SyscallError> {
115        if buf.is_empty() {
116            return Ok(0);
117        }
118
119        let mut total = 0;
120        while total < buf.len() {
121            let wrote_some = self.writers.wait_until(|| {
122                let mut inner = self.inner.lock();
123
124                if inner.read_closed {
125                    if total > 0 {
126                        return Some(Ok(0)); // return what we have
127                    }
128                    return Some(Err(SyscallError::Pipe));
129                }
130
131                if inner.available_write() > 0 {
132                    let to_write = core::cmp::min(buf.len() - total, inner.available_write());
133                    for i in 0..to_write {
134                        let wp = inner.write_pos;
135                        inner.buf[wp] = buf[total + i];
136                        inner.write_pos = (wp + 1) % PIPE_BUF_SIZE;
137                    }
138                    inner.len += to_write;
139                    return Some(Ok(to_write));
140                }
141
142                None // block
143            });
144            self.readers.wake_one();
145            let n = wrote_some?;
146            if n == 0 {
147                break; // read_closed mid-write, return partial
148            }
149            total += n;
150        }
151
152        Ok(total)
153    }
154
155    /// Increment the read-end refcount (called on dup/fork).
156    pub fn dup_read(&self) {
157        self.inner.lock().read_refs += 1;
158    }
159
160    /// Increment the write-end refcount (called on dup/fork).
161    pub fn dup_write(&self) {
162        self.inner.lock().write_refs += 1;
163    }
164
165    /// Decrement the read-end refcount; marks the end closed only when it
166    /// reaches zero.  Returns true if the end was actually closed.
167    pub fn close_read(&self) -> bool {
168        let mut inner = self.inner.lock();
169        if inner.read_refs == 0 {
170            return false;
171        }
172        inner.read_refs -= 1;
173        if inner.read_refs == 0 {
174            inner.read_closed = true;
175            drop(inner);
176            self.writers.wake_all();
177            true
178        } else {
179            false
180        }
181    }
182
183    /// Decrement the write-end refcount; marks the end closed only when it
184    /// reaches zero.  Returns true if the end was actually closed.
185    pub fn close_write(&self) -> bool {
186        let mut inner = self.inner.lock();
187        if inner.write_refs == 0 {
188            return false;
189        }
190        inner.write_refs -= 1;
191        if inner.write_refs == 0 {
192            inner.write_closed = true;
193            drop(inner);
194            self.readers.wake_all();
195            true
196        } else {
197            false
198        }
199    }
200}
201
202// ============================================================================
203// Pipe as a VFS Scheme
204// ============================================================================
205
206use super::scheme::{
207    finalize_pseudo_stat, DirEntry, FileStat, OpenFlags, OpenResult, Scheme, DEV_PIPEFS,
208};
209use alloc::{collections::BTreeMap, vec::Vec};
210use core::sync::atomic::{AtomicU64, Ordering};
211
212/// A scheme that manages kernel pipes.
213///
214/// Each pipe gets two file_ids: even = read end, odd = write end.
215pub struct PipeScheme {
216    pipes: SpinLock<BTreeMap<u64, Arc<Pipe>>>,
217}
218
219static NEXT_PIPE_ID: AtomicU64 = AtomicU64::new(2); // Start at 2 (even numbers)
220
221impl PipeScheme {
222    /// Creates a new instance.
223    pub fn new() -> Self {
224        PipeScheme {
225            pipes: SpinLock::new(BTreeMap::new()),
226        }
227    }
228
229    /// Create a new pipe pair. Returns (read_file_id, write_file_id).
230    pub fn create_pipe(&self) -> (u64, Arc<Pipe>) {
231        let base_id = NEXT_PIPE_ID.fetch_add(2, Ordering::SeqCst);
232        let pipe = Pipe::new();
233        self.pipes.lock().insert(base_id, pipe.clone());
234        (base_id, pipe)
235    }
236
237    /// Returns pipe.
238    fn get_pipe(&self, file_id: u64) -> Result<Arc<Pipe>, SyscallError> {
239        let base = file_id & !1; // Even = base
240        self.pipes
241            .lock()
242            .get(&base)
243            .cloned()
244            .ok_or(SyscallError::BadHandle)
245    }
246
247    /// Returns whether read end.
248    fn is_read_end(file_id: u64) -> bool {
249        file_id & 1 == 0
250    }
251}
252
253impl Scheme for PipeScheme {
254    /// Performs the open operation.
255    fn open(&self, _path: &str, _flags: OpenFlags) -> Result<OpenResult, SyscallError> {
256        Err(SyscallError::NotSupported) // Pipes are created via sys_pipe, not open()
257    }
258
259    /// Performs the read operation.
260    fn read(&self, file_id: u64, _offset: u64, buf: &mut [u8]) -> Result<usize, SyscallError> {
261        if !Self::is_read_end(file_id) {
262            return Err(SyscallError::PermissionDenied);
263        }
264        let pipe = self.get_pipe(file_id)?;
265        pipe.read(buf)
266    }
267
268    /// Performs the write operation.
269    fn write(&self, file_id: u64, _offset: u64, buf: &[u8]) -> Result<usize, SyscallError> {
270        if Self::is_read_end(file_id) {
271            return Err(SyscallError::PermissionDenied);
272        }
273        let pipe = self.get_pipe(file_id)?;
274        pipe.write(buf)
275    }
276
277    /// Performs the close operation.
278    fn close(&self, file_id: u64) -> Result<(), SyscallError> {
279        let pipe = self.get_pipe(file_id)?;
280        if Self::is_read_end(file_id) {
281            pipe.close_read();
282        } else {
283            pipe.close_write();
284        }
285
286        // Remove the shared Pipe entry only when both ends are fully closed
287        // (both refcounts have reached zero).
288        let base = file_id & !1;
289        let inner = pipe.inner.lock();
290        if inner.read_closed && inner.write_closed {
291            drop(inner);
292            self.pipes.lock().remove(&base);
293        }
294        Ok(())
295    }
296
297    /// Performs the stat operation.
298    fn stat(&self, file_id: u64) -> Result<FileStat, SyscallError> {
299        let pipe = self.get_pipe(file_id)?;
300        let inner = pipe.inner.lock();
301        Ok(finalize_pseudo_stat(
302            FileStat {
303                st_ino: file_id,
304                st_mode: 0o010600, // S_IFIFO | rw-------
305                st_nlink: 1,
306                st_size: inner.len as u64,
307                st_blksize: PIPE_BUF_SIZE as u64,
308                st_blocks: 0,
309                ..FileStat::zeroed()
310            },
311            DEV_PIPEFS,
312            0,
313        ))
314    }
315
316    /// Performs the readdir operation.
317    fn readdir(&self, _file_id: u64) -> Result<Vec<DirEntry>, SyscallError> {
318        Err(SyscallError::InvalidArgument)
319    }
320
321    /// Performs the size operation.
322    fn size(&self, file_id: u64) -> Result<u64, SyscallError> {
323        let pipe = self.get_pipe(file_id)?;
324        let len = pipe.inner.lock().len;
325        Ok(len as u64)
326    }
327}