1use alloc::{
3 collections::{BTreeMap, VecDeque},
4 sync::Arc,
5};
6use core::sync::atomic::Ordering;
7
8use super::error::SyscallError;
9use crate::{
10 memory::userslice::{UserSliceRead, UserSliceReadWrite},
11 process::{block_current_task, current_task_id, wake_task},
12 sync::{SpinLock, SpinLockGuard},
13};
14
15struct FutexQueue {
16 waiters: SpinLock<VecDeque<crate::process::TaskId>>,
17}
18
19impl FutexQueue {
20 const fn new() -> Self {
22 FutexQueue {
23 waiters: SpinLock::new(VecDeque::new()),
24 }
25 }
26
27 fn pop_waiter(&self) -> Option<crate::process::TaskId> {
29 let mut waiters = self.waiters.lock();
30 waiters.pop_front()
31 }
32
33 fn remove_waiter(&self, id: crate::process::TaskId) {
35 let mut waiters = self.waiters.lock();
36 if let Some(pos) = waiters.iter().position(|&x| x == id) {
37 waiters.remove(pos);
38 }
39 }
40
41 fn is_empty(&self) -> bool {
43 self.waiters.lock().is_empty()
44 }
45}
46
/// Global registry of futex wait queues, keyed by the futex word's address.
///
/// Entries are created on demand by `get_queue` and removed again by
/// `try_gc_queue` once a queue has no waiters. Wherever this file holds both
/// locks at once, the registry lock is taken before the queue's own lock.
///
/// NOTE(review): the key is the raw `u64` address handed to the syscall;
/// whether equal addresses from different address spaces can collide is not
/// visible from this file — confirm.
static FUTEX_QUEUES: SpinLock<BTreeMap<u64, Arc<FutexQueue>>> = SpinLock::new(BTreeMap::new());
48
49fn get_queue(addr: u64) -> Arc<FutexQueue> {
51 let mut map = FUTEX_QUEUES.lock();
52 map.entry(addr)
53 .or_insert_with(|| Arc::new(FutexQueue::new()))
54 .clone()
55}
56
57fn read_u32(addr: u64) -> Result<u32, SyscallError> {
59 let slice =
60 UserSliceRead::new(addr, core::mem::size_of::<u32>()).map_err(|_| SyscallError::Fault)?;
61 slice.read_val::<u32>().map_err(|_| SyscallError::Fault)
62}
63
/// Atomically compares the `u32` at `ptr` with `expected` and, if they are
/// equal, stores `desired`.
///
/// Returns the value observed at `ptr`. The exchange took place if and only
/// if the returned value equals `expected`.
///
/// A *strong* compare-exchange is used on purpose: the weak variant may fail
/// spuriously while still reporting `expected` as the observed value, which
/// callers comparing the result against `expected` would misread as success.
///
/// The caller must pass a pointer that is non-null, 4-byte aligned and valid
/// for reads and writes of a `u32` for the duration of the call.
#[inline]
fn atomic_cmpxchg_u32(ptr: *mut u32, expected: u32, desired: u32) -> u32 {
    use core::sync::atomic::{AtomicU32, Ordering};

    // SAFETY: `AtomicU32` has the same size and alignment as `u32`; the
    // caller guarantees `ptr` is aligned and valid for reads and writes.
    let atomic = unsafe { &*ptr.cast::<AtomicU32>() };
    match atomic.compare_exchange(expected, desired, Ordering::AcqRel, Ordering::Acquire) {
        Ok(previous) | Err(previous) => previous,
    }
}
77
78fn atomic_fetch_update_u32<F>(addr: u64, update: F) -> Result<u32, SyscallError>
80where
81 F: Fn(u32) -> u32,
82{
83 if (addr & 0x3) != 0 {
84 return Err(SyscallError::InvalidArgument);
85 }
86 let _slice = UserSliceReadWrite::new(addr, core::mem::size_of::<u32>())
87 .map_err(|_| SyscallError::Fault)?;
88 let ptr = addr as *mut u32;
89 let mut cur = unsafe { core::ptr::read_volatile(ptr) };
90 loop {
91 let new = update(cur);
92 let observed = atomic_cmpxchg_u32(ptr, cur, new);
93 if observed == cur {
94 return Ok(cur);
95 }
96 cur = observed;
97 }
98}
99
100fn lock_two_queues<'a>(
102 addr1: u64,
103 q1: &'a FutexQueue,
104 addr2: u64,
105 q2: &'a FutexQueue,
106) -> (
107 SpinLockGuard<'a, VecDeque<crate::process::TaskId>>,
108 SpinLockGuard<'a, VecDeque<crate::process::TaskId>>,
109) {
110 debug_assert_ne!(addr1, addr2, "lock_two_queues: same address would deadlock");
111 if addr1 < addr2 {
112 let g1 = q1.waiters.lock();
113 let g2 = q2.waiters.lock();
114 (g1, g2)
115 } else {
116 let g2 = q2.waiters.lock();
117 let g1 = q1.waiters.lock();
118 (g1, g2)
119 }
120}
121
122fn wake_from_waiters(waiters: &mut VecDeque<crate::process::TaskId>, max_wake: u32) -> u64 {
124 let mut woke = 0u64;
125 while woke < max_wake as u64 {
126 if let Some(id) = waiters.pop_front() {
127 if wake_task(id) {
128 woke += 1;
129 }
130 } else {
131 break;
132 }
133 }
134 woke
135}
136
137fn do_requeue(
139 addr1: u64,
140 max_wake: u32,
141 max_requeue: u32,
142 addr2: u64,
143) -> Result<u64, SyscallError> {
144 if addr1 == addr2 {
145 return sys_futex_wake(addr1, max_wake);
146 }
147
148 let queue1 = {
149 let map = FUTEX_QUEUES.lock();
150 map.get(&addr1).cloned()
151 };
152 let Some(queue1) = queue1 else {
153 return Ok(0);
154 };
155
156 let queue2 = get_queue(addr2);
157
158 let mut woke = 0u64;
159 let mut requeued = 0u64;
160
161 {
162 let (mut w1, mut w2) = lock_two_queues(addr1, &queue1, addr2, &queue2);
163
164 while woke < max_wake as u64 {
165 if let Some(id) = w1.pop_front() {
166 if wake_task(id) {
167 woke += 1;
168 }
169 } else {
170 break;
171 }
172 }
173
174 while requeued < max_requeue as u64 {
175 if let Some(id) = w1.pop_front() {
176 w2.push_back(id);
177 requeued += 1;
178 } else {
179 break;
180 }
181 }
182 }
183
184 try_gc_queue(addr1, &queue1);
185 try_gc_queue(addr2, &queue2);
186
187 Ok(woke + requeued)
188}
189
190fn try_gc_queue(addr: u64, queue: &Arc<FutexQueue>) {
192 let waiters = queue.waiters.lock();
193 if waiters.is_empty() {
194 drop(waiters);
195 let mut map = FUTEX_QUEUES.lock();
196 if let Some(q) = map.get(&addr) {
197 if q.is_empty() {
198 map.remove(&addr);
199 }
200 }
201 }
202}
203
204struct FutexWakeOpEncode {
205 op: u32,
206 is_oparg_shift: bool,
207 cmp: u32,
208 oparg: u32,
209 cmparg: u32,
210}
211
212impl FutexWakeOpEncode {
213 fn decode(bits: u32) -> Result<Self, SyscallError> {
215 let is_oparg_shift = ((bits >> 31) & 1) == 1;
216 let op = (bits >> 28) & 0x7;
217 let cmp = (bits >> 24) & 0xF;
218 let oparg = (bits >> 12) & 0xFFF;
219 let cmparg = bits & 0xFFF;
220
221 if op > 4 || cmp > 5 {
222 return Err(SyscallError::InvalidArgument);
223 }
224
225 Ok(Self {
226 op,
227 is_oparg_shift,
228 cmp,
229 oparg,
230 cmparg,
231 })
232 }
233
234 fn effective_oparg(&self) -> u32 {
236 if self.is_oparg_shift {
237 1u32 << (self.oparg & 31)
238 } else {
239 self.oparg
240 }
241 }
242
243 fn calculate_new_val(&self, old_val: u32) -> u32 {
245 let oparg = self.effective_oparg();
246 match self.op {
247 0 => oparg,
248 1 => old_val.wrapping_add(oparg),
249 2 => old_val | oparg,
250 3 => old_val & !oparg,
251 4 => old_val ^ oparg,
252 _ => old_val,
253 }
254 }
255
256 fn should_wake(&self, old_val: u32) -> bool {
258 match self.cmp {
259 0 => old_val == self.cmparg,
260 1 => old_val != self.cmparg,
261 2 => old_val < self.cmparg,
262 3 => old_val <= self.cmparg,
263 4 => old_val > self.cmparg,
264 5 => old_val >= self.cmparg,
265 _ => false,
266 }
267 }
268}
269
270pub fn sys_futex_wait(_addr: u64, _val: u32, _timeout_ns: u64) -> Result<u64, SyscallError> {
272 let addr = _addr;
273 let val = _val;
274 let timeout_ns = _timeout_ns;
275
276 if (addr & 0x3) != 0 {
277 return Err(SyscallError::InvalidArgument);
278 }
279
280 let id = current_task_id().ok_or(SyscallError::PermissionDenied)?;
281 let queue = get_queue(addr);
282
283 let saved_deadline = if timeout_ns != 0 {
284 let deadline = crate::syscall::time::current_time_ns().saturating_add(timeout_ns);
285 if let Some(task) = crate::process::get_task_by_id(id) {
286 task.wake_deadline_ns.store(deadline, Ordering::Relaxed);
287 }
288 deadline
289 } else {
290 0
291 };
292
293 {
295 let mut waiters = queue.waiters.lock();
296 let cur = read_u32(addr)?;
297 if cur != val {
298 if let Some(task) = crate::process::get_task_by_id(id) {
299 task.wake_deadline_ns.store(0, Ordering::Relaxed);
300 }
301 return Err(SyscallError::Again);
302 }
303 waiters.push_back(id);
304 }
305
306 block_current_task();
307
308 queue.remove_waiter(id);
309 try_gc_queue(addr, &queue);
310
311 if let Some(task) = crate::process::get_task_by_id(id) {
312 task.wake_deadline_ns.store(0, Ordering::Relaxed);
313 }
314 if timeout_ns != 0 && saved_deadline != 0 {
315 let now = crate::syscall::time::current_time_ns();
316 if now >= saved_deadline {
317 return Err(SyscallError::TimedOut);
318 }
319 }
320
321 Ok(0)
322}
323
324pub fn sys_futex_wake(_addr: u64, _max_wake: u32) -> Result<u64, SyscallError> {
326 let addr = _addr;
327 let max_wake = _max_wake;
328 let queue = {
329 let map = FUTEX_QUEUES.lock();
330 map.get(&addr).cloned()
331 };
332
333 let Some(queue) = queue else {
334 return Ok(0);
335 };
336
337 let mut woke = 0u64;
338 while woke < max_wake as u64 {
339 if let Some(id) = queue.pop_waiter() {
340 if wake_task(id) {
341 woke += 1;
342 }
343 } else {
344 break;
345 }
346 }
347
348 try_gc_queue(addr, &queue);
349
350 Ok(woke)
351}
352
353pub fn sys_futex_requeue(
355 _addr1: u64,
356 _max_wake: u32,
357 _max_requeue: u32,
358 _addr2: u64,
359) -> Result<u64, SyscallError> {
360 do_requeue(_addr1, _max_wake, _max_requeue, _addr2)
361}
362
363pub fn sys_futex_cmp_requeue(
365 _addr1: u64,
366 _max_wake: u32,
367 _max_requeue: u32,
368 _addr2: u64,
369 _expected_val: u32,
370) -> Result<u64, SyscallError> {
371 if _addr1 == _addr2 {
372 let cur = read_u32(_addr1)?;
373 if cur != _expected_val {
374 return Err(SyscallError::Again);
375 }
376 return sys_futex_wake(_addr1, _max_wake);
377 }
378
379 let queue1 = {
380 let map = FUTEX_QUEUES.lock();
381 map.get(&_addr1).cloned()
382 };
383 let Some(queue1) = queue1 else {
384 let cur = read_u32(_addr1)?;
385 if cur != _expected_val {
386 return Err(SyscallError::Again);
387 }
388 return Ok(0);
389 };
390
391 let queue2 = get_queue(_addr2);
392 let mut woke = 0u64;
393 let mut requeued = 0u64;
394
395 {
396 let (mut w1, mut w2) = lock_two_queues(_addr1, &queue1, _addr2, &queue2);
397 let cur = read_u32(_addr1)?;
398 if cur != _expected_val {
399 return Err(SyscallError::Again);
400 }
401 while woke < _max_wake as u64 {
402 if let Some(id) = w1.pop_front() {
403 if wake_task(id) {
404 woke += 1;
405 }
406 } else {
407 break;
408 }
409 }
410 while requeued < _max_requeue as u64 {
411 if let Some(id) = w1.pop_front() {
412 w2.push_back(id);
413 requeued += 1;
414 } else {
415 break;
416 }
417 }
418 }
419
420 try_gc_queue(_addr1, &queue1);
421 try_gc_queue(_addr2, &queue2);
422
423 Ok(woke + requeued)
424}
425
426pub fn sys_futex_wake_op(
428 _addr1: u64,
429 _max_wake1: u32,
430 _max_wake2: u32,
431 _addr2: u64,
432 _op: u32,
433) -> Result<u64, SyscallError> {
434 let addr1 = _addr1;
435 let addr2 = _addr2;
436 let max_wake1 = _max_wake1;
437 let max_wake2 = _max_wake2;
438 let wake_op = FutexWakeOpEncode::decode(_op)?;
439
440 let q1 = get_queue(addr1);
442 let q2 = get_queue(addr2);
443
444 let woke = if addr1 == addr2 {
445 let mut waiters = q1.waiters.lock();
446 let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
447 let mut woke = wake_from_waiters(&mut waiters, max_wake1);
448 if wake_op.should_wake(old) {
449 woke += wake_from_waiters(&mut waiters, max_wake2);
450 }
451 woke
452 } else {
453 let (mut w1, mut w2) = lock_two_queues(addr1, &q1, addr2, &q2);
454 let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
455 let mut woke = wake_from_waiters(&mut w1, max_wake1);
456 if wake_op.should_wake(old) {
457 woke += wake_from_waiters(&mut w2, max_wake2);
458 }
459 woke
460 };
461 try_gc_queue(addr1, &q1);
462 try_gc_queue(addr2, &q2);
463 Ok(woke)
464}