// strat9_kernel/syscall/futex.rs
1//! Futex (Fast Userspace Mutex) syscall handlers
2use alloc::sync::Arc;
3use core::sync::atomic::Ordering;
4
5use super::error::SyscallError;
6use crate::{
7    memory::userslice::{UserSliceRead, UserSliceReadWrite},
8    process::{block_current_task, current_task_id, wake_task},
9    sync::{FixedQueue, SpinLock, SpinLockGuard},
10};
11
12/// Number of independent hash buckets.  Must be a power of two.
13///
14/// 256 buckets × 32 slots = 8 192 distinct futex addresses system-wide.
15/// Increasing this reduces hot-bucket probability at the cost of 256 ×
16/// sizeof(SpinLock<FutexBucket>) of static memory (~16 KiB on x86-64).
17const FUTEX_QUEUE_BUCKETS: usize = 256;
18
19/// Max distinct futex addresses per bucket.
20///
21/// With the Fibonacci hash below, collisions are rare; 32 slots handle
22/// workloads with hundreds of concurrent futex addresses without spilling.
23const FUTEX_BUCKET_CAPACITY: usize = 32;
24
25/// Max concurrent waiters on a single futex address.
26const FUTEX_WAITERS_CAPACITY: usize = 256;
27
28// Fibonacci hashing requires FUTEX_QUEUE_BUCKETS to be a power of two so
29// the bit-shift index calculation is correct.
30const _: () = assert!(
31    FUTEX_QUEUE_BUCKETS.is_power_of_two(),
32    "FUTEX_QUEUE_BUCKETS must be a power of two"
33);
34
/// Per-address wait queue: FIFO of task ids blocked on one futex word.
struct FutexQueue {
    // Lock-protected FIFO; capacity bounds concurrent waiters per address.
    waiters: SpinLock<FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>>,
}
38
39impl FutexQueue {
40    /// Creates a new instance.
41    const fn new() -> Self {
42        FutexQueue {
43            waiters: SpinLock::new(FixedQueue::new()),
44        }
45    }
46
47    /// Performs the pop waiter operation.
48    fn pop_waiter(&self) -> Option<crate::process::TaskId> {
49        let mut waiters = self.waiters.lock();
50        waiters.pop_front()
51    }
52
53    /// Performs the remove waiter operation.
54    fn remove_waiter(&self, id: crate::process::TaskId) {
55        let mut waiters = self.waiters.lock();
56        let _ = waiters.remove_first_where(|queued| *queued == id);
57    }
58
59    /// Returns whether empty.
60    fn is_empty(&self) -> bool {
61        self.waiters.lock().is_empty()
62    }
63}
64
/// One occupied slot in a bucket: a futex address and its wait queue.
struct FutexBucketEntry {
    // User-space address of the futex word this queue serves.
    addr: u64,
    // Shared handle so syscall paths can keep using the queue after the
    // bucket lock has been released.
    queue: Arc<FutexQueue>,
}
69
/// Fixed-capacity slot table for one hash bucket.
struct FutexBucket {
    // `None` marks a free slot; entries are found by linear scan on `addr`.
    entries: [Option<FutexBucketEntry>; FUTEX_BUCKET_CAPACITY],
}
73
74impl FutexBucket {
75    const fn new() -> Self {
76        Self {
77            entries: [const { None }; FUTEX_BUCKET_CAPACITY],
78        }
79    }
80
81    fn lookup(&self, addr: u64) -> Option<Arc<FutexQueue>> {
82        self.entries.iter().find_map(|slot| match slot {
83            Some(entry) if entry.addr == addr => Some(entry.queue.clone()),
84            _ => None,
85        })
86    }
87
88    fn get_or_insert(&mut self, addr: u64) -> Result<Arc<FutexQueue>, SyscallError> {
89        let mut empty_slot = None;
90
91        for (index, slot) in self.entries.iter_mut().enumerate() {
92            match slot {
93                Some(entry) if entry.addr == addr => return Ok(entry.queue.clone()),
94                None if empty_slot.is_none() => empty_slot = Some(index),
95                _ => {}
96            }
97        }
98
99        let Some(index) = empty_slot else {
100            return Err(SyscallError::QueueFull);
101        };
102
103        let queue = Arc::new(FutexQueue::new());
104        self.entries[index] = Some(FutexBucketEntry {
105            addr,
106            queue: queue.clone(),
107        });
108        Ok(queue)
109    }
110
111    /// Removes the slot for `addr`/`queue` if the queue is still empty.
112    ///
113    /// # Lock ordering
114    ///
115    /// This method is called while the **bucket lock** is held.  It
116    /// re-acquires `queue.waiters` internally to re-verify emptiness.
117    /// Therefore the lock order on this path is:
118    ///
119    ///   `bucket_lock` → `queue.waiters`
120    ///
121    /// **Invariant:** no code path may hold `queue.waiters` while trying to
122    /// acquire any `bucket_lock`.  Violating this would cause an ABBA
123    /// deadlock with the ordering established here.  All callers of
124    /// `queue.waiters.lock()` in the syscall handlers release the bucket
125    /// lock before acquiring waiters (bucket is released inside `get_queue`/
126    /// `lookup_queue` before the returned Arc is used).
127    fn remove_if_empty(&mut self, addr: u64, queue: &Arc<FutexQueue>) {
128        for slot in &mut self.entries {
129            let should_remove = match slot {
130                Some(entry)
131                    if entry.addr == addr
132                        && Arc::ptr_eq(&entry.queue, queue)
133                        && entry.queue.is_empty() =>
134                {
135                    true
136                }
137                _ => false,
138            };
139
140            if should_remove {
141                *slot = None;
142                return;
143            }
144        }
145    }
146}
147
/// Global futex hash table: one independently-locked bucket per hash slot.
static FUTEX_QUEUES: [SpinLock<FutexBucket>; FUTEX_QUEUE_BUCKETS] =
    [const { SpinLock::new(FutexBucket::new()) }; FUTEX_QUEUE_BUCKETS];
150
151#[inline]
152fn futex_bucket_index(addr: u64) -> usize {
153    // Fibonacci hashing: multiply by the 64-bit golden-ratio constant then
154    // take the top byte.  This spreads all address bits across the index,
155    // avoiding the hot-bucket problem of the old `(addr >> 2) & 63` which
156    // only used 6 bits and caused all page-offset-0 addresses to collide.
157    // FUTEX_QUEUE_BUCKETS must remain a power of two for the mask to work.
158    let h = addr.wrapping_mul(0x9e37_79b9_7f4a_7c15_u64);
159    (h >> (64 - FUTEX_QUEUE_BUCKETS.trailing_zeros())) as usize
160}
161
162#[inline]
163fn futex_bucket(addr: u64) -> &'static SpinLock<FutexBucket> {
164    &FUTEX_QUEUES[futex_bucket_index(addr)]
165}
166
167fn lookup_queue(addr: u64) -> Option<Arc<FutexQueue>> {
168    let bucket = futex_bucket(addr).lock();
169    bucket.lookup(addr)
170}
171
172/// Returns queue.
173fn get_queue(addr: u64) -> Result<Arc<FutexQueue>, SyscallError> {
174    let mut bucket = futex_bucket(addr).lock();
175    bucket.get_or_insert(addr)
176}
177
178/// Reads u32.
179fn read_u32(addr: u64) -> Result<u32, SyscallError> {
180    let slice =
181        UserSliceRead::new(addr, core::mem::size_of::<u32>()).map_err(|_| SyscallError::Fault)?;
182    slice.read_val::<u32>().map_err(|_| SyscallError::Fault)
183}
184
/// Atomically compares-and-swaps the u32 at `ptr`; returns the observed value.
///
/// Uses the *strong* `compare_exchange`: the weak variant is allowed to fail
/// spuriously, returning `Err` with the unchanged current value even when it
/// equals `expected`.  The CAS retry loop in `atomic_fetch_update_u32` treats
/// `observed == expected` as success, so a spurious weak failure would be
/// reported as an update that never actually stored anything.
#[inline]
fn atomic_cmpxchg_u32(ptr: *mut u32, expected: u32, desired: u32) -> u32 {
    use core::sync::atomic::{AtomicU32, Ordering};

    // SAFETY: ptr validated by UserSliceReadWrite in caller. Creating a
    // shared &AtomicU32 from user memory is sound as long as the mapping
    // stays live (guaranteed by the UserSlice guard held by the caller).
    let atomic = unsafe { &*ptr.cast::<AtomicU32>() };
    atomic
        .compare_exchange(expected, desired, Ordering::AcqRel, Ordering::Acquire)
        .unwrap_or_else(|x| x)
}
198
/// Atomically applies `update` to the u32 at user address `addr` via a CAS
/// retry loop, returning the *old* value on success.
///
/// Errors with `InvalidArgument` for misaligned addresses and `Fault` when
/// the range is not writable user memory.
fn atomic_fetch_update_u32<F>(addr: u64, update: F) -> Result<u32, SyscallError>
where
    F: Fn(u32) -> u32,
{
    // Futex words must be 4-byte aligned.
    if (addr & 0x3) != 0 {
        return Err(SyscallError::InvalidArgument);
    }
    // Validates the mapping; presumably the guard keeps it live for the
    // duration of the loop below — TODO confirm UserSliceReadWrite's contract.
    let _slice = UserSliceReadWrite::new(addr, core::mem::size_of::<u32>())
        .map_err(|_| SyscallError::Fault)?;
    let ptr = addr as *mut u32;
    // Seed the loop with a volatile (non-atomic) read; any staleness is
    // corrected by the compare-exchange below.
    let mut cur = unsafe { core::ptr::read_volatile(ptr) };
    loop {
        let new = update(cur);
        let observed = atomic_cmpxchg_u32(ptr, cur, new);
        if observed == cur {
            // CAS reported the expected value: the store took effect.
            // NOTE(review): this equality test assumes the CAS never fails
            // spuriously — verify atomic_cmpxchg_u32 uses the strong variant.
            return Ok(cur);
        }
        // Lost a race with another writer; retry from the freshly observed value.
        cur = observed;
    }
}
220
221/// Performs the lock two queues operation.
222fn lock_two_queues<'a>(
223    addr1: u64,
224    q1: &'a FutexQueue,
225    addr2: u64,
226    q2: &'a FutexQueue,
227) -> (
228    SpinLockGuard<'a, FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>>,
229    SpinLockGuard<'a, FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>>,
230) {
231    debug_assert_ne!(addr1, addr2, "lock_two_queues: same address would deadlock");
232    if addr1 < addr2 {
233        let g1 = q1.waiters.lock();
234        let g2 = q2.waiters.lock();
235        (g1, g2)
236    } else {
237        let g2 = q2.waiters.lock();
238        let g1 = q1.waiters.lock();
239        (g1, g2)
240    }
241}
242
243/// Performs the wake from waiters operation.
244fn wake_from_waiters(
245    waiters: &mut FixedQueue<crate::process::TaskId, FUTEX_WAITERS_CAPACITY>,
246    max_wake: u32,
247) -> u64 {
248    let mut woke = 0u64;
249    while woke < max_wake as u64 {
250        if let Some(id) = waiters.pop_front() {
251            if wake_task(id) {
252                woke += 1;
253            }
254        } else {
255            break;
256        }
257    }
258    woke
259}
260
261/// Performs the do requeue operation.
262fn do_requeue(
263    addr1: u64,
264    max_wake: u32,
265    max_requeue: u32,
266    addr2: u64,
267) -> Result<u64, SyscallError> {
268    if addr1 == addr2 {
269        return sys_futex_wake(addr1, max_wake);
270    }
271
272    let queue1 = lookup_queue(addr1);
273    let Some(queue1) = queue1 else {
274        return Ok(0);
275    };
276
277    let queue2 = get_queue(addr2)?;
278
279    let mut woke = 0u64;
280    let mut requeued = 0u64;
281
282    {
283        let (mut w1, mut w2) = lock_two_queues(addr1, &queue1, addr2, &queue2);
284
285        while woke < max_wake as u64 {
286            if let Some(id) = w1.pop_front() {
287                if wake_task(id) {
288                    woke += 1;
289                }
290            } else {
291                break;
292            }
293        }
294
295        while requeued < max_requeue as u64 {
296            if let Some(id) = w1.pop_front() {
297                if w2.push_back(id).is_err() {
298                    let _ = w1.push_back(id);
299                    break;
300                }
301                requeued += 1;
302            } else {
303                break;
304            }
305        }
306    }
307
308    try_gc_queue(addr1, &queue1);
309    try_gc_queue(addr2, &queue2);
310
311    Ok(woke + requeued)
312}
313
314/// Removes the queue entry for `addr` from its bucket if the queue is empty.
315///
316/// Two-phase to avoid holding `bucket_lock` while checking emptiness (which
317/// would require acquiring `waiters` under `bucket`, violating the intended
318/// ordering of acquiring bucket before waiters):
319///
320///   Phase 1 : check under `waiters`: fast exit if non-empty.
321///   Phase 2 : remove under `bucket_lock`: `remove_if_empty` re-verifies
322///              emptiness while already holding the bucket lock.  A new
323///              waiter that enqueued between the two phases will be visible
324///              in that re-check and will block the removal.
325///
326/// TOCTOU note: a task may enqueue between phase 1 and phase 2.  This is
327/// safe: `remove_if_empty` re-checks `is_empty()` under the bucket lock,
328/// so a queue with live waiters is never removed.
329fn try_gc_queue(addr: u64, queue: &Arc<FutexQueue>) {
330    // Phase 1: early exit if still has waiters.
331    {
332        let waiters = queue.waiters.lock();
333        if !waiters.is_empty() {
334            return;
335        }
336    }
337    // Phase 2: remove under bucket lock, with re-verification inside.
338    let mut bucket = futex_bucket(addr).lock();
339    bucket.remove_if_empty(addr, queue);
340}
341
342struct FutexWakeOpEncode {
343    op: u32,
344    is_oparg_shift: bool,
345    cmp: u32,
346    oparg: u32,
347    cmparg: u32,
348}
349
350impl FutexWakeOpEncode {
351    /// Performs the decode operation.
352    fn decode(bits: u32) -> Result<Self, SyscallError> {
353        let is_oparg_shift = ((bits >> 31) & 1) == 1;
354        let op = (bits >> 28) & 0x7;
355        let cmp = (bits >> 24) & 0xF;
356        let oparg = (bits >> 12) & 0xFFF;
357        let cmparg = bits & 0xFFF;
358
359        if op > 4 || cmp > 5 {
360            return Err(SyscallError::InvalidArgument);
361        }
362
363        Ok(Self {
364            op,
365            is_oparg_shift,
366            cmp,
367            oparg,
368            cmparg,
369        })
370    }
371
372    /// Performs the effective oparg operation.
373    fn effective_oparg(&self) -> u32 {
374        if self.is_oparg_shift {
375            1u32 << (self.oparg & 31)
376        } else {
377            self.oparg
378        }
379    }
380
381    /// Performs the calculate new val operation.
382    fn calculate_new_val(&self, old_val: u32) -> u32 {
383        let oparg = self.effective_oparg();
384        match self.op {
385            0 => oparg,
386            1 => old_val.wrapping_add(oparg),
387            2 => old_val | oparg,
388            3 => old_val & !oparg,
389            4 => old_val ^ oparg,
390            _ => old_val,
391        }
392    }
393
394    /// Performs the should wake operation.
395    fn should_wake(&self, old_val: u32) -> bool {
396        match self.cmp {
397            0 => old_val == self.cmparg,
398            1 => old_val != self.cmparg,
399            2 => old_val < self.cmparg,
400            3 => old_val <= self.cmparg,
401            4 => old_val > self.cmparg,
402            5 => old_val >= self.cmparg,
403            _ => false,
404        }
405    }
406}
407
408/// SYS_FUTEX_WAIT: Wait on a futex
409pub fn sys_futex_wait(_addr: u64, _val: u32, _timeout_ns: u64) -> Result<u64, SyscallError> {
410    let addr = _addr;
411    let val = _val;
412    let timeout_ns = _timeout_ns;
413
414    if (addr & 0x3) != 0 {
415        return Err(SyscallError::InvalidArgument);
416    }
417
418    let id = current_task_id().ok_or(SyscallError::PermissionDenied)?;
419    let queue = get_queue(addr)?;
420
421    let saved_deadline = if timeout_ns != 0 {
422        let deadline = crate::syscall::time::current_time_ns().saturating_add(timeout_ns);
423        if let Some(task) = crate::process::get_task_by_id(id) {
424            task.wake_deadline_ns.store(deadline, Ordering::Relaxed);
425        }
426        deadline
427    } else {
428        0
429    };
430
431    // Hold the futex queue lock while validating the futex word and enqueuing.
432    {
433        let mut waiters = queue.waiters.lock();
434        let cur = read_u32(addr)?;
435        if cur != val {
436            if let Some(task) = crate::process::get_task_by_id(id) {
437                task.wake_deadline_ns.store(0, Ordering::Relaxed);
438            }
439            return Err(SyscallError::Again);
440        }
441        waiters.push_back(id).map_err(|_| SyscallError::QueueFull)?;
442    }
443
444    block_current_task();
445
446    queue.remove_waiter(id);
447    try_gc_queue(addr, &queue);
448
449    if let Some(task) = crate::process::get_task_by_id(id) {
450        task.wake_deadline_ns.store(0, Ordering::Relaxed);
451    }
452    if timeout_ns != 0 && saved_deadline != 0 {
453        let now = crate::syscall::time::current_time_ns();
454        if now >= saved_deadline {
455            return Err(SyscallError::TimedOut);
456        }
457    }
458
459    Ok(0)
460}
461
462/// SYS_FUTEX_WAKE: Wake waiters on a futex
463pub fn sys_futex_wake(_addr: u64, _max_wake: u32) -> Result<u64, SyscallError> {
464    let addr = _addr;
465    let max_wake = _max_wake;
466    let queue = lookup_queue(addr);
467
468    let Some(queue) = queue else {
469        return Ok(0);
470    };
471
472    let mut woke = 0u64;
473    while woke < max_wake as u64 {
474        if let Some(id) = queue.pop_waiter() {
475            if wake_task(id) {
476                woke += 1;
477            }
478        } else {
479            break;
480        }
481    }
482
483    try_gc_queue(addr, &queue);
484
485    Ok(woke)
486}
487
/// SYS_FUTEX_REQUEUE: Requeue waiters from one futex to another
///
/// Wakes up to `_max_wake` waiters on `_addr1`, then moves up to
/// `_max_requeue` of the remaining waiters onto `_addr2`'s queue.  Thin
/// wrapper over `do_requeue`; returns the combined woken + requeued count.
pub fn sys_futex_requeue(
    _addr1: u64,
    _max_wake: u32,
    _max_requeue: u32,
    _addr2: u64,
) -> Result<u64, SyscallError> {
    do_requeue(_addr1, _max_wake, _max_requeue, _addr2)
}
497
498/// SYS_FUTEX_CMP_REQUEUE: Conditional requeue
499pub fn sys_futex_cmp_requeue(
500    _addr1: u64,
501    _max_wake: u32,
502    _max_requeue: u32,
503    _addr2: u64,
504    _expected_val: u32,
505) -> Result<u64, SyscallError> {
506    if _addr1 == _addr2 {
507        let cur = read_u32(_addr1)?;
508        if cur != _expected_val {
509            return Err(SyscallError::Again);
510        }
511        return sys_futex_wake(_addr1, _max_wake);
512    }
513
514    let queue1 = lookup_queue(_addr1);
515    let Some(queue1) = queue1 else {
516        let cur = read_u32(_addr1)?;
517        if cur != _expected_val {
518            return Err(SyscallError::Again);
519        }
520        return Ok(0);
521    };
522
523    // Preliminary val check before allocating a bucket slot for addr2.
524    // This avoids consuming a slot on the common EAGAIN path (e.g. lock
525    // already taken by another thread).  The authoritative check is
526    // re-done under the waiter locks below to close the race window.
527    let cur_prelim = read_u32(_addr1)?;
528    if cur_prelim != _expected_val {
529        return Err(SyscallError::Again);
530    }
531
532    let queue2 = get_queue(_addr2)?;
533    let mut woke = 0u64;
534    let mut requeued = 0u64;
535
536    {
537        let (mut w1, mut w2) = lock_two_queues(_addr1, &queue1, _addr2, &queue2);
538        // Authoritative check: re-read under both waiter locks so no waker
539        // can race between the val read and the requeue.
540        let cur = read_u32(_addr1)?;
541        if cur != _expected_val {
542            return Err(SyscallError::Again);
543        }
544        while woke < _max_wake as u64 {
545            if let Some(id) = w1.pop_front() {
546                if wake_task(id) {
547                    woke += 1;
548                }
549            } else {
550                break;
551            }
552        }
553        while requeued < _max_requeue as u64 {
554            if let Some(id) = w1.pop_front() {
555                if w2.push_back(id).is_err() {
556                    let _ = w1.push_back(id);
557                    break;
558                }
559                requeued += 1;
560            } else {
561                break;
562            }
563        }
564    }
565
566    try_gc_queue(_addr1, &queue1);
567    try_gc_queue(_addr2, &queue2);
568
569    Ok(woke + requeued)
570}
571
572/// SYS_FUTEX_WAKE_OP: Wake with atomic operation
573pub fn sys_futex_wake_op(
574    _addr1: u64,
575    _max_wake1: u32,
576    _max_wake2: u32,
577    _addr2: u64,
578    _op: u32,
579) -> Result<u64, SyscallError> {
580    let addr1 = _addr1;
581    let addr2 = _addr2;
582    let max_wake1 = _max_wake1;
583    let max_wake2 = _max_wake2;
584    let wake_op = FutexWakeOpEncode::decode(_op)?;
585
586    // Materialize queues so wake and wait operations serialize on queue locks.
587    let q1 = get_queue(addr1)?;
588    let q2 = get_queue(addr2)?;
589
590    let woke = if addr1 == addr2 {
591        let mut waiters = q1.waiters.lock();
592        let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
593        let mut woke = wake_from_waiters(&mut waiters, max_wake1);
594        if wake_op.should_wake(old) {
595            woke += wake_from_waiters(&mut waiters, max_wake2);
596        }
597        woke
598    } else {
599        let (mut w1, mut w2) = lock_two_queues(addr1, &q1, addr2, &q2);
600        let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
601        let mut woke = wake_from_waiters(&mut w1, max_wake1);
602        if wake_op.should_wake(old) {
603            woke += wake_from_waiters(&mut w2, max_wake2);
604        }
605        woke
606    };
607    try_gc_queue(addr1, &q1);
608    try_gc_queue(addr2, &q2);
609    Ok(woke)
610}