1use alloc::sync::Arc;
3use core::sync::atomic::Ordering;
4
5use super::error::SyscallError;
6use crate::{
7 memory::userslice::{UserSliceRead, UserSliceReadWrite},
8 process::{block_current_task, current_task_id, wake_task},
9 sync::{FixedQueue, SpinLock, SpinLockGuard},
10};
11
/// Number of independently locked buckets in the global futex table.
///
/// Must be a power of two: [`futex_bucket_index`] picks a bucket from the top
/// `log2(FUTEX_QUEUE_BUCKETS)` bits of a 64-bit hash.
const FUTEX_QUEUE_BUCKETS: usize = 1 << 8;

/// Maximum number of distinct futex addresses one bucket can track at once.
/// When every slot is taken, registering another address fails with `QueueFull`.
const FUTEX_BUCKET_CAPACITY: usize = 32;

/// Maximum number of tasks that can be queued on a single futex address.
/// Enqueueing beyond this fails with `QueueFull`.
const FUTEX_WAITERS_CAPACITY: usize = 256;

// Compile-time guard for the power-of-two requirement above.
const _: () = assert!(
    FUTEX_QUEUE_BUCKETS.is_power_of_two(),
    "FUTEX_QUEUE_BUCKETS must be a power of two"
);
34
35struct FutexQueue {
36 waiters: SpinLock<FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>>,
37}
38
39impl FutexQueue {
40 const fn new() -> Self {
42 FutexQueue {
43 waiters: SpinLock::new(FixedQueue::new()),
44 }
45 }
46
47 fn pop_waiter(&self) -> Option<crate::process::TaskId> {
49 let mut waiters = self.waiters.lock();
50 waiters.pop_front()
51 }
52
53 fn remove_waiter(&self, id: crate::process::TaskId) {
55 let mut waiters = self.waiters.lock();
56 let _ = waiters.remove_first_where(|queued| *queued == id);
57 }
58
59 fn is_empty(&self) -> bool {
61 self.waiters.lock().is_empty()
62 }
63}
64
65struct FutexBucketEntry {
66 addr: u64,
67 queue: Arc<FutexQueue>,
68}
69
70struct FutexBucket {
71 entries: [Option<FutexBucketEntry>; FUTEX_BUCKET_CAPACITY],
72}
73
74impl FutexBucket {
75 const fn new() -> Self {
76 Self {
77 entries: [const { None }; FUTEX_BUCKET_CAPACITY],
78 }
79 }
80
81 fn lookup(&self, addr: u64) -> Option<Arc<FutexQueue>> {
82 self.entries.iter().find_map(|slot| match slot {
83 Some(entry) if entry.addr == addr => Some(entry.queue.clone()),
84 _ => None,
85 })
86 }
87
88 fn get_or_insert(&mut self, addr: u64) -> Result<Arc<FutexQueue>, SyscallError> {
89 let mut empty_slot = None;
90
91 for (index, slot) in self.entries.iter_mut().enumerate() {
92 match slot {
93 Some(entry) if entry.addr == addr => return Ok(entry.queue.clone()),
94 None if empty_slot.is_none() => empty_slot = Some(index),
95 _ => {}
96 }
97 }
98
99 let Some(index) = empty_slot else {
100 return Err(SyscallError::QueueFull);
101 };
102
103 let queue = Arc::new(FutexQueue::new());
104 self.entries[index] = Some(FutexBucketEntry {
105 addr,
106 queue: queue.clone(),
107 });
108 Ok(queue)
109 }
110
111 fn remove_if_empty(&mut self, addr: u64, queue: &Arc<FutexQueue>) {
128 for slot in &mut self.entries {
129 let should_remove = match slot {
130 Some(entry)
131 if entry.addr == addr
132 && Arc::ptr_eq(&entry.queue, queue)
133 && entry.queue.is_empty() =>
134 {
135 true
136 }
137 _ => false,
138 };
139
140 if should_remove {
141 *slot = None;
142 return;
143 }
144 }
145 }
146}
147
/// Global futex table: `FUTEX_QUEUE_BUCKETS` independently locked buckets, each
/// tracking up to `FUTEX_BUCKET_CAPACITY` address-to-wait-queue entries. The bucket
/// for a given address is selected by `futex_bucket_index`.
static FUTEX_QUEUES: [SpinLock<FutexBucket>; FUTEX_QUEUE_BUCKETS] =
    [const { SpinLock::new(FutexBucket::new()) }; FUTEX_QUEUE_BUCKETS];
150
151#[inline]
152fn futex_bucket_index(addr: u64) -> usize {
153 let h = addr.wrapping_mul(0x9e37_79b9_7f4a_7c15_u64);
159 (h >> (64 - FUTEX_QUEUE_BUCKETS.trailing_zeros())) as usize
160}
161
162#[inline]
163fn futex_bucket(addr: u64) -> &'static SpinLock<FutexBucket> {
164 &FUTEX_QUEUES[futex_bucket_index(addr)]
165}
166
167fn lookup_queue(addr: u64) -> Option<Arc<FutexQueue>> {
168 let bucket = futex_bucket(addr).lock();
169 bucket.lookup(addr)
170}
171
172fn get_queue(addr: u64) -> Result<Arc<FutexQueue>, SyscallError> {
174 let mut bucket = futex_bucket(addr).lock();
175 bucket.get_or_insert(addr)
176}
177
178fn read_u32(addr: u64) -> Result<u32, SyscallError> {
180 let slice =
181 UserSliceRead::new(addr, core::mem::size_of::<u32>()).map_err(|_| SyscallError::Fault)?;
182 slice.read_val::<u32>().map_err(|_| SyscallError::Fault)
183}
184
/// Atomically compares the `u32` at `ptr` with `expected` and, if they are equal,
/// stores `desired`.
///
/// Returns the value held at `ptr` immediately before the operation, so the store
/// happened if and only if the return value equals `expected`.
///
/// A strong compare-exchange is used deliberately. `compare_exchange_weak` may fail
/// spuriously while reporting a value equal to `expected`. Callers that compare the
/// result with `expected` (such as `atomic_fetch_update_u32`) would misread that as a
/// successful store and silently lose the update.
///
/// The caller must ensure `ptr` is non-null, 4-byte aligned (the alignment of
/// `AtomicU32`), and valid for reads and writes for the duration of the call.
#[inline]
fn atomic_cmpxchg_u32(ptr: *mut u32, expected: u32, desired: u32) -> u32 {
    use core::sync::atomic::{AtomicU32, Ordering};

    // SAFETY: the caller guarantees `ptr` is non-null, aligned for `AtomicU32` and valid
    // for reads and writes. `AtomicU32` has the same in-memory representation as `u32`.
    let atomic = unsafe { &*ptr.cast::<AtomicU32>() };
    match atomic.compare_exchange(expected, desired, Ordering::AcqRel, Ordering::Acquire) {
        Ok(previous) | Err(previous) => previous,
    }
}
198
199fn atomic_fetch_update_u32<F>(addr: u64, update: F) -> Result<u32, SyscallError>
201where
202 F: Fn(u32) -> u32,
203{
204 if (addr & 0x3) != 0 {
205 return Err(SyscallError::InvalidArgument);
206 }
207 let _slice = UserSliceReadWrite::new(addr, core::mem::size_of::<u32>())
208 .map_err(|_| SyscallError::Fault)?;
209 let ptr = addr as *mut u32;
210 let mut cur = unsafe { core::ptr::read_volatile(ptr) };
211 loop {
212 let new = update(cur);
213 let observed = atomic_cmpxchg_u32(ptr, cur, new);
214 if observed == cur {
215 return Ok(cur);
216 }
217 cur = observed;
218 }
219}
220
221fn lock_two_queues<'a>(
223 addr1: u64,
224 q1: &'a FutexQueue,
225 addr2: u64,
226 q2: &'a FutexQueue,
227) -> (
228 SpinLockGuard<'a, FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>>,
229 SpinLockGuard<'a, FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>>,
230) {
231 debug_assert_ne!(addr1, addr2, "lock_two_queues: same address would deadlock");
232 if addr1 < addr2 {
233 let g1 = q1.waiters.lock();
234 let g2 = q2.waiters.lock();
235 (g1, g2)
236 } else {
237 let g2 = q2.waiters.lock();
238 let g1 = q1.waiters.lock();
239 (g1, g2)
240 }
241}
242
243fn wake_from_waiters(
245 waiters: &mut FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>,
246 max_wake: u32,
247) -> u64 {
248 let mut woke = 0u64;
249 while woke < max_wake as u64 {
250 if let Some(id) = waiters.pop_front() {
251 if wake_task(id) {
252 woke += 1;
253 }
254 } else {
255 break;
256 }
257 }
258 woke
259}
260
261fn do_requeue(
263 addr1: u64,
264 max_wake: u32,
265 max_requeue: u32,
266 addr2: u64,
267) -> Result<u64, SyscallError> {
268 if addr1 == addr2 {
269 return sys_futex_wake(addr1, max_wake);
270 }
271
272 let queue1 = lookup_queue(addr1);
273 let Some(queue1) = queue1 else {
274 return Ok(0);
275 };
276
277 let queue2 = get_queue(addr2)?;
278
279 let mut woke = 0u64;
280 let mut requeued = 0u64;
281
282 {
283 let (mut w1, mut w2) = lock_two_queues(addr1, &queue1, addr2, &queue2);
284
285 while woke < max_wake as u64 {
286 if let Some(id) = w1.pop_front() {
287 if wake_task(id) {
288 woke += 1;
289 }
290 } else {
291 break;
292 }
293 }
294
295 while requeued < max_requeue as u64 {
296 if let Some(id) = w1.pop_front() {
297 if w2.push_back(id).is_err() {
298 let _ = w1.push_back(id);
299 break;
300 }
301 requeued += 1;
302 } else {
303 break;
304 }
305 }
306 }
307
308 try_gc_queue(addr1, &queue1);
309 try_gc_queue(addr2, &queue2);
310
311 Ok(woke + requeued)
312}
313
314fn try_gc_queue(addr: u64, queue: &Arc<FutexQueue>) {
330 {
332 let waiters = queue.waiters.lock();
333 if !waiters.is_empty() {
334 return;
335 }
336 }
337 let mut bucket = futex_bucket(addr).lock();
339 bucket.remove_if_empty(addr, queue);
340}
341
342struct FutexWakeOpEncode {
343 op: u32,
344 is_oparg_shift: bool,
345 cmp: u32,
346 oparg: u32,
347 cmparg: u32,
348}
349
350impl FutexWakeOpEncode {
351 fn decode(bits: u32) -> Result<Self, SyscallError> {
353 let is_oparg_shift = ((bits >> 31) & 1) == 1;
354 let op = (bits >> 28) & 0x7;
355 let cmp = (bits >> 24) & 0xF;
356 let oparg = (bits >> 12) & 0xFFF;
357 let cmparg = bits & 0xFFF;
358
359 if op > 4 || cmp > 5 {
360 return Err(SyscallError::InvalidArgument);
361 }
362
363 Ok(Self {
364 op,
365 is_oparg_shift,
366 cmp,
367 oparg,
368 cmparg,
369 })
370 }
371
372 fn effective_oparg(&self) -> u32 {
374 if self.is_oparg_shift {
375 1u32 << (self.oparg & 31)
376 } else {
377 self.oparg
378 }
379 }
380
381 fn calculate_new_val(&self, old_val: u32) -> u32 {
383 let oparg = self.effective_oparg();
384 match self.op {
385 0 => oparg,
386 1 => old_val.wrapping_add(oparg),
387 2 => old_val | oparg,
388 3 => old_val & !oparg,
389 4 => old_val ^ oparg,
390 _ => old_val,
391 }
392 }
393
394 fn should_wake(&self, old_val: u32) -> bool {
396 match self.cmp {
397 0 => old_val == self.cmparg,
398 1 => old_val != self.cmparg,
399 2 => old_val < self.cmparg,
400 3 => old_val <= self.cmparg,
401 4 => old_val > self.cmparg,
402 5 => old_val >= self.cmparg,
403 _ => false,
404 }
405 }
406}
407
408pub fn sys_futex_wait(_addr: u64, _val: u32, _timeout_ns: u64) -> Result<u64, SyscallError> {
410 let addr = _addr;
411 let val = _val;
412 let timeout_ns = _timeout_ns;
413
414 if (addr & 0x3) != 0 {
415 return Err(SyscallError::InvalidArgument);
416 }
417
418 let id = current_task_id().ok_or(SyscallError::PermissionDenied)?;
419 let queue = get_queue(addr)?;
420
421 let saved_deadline = if timeout_ns != 0 {
422 let deadline = crate::syscall::time::current_time_ns().saturating_add(timeout_ns);
423 if let Some(task) = crate::process::get_task_by_id(id) {
424 task.wake_deadline_ns.store(deadline, Ordering::Relaxed);
425 }
426 deadline
427 } else {
428 0
429 };
430
431 {
433 let mut waiters = queue.waiters.lock();
434 let cur = read_u32(addr)?;
435 if cur != val {
436 if let Some(task) = crate::process::get_task_by_id(id) {
437 task.wake_deadline_ns.store(0, Ordering::Relaxed);
438 }
439 return Err(SyscallError::Again);
440 }
441 waiters.push_back(id).map_err(|_| SyscallError::QueueFull)?;
442 }
443
444 block_current_task();
445
446 queue.remove_waiter(id);
447 try_gc_queue(addr, &queue);
448
449 if let Some(task) = crate::process::get_task_by_id(id) {
450 task.wake_deadline_ns.store(0, Ordering::Relaxed);
451 }
452 if timeout_ns != 0 && saved_deadline != 0 {
453 let now = crate::syscall::time::current_time_ns();
454 if now >= saved_deadline {
455 return Err(SyscallError::TimedOut);
456 }
457 }
458
459 Ok(0)
460}
461
462pub fn sys_futex_wake(_addr: u64, _max_wake: u32) -> Result<u64, SyscallError> {
464 let addr = _addr;
465 let max_wake = _max_wake;
466 let queue = lookup_queue(addr);
467
468 let Some(queue) = queue else {
469 return Ok(0);
470 };
471
472 let mut woke = 0u64;
473 while woke < max_wake as u64 {
474 if let Some(id) = queue.pop_waiter() {
475 if wake_task(id) {
476 woke += 1;
477 }
478 } else {
479 break;
480 }
481 }
482
483 try_gc_queue(addr, &queue);
484
485 Ok(woke)
486}
487
488pub fn sys_futex_requeue(
490 _addr1: u64,
491 _max_wake: u32,
492 _max_requeue: u32,
493 _addr2: u64,
494) -> Result<u64, SyscallError> {
495 do_requeue(_addr1, _max_wake, _max_requeue, _addr2)
496}
497
498pub fn sys_futex_cmp_requeue(
500 _addr1: u64,
501 _max_wake: u32,
502 _max_requeue: u32,
503 _addr2: u64,
504 _expected_val: u32,
505) -> Result<u64, SyscallError> {
506 if _addr1 == _addr2 {
507 let cur = read_u32(_addr1)?;
508 if cur != _expected_val {
509 return Err(SyscallError::Again);
510 }
511 return sys_futex_wake(_addr1, _max_wake);
512 }
513
514 let queue1 = lookup_queue(_addr1);
515 let Some(queue1) = queue1 else {
516 let cur = read_u32(_addr1)?;
517 if cur != _expected_val {
518 return Err(SyscallError::Again);
519 }
520 return Ok(0);
521 };
522
523 let cur_prelim = read_u32(_addr1)?;
528 if cur_prelim != _expected_val {
529 return Err(SyscallError::Again);
530 }
531
532 let queue2 = get_queue(_addr2)?;
533 let mut woke = 0u64;
534 let mut requeued = 0u64;
535
536 {
537 let (mut w1, mut w2) = lock_two_queues(_addr1, &queue1, _addr2, &queue2);
538 let cur = read_u32(_addr1)?;
541 if cur != _expected_val {
542 return Err(SyscallError::Again);
543 }
544 while woke < _max_wake as u64 {
545 if let Some(id) = w1.pop_front() {
546 if wake_task(id) {
547 woke += 1;
548 }
549 } else {
550 break;
551 }
552 }
553 while requeued < _max_requeue as u64 {
554 if let Some(id) = w1.pop_front() {
555 if w2.push_back(id).is_err() {
556 let _ = w1.push_back(id);
557 break;
558 }
559 requeued += 1;
560 } else {
561 break;
562 }
563 }
564 }
565
566 try_gc_queue(_addr1, &queue1);
567 try_gc_queue(_addr2, &queue2);
568
569 Ok(woke + requeued)
570}
571
572pub fn sys_futex_wake_op(
574 _addr1: u64,
575 _max_wake1: u32,
576 _max_wake2: u32,
577 _addr2: u64,
578 _op: u32,
579) -> Result<u64, SyscallError> {
580 let addr1 = _addr1;
581 let addr2 = _addr2;
582 let max_wake1 = _max_wake1;
583 let max_wake2 = _max_wake2;
584 let wake_op = FutexWakeOpEncode::decode(_op)?;
585
586 let q1 = get_queue(addr1)?;
588 let q2 = get_queue(addr2)?;
589
590 let woke = if addr1 == addr2 {
591 let mut waiters = q1.waiters.lock();
592 let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
593 let mut woke = wake_from_waiters(&mut waiters, max_wake1);
594 if wake_op.should_wake(old) {
595 woke += wake_from_waiters(&mut waiters, max_wake2);
596 }
597 woke
598 } else {
599 let (mut w1, mut w2) = lock_two_queues(addr1, &q1, addr2, &q2);
600 let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
601 let mut woke = wake_from_waiters(&mut w1, max_wake1);
602 if wake_op.should_wake(old) {
603 woke += wake_from_waiters(&mut w2, max_wake2);
604 }
605 woke
606 };
607 try_gc_queue(addr1, &q1);
608 try_gc_queue(addr2, &q2);
609 Ok(woke)
610}