strat9_kernel/vfs/
pipe.rs1use crate::{
8 sync::{waitqueue::WaitQueue, SpinLock},
9 syscall::error::SyscallError,
10};
11use alloc::sync::Arc;
12
/// Capacity, in bytes, of the fixed ring buffer that backs every pipe.
const PIPE_BUF_SIZE: usize = 4 * 1024;
14
15struct PipeInner {
17 buf: [u8; PIPE_BUF_SIZE],
18 read_pos: usize,
19 write_pos: usize,
20 len: usize,
22 read_closed: bool,
23 write_closed: bool,
24 read_refs: usize,
27 write_refs: usize,
29}
30
31impl PipeInner {
32 fn new() -> Self {
34 PipeInner {
35 buf: [0u8; PIPE_BUF_SIZE],
36 read_pos: 0,
37 write_pos: 0,
38 len: 0,
39 read_closed: false,
40 write_closed: false,
41 read_refs: 1,
42 write_refs: 1,
43 }
44 }
45
46 #[allow(dead_code)]
48 fn is_empty(&self) -> bool {
49 self.len == 0
50 }
51
52 #[allow(dead_code)]
54 fn is_full(&self) -> bool {
55 self.len >= PIPE_BUF_SIZE
56 }
57
58 fn available_read(&self) -> usize {
60 self.len
61 }
62
63 fn available_write(&self) -> usize {
65 PIPE_BUF_SIZE - self.len
66 }
67}
68
69pub struct Pipe {
71 inner: SpinLock<PipeInner>,
72 readers: WaitQueue,
74 writers: WaitQueue,
76}
77
78impl Pipe {
79 pub fn new() -> Arc<Self> {
81 Arc::new(Pipe {
82 inner: SpinLock::new(PipeInner::new()),
83 readers: WaitQueue::new(),
84 writers: WaitQueue::new(),
85 })
86 }
87
88 pub fn read(&self, buf: &mut [u8]) -> Result<usize, SyscallError> {
90 let result = self.readers.wait_until(|| {
91 let mut inner = self.inner.lock();
92
93 if inner.available_read() > 0 {
94 let to_read = core::cmp::min(buf.len(), inner.available_read());
95 for i in 0..to_read {
96 buf[i] = inner.buf[inner.read_pos];
97 inner.read_pos = (inner.read_pos + 1) % PIPE_BUF_SIZE;
98 }
99 inner.len -= to_read;
100 return Some(Ok(to_read));
101 }
102
103 if inner.write_closed {
104 return Some(Ok(0)); }
106
107 None });
109 self.writers.wake_one();
110 result
111 }
112
113 pub fn write(&self, buf: &[u8]) -> Result<usize, SyscallError> {
115 if buf.is_empty() {
116 return Ok(0);
117 }
118
119 let mut total = 0;
120 while total < buf.len() {
121 let wrote_some = self.writers.wait_until(|| {
122 let mut inner = self.inner.lock();
123
124 if inner.read_closed {
125 if total > 0 {
126 return Some(Ok(0)); }
128 return Some(Err(SyscallError::Pipe));
129 }
130
131 if inner.available_write() > 0 {
132 let to_write = core::cmp::min(buf.len() - total, inner.available_write());
133 for i in 0..to_write {
134 let wp = inner.write_pos;
135 inner.buf[wp] = buf[total + i];
136 inner.write_pos = (wp + 1) % PIPE_BUF_SIZE;
137 }
138 inner.len += to_write;
139 return Some(Ok(to_write));
140 }
141
142 None });
144 self.readers.wake_one();
145 let n = wrote_some?;
146 if n == 0 {
147 break; }
149 total += n;
150 }
151
152 Ok(total)
153 }
154
155 pub fn dup_read(&self) {
157 self.inner.lock().read_refs += 1;
158 }
159
160 pub fn dup_write(&self) {
162 self.inner.lock().write_refs += 1;
163 }
164
165 pub fn close_read(&self) -> bool {
168 let mut inner = self.inner.lock();
169 if inner.read_refs == 0 {
170 return false;
171 }
172 inner.read_refs -= 1;
173 if inner.read_refs == 0 {
174 inner.read_closed = true;
175 drop(inner);
176 self.writers.wake_all();
177 true
178 } else {
179 false
180 }
181 }
182
183 pub fn close_write(&self) -> bool {
186 let mut inner = self.inner.lock();
187 if inner.write_refs == 0 {
188 return false;
189 }
190 inner.write_refs -= 1;
191 if inner.write_refs == 0 {
192 inner.write_closed = true;
193 drop(inner);
194 self.readers.wake_all();
195 true
196 } else {
197 false
198 }
199 }
200}
201
202use super::scheme::{
207 finalize_pseudo_stat, DirEntry, FileStat, OpenFlags, OpenResult, Scheme, DEV_PIPEFS,
208};
209use alloc::{collections::BTreeMap, vec::Vec};
210use core::sync::atomic::{AtomicU64, Ordering};
211
212pub struct PipeScheme {
216 pipes: SpinLock<BTreeMap<u64, Arc<Pipe>>>,
217}
218
219static NEXT_PIPE_ID: AtomicU64 = AtomicU64::new(2); impl PipeScheme {
222 pub fn new() -> Self {
224 PipeScheme {
225 pipes: SpinLock::new(BTreeMap::new()),
226 }
227 }
228
229 pub fn create_pipe(&self) -> (u64, Arc<Pipe>) {
231 let base_id = NEXT_PIPE_ID.fetch_add(2, Ordering::SeqCst);
232 let pipe = Pipe::new();
233 self.pipes.lock().insert(base_id, pipe.clone());
234 (base_id, pipe)
235 }
236
237 fn get_pipe(&self, file_id: u64) -> Result<Arc<Pipe>, SyscallError> {
239 let base = file_id & !1; self.pipes
241 .lock()
242 .get(&base)
243 .cloned()
244 .ok_or(SyscallError::BadHandle)
245 }
246
247 fn is_read_end(file_id: u64) -> bool {
249 file_id & 1 == 0
250 }
251}
252
253impl Scheme for PipeScheme {
254 fn open(&self, _path: &str, _flags: OpenFlags) -> Result<OpenResult, SyscallError> {
256 Err(SyscallError::NotSupported) }
258
259 fn read(&self, file_id: u64, _offset: u64, buf: &mut [u8]) -> Result<usize, SyscallError> {
261 if !Self::is_read_end(file_id) {
262 return Err(SyscallError::PermissionDenied);
263 }
264 let pipe = self.get_pipe(file_id)?;
265 pipe.read(buf)
266 }
267
268 fn write(&self, file_id: u64, _offset: u64, buf: &[u8]) -> Result<usize, SyscallError> {
270 if Self::is_read_end(file_id) {
271 return Err(SyscallError::PermissionDenied);
272 }
273 let pipe = self.get_pipe(file_id)?;
274 pipe.write(buf)
275 }
276
277 fn close(&self, file_id: u64) -> Result<(), SyscallError> {
279 let pipe = self.get_pipe(file_id)?;
280 if Self::is_read_end(file_id) {
281 pipe.close_read();
282 } else {
283 pipe.close_write();
284 }
285
286 let base = file_id & !1;
289 let inner = pipe.inner.lock();
290 if inner.read_closed && inner.write_closed {
291 drop(inner);
292 self.pipes.lock().remove(&base);
293 }
294 Ok(())
295 }
296
297 fn stat(&self, file_id: u64) -> Result<FileStat, SyscallError> {
299 let pipe = self.get_pipe(file_id)?;
300 let inner = pipe.inner.lock();
301 Ok(finalize_pseudo_stat(
302 FileStat {
303 st_ino: file_id,
304 st_mode: 0o010600, st_nlink: 1,
306 st_size: inner.len as u64,
307 st_blksize: PIPE_BUF_SIZE as u64,
308 st_blocks: 0,
309 ..FileStat::zeroed()
310 },
311 DEV_PIPEFS,
312 0,
313 ))
314 }
315
316 fn readdir(&self, _file_id: u64) -> Result<Vec<DirEntry>, SyscallError> {
318 Err(SyscallError::InvalidArgument)
319 }
320
321 fn size(&self, file_id: u64) -> Result<u64, SyscallError> {
323 let pipe = self.get_pipe(file_id)?;
324 let len = pipe.inner.lock().len;
325 Ok(len as u64)
326 }
327}