Skip to main content

strat9_kernel/syscall/
futex.rs

1//! Futex (Fast Userspace Mutex) syscall handlers
2use alloc::{
3    collections::{BTreeMap, VecDeque},
4    sync::Arc,
5};
6use core::sync::atomic::Ordering;
7
8use super::error::SyscallError;
9use crate::{
10    memory::userslice::{UserSliceRead, UserSliceReadWrite},
11    process::{block_current_task, current_task_id, wake_task},
12    sync::{SpinLock, SpinLockGuard},
13};
14
15struct FutexQueue {
16    waiters: SpinLock<VecDeque<crate::process::TaskId>>,
17}
18
19impl FutexQueue {
20    /// Creates a new instance.
21    const fn new() -> Self {
22        FutexQueue {
23            waiters: SpinLock::new(VecDeque::new()),
24        }
25    }
26
27    /// Performs the pop waiter operation.
28    fn pop_waiter(&self) -> Option<crate::process::TaskId> {
29        let mut waiters = self.waiters.lock();
30        waiters.pop_front()
31    }
32
33    /// Performs the remove waiter operation.
34    fn remove_waiter(&self, id: crate::process::TaskId) {
35        let mut waiters = self.waiters.lock();
36        if let Some(pos) = waiters.iter().position(|&x| x == id) {
37            waiters.remove(pos);
38        }
39    }
40
41    /// Returns whether empty.
42    fn is_empty(&self) -> bool {
43        self.waiters.lock().is_empty()
44    }
45}
46
47static FUTEX_QUEUES: SpinLock<BTreeMap<u64, Arc<FutexQueue>>> = SpinLock::new(BTreeMap::new());
48
49/// Returns queue.
50fn get_queue(addr: u64) -> Arc<FutexQueue> {
51    let mut map = FUTEX_QUEUES.lock();
52    map.entry(addr)
53        .or_insert_with(|| Arc::new(FutexQueue::new()))
54        .clone()
55}
56
57/// Reads u32.
58fn read_u32(addr: u64) -> Result<u32, SyscallError> {
59    let slice =
60        UserSliceRead::new(addr, core::mem::size_of::<u32>()).map_err(|_| SyscallError::Fault)?;
61    slice.read_val::<u32>().map_err(|_| SyscallError::Fault)
62}
63
/// Atomically replaces the `u32` at `ptr` with `desired` if it currently
/// equals `expected`, and returns the value that was observed.
///
/// The return value follows the classic `cmpxchg` convention: the exchange
/// succeeded if and only if the returned value equals `expected`. That
/// convention is only sound with a *strong* compare-exchange. A weak one may
/// fail spuriously while still observing `expected`, which a caller comparing
/// the result against `expected` would mistake for success even though
/// nothing was stored.
///
/// `ptr` must be non-null, 4-byte aligned and point to memory that stays
/// mapped and writable for the duration of the call; this function does not
/// check any of that itself.
#[inline]
fn atomic_cmpxchg_u32(ptr: *mut u32, expected: u32, desired: u32) -> u32 {
    use core::sync::atomic::{AtomicU32, Ordering};

    // SAFETY: the caller guarantees `ptr` is aligned and refers to a live,
    // writable mapping (validated through `UserSliceReadWrite` in the caller).
    // `AtomicU32` has the same size and alignment as `u32`, so viewing the
    // word through a shared `&AtomicU32` is sound while the mapping is live.
    let atomic = unsafe { &*ptr.cast::<AtomicU32>() };
    match atomic.compare_exchange(expected, desired, Ordering::AcqRel, Ordering::Acquire) {
        // Success: the previous value, which equals `expected`.
        Ok(previous) => previous,
        // Failure: the value actually found, guaranteed to differ from
        // `expected` because the strong variant never fails spuriously.
        Err(observed) => observed,
    }
}
77
78/// Performs the atomic fetch update u32 operation.
79fn atomic_fetch_update_u32<F>(addr: u64, update: F) -> Result<u32, SyscallError>
80where
81    F: Fn(u32) -> u32,
82{
83    if (addr & 0x3) != 0 {
84        return Err(SyscallError::InvalidArgument);
85    }
86    let _slice = UserSliceReadWrite::new(addr, core::mem::size_of::<u32>())
87        .map_err(|_| SyscallError::Fault)?;
88    let ptr = addr as *mut u32;
89    let mut cur = unsafe { core::ptr::read_volatile(ptr) };
90    loop {
91        let new = update(cur);
92        let observed = atomic_cmpxchg_u32(ptr, cur, new);
93        if observed == cur {
94            return Ok(cur);
95        }
96        cur = observed;
97    }
98}
99
100/// Performs the lock two queues operation.
101fn lock_two_queues<'a>(
102    addr1: u64,
103    q1: &'a FutexQueue,
104    addr2: u64,
105    q2: &'a FutexQueue,
106) -> (
107    SpinLockGuard<'a, VecDeque<crate::process::TaskId>>,
108    SpinLockGuard<'a, VecDeque<crate::process::TaskId>>,
109) {
110    debug_assert_ne!(addr1, addr2, "lock_two_queues: same address would deadlock");
111    if addr1 < addr2 {
112        let g1 = q1.waiters.lock();
113        let g2 = q2.waiters.lock();
114        (g1, g2)
115    } else {
116        let g2 = q2.waiters.lock();
117        let g1 = q1.waiters.lock();
118        (g1, g2)
119    }
120}
121
122/// Performs the wake from waiters operation.
123fn wake_from_waiters(waiters: &mut VecDeque<crate::process::TaskId>, max_wake: u32) -> u64 {
124    let mut woke = 0u64;
125    while woke < max_wake as u64 {
126        if let Some(id) = waiters.pop_front() {
127            if wake_task(id) {
128                woke += 1;
129            }
130        } else {
131            break;
132        }
133    }
134    woke
135}
136
137/// Performs the do requeue operation.
138fn do_requeue(
139    addr1: u64,
140    max_wake: u32,
141    max_requeue: u32,
142    addr2: u64,
143) -> Result<u64, SyscallError> {
144    if addr1 == addr2 {
145        return sys_futex_wake(addr1, max_wake);
146    }
147
148    let queue1 = {
149        let map = FUTEX_QUEUES.lock();
150        map.get(&addr1).cloned()
151    };
152    let Some(queue1) = queue1 else {
153        return Ok(0);
154    };
155
156    let queue2 = get_queue(addr2);
157
158    let mut woke = 0u64;
159    let mut requeued = 0u64;
160
161    {
162        let (mut w1, mut w2) = lock_two_queues(addr1, &queue1, addr2, &queue2);
163
164        while woke < max_wake as u64 {
165            if let Some(id) = w1.pop_front() {
166                if wake_task(id) {
167                    woke += 1;
168                }
169            } else {
170                break;
171            }
172        }
173
174        while requeued < max_requeue as u64 {
175            if let Some(id) = w1.pop_front() {
176                w2.push_back(id);
177                requeued += 1;
178            } else {
179                break;
180            }
181        }
182    }
183
184    try_gc_queue(addr1, &queue1);
185    try_gc_queue(addr2, &queue2);
186
187    Ok(woke + requeued)
188}
189
190/// Attempts to gc queue.
191fn try_gc_queue(addr: u64, queue: &Arc<FutexQueue>) {
192    let waiters = queue.waiters.lock();
193    if waiters.is_empty() {
194        drop(waiters);
195        let mut map = FUTEX_QUEUES.lock();
196        if let Some(q) = map.get(&addr) {
197            if q.is_empty() {
198                map.remove(&addr);
199            }
200        }
201    }
202}
203
204struct FutexWakeOpEncode {
205    op: u32,
206    is_oparg_shift: bool,
207    cmp: u32,
208    oparg: u32,
209    cmparg: u32,
210}
211
212impl FutexWakeOpEncode {
213    /// Performs the decode operation.
214    fn decode(bits: u32) -> Result<Self, SyscallError> {
215        let is_oparg_shift = ((bits >> 31) & 1) == 1;
216        let op = (bits >> 28) & 0x7;
217        let cmp = (bits >> 24) & 0xF;
218        let oparg = (bits >> 12) & 0xFFF;
219        let cmparg = bits & 0xFFF;
220
221        if op > 4 || cmp > 5 {
222            return Err(SyscallError::InvalidArgument);
223        }
224
225        Ok(Self {
226            op,
227            is_oparg_shift,
228            cmp,
229            oparg,
230            cmparg,
231        })
232    }
233
234    /// Performs the effective oparg operation.
235    fn effective_oparg(&self) -> u32 {
236        if self.is_oparg_shift {
237            1u32 << (self.oparg & 31)
238        } else {
239            self.oparg
240        }
241    }
242
243    /// Performs the calculate new val operation.
244    fn calculate_new_val(&self, old_val: u32) -> u32 {
245        let oparg = self.effective_oparg();
246        match self.op {
247            0 => oparg,
248            1 => old_val.wrapping_add(oparg),
249            2 => old_val | oparg,
250            3 => old_val & !oparg,
251            4 => old_val ^ oparg,
252            _ => old_val,
253        }
254    }
255
256    /// Performs the should wake operation.
257    fn should_wake(&self, old_val: u32) -> bool {
258        match self.cmp {
259            0 => old_val == self.cmparg,
260            1 => old_val != self.cmparg,
261            2 => old_val < self.cmparg,
262            3 => old_val <= self.cmparg,
263            4 => old_val > self.cmparg,
264            5 => old_val >= self.cmparg,
265            _ => false,
266        }
267    }
268}
269
270/// SYS_FUTEX_WAIT: Wait on a futex
271pub fn sys_futex_wait(_addr: u64, _val: u32, _timeout_ns: u64) -> Result<u64, SyscallError> {
272    let addr = _addr;
273    let val = _val;
274    let timeout_ns = _timeout_ns;
275
276    if (addr & 0x3) != 0 {
277        return Err(SyscallError::InvalidArgument);
278    }
279
280    let id = current_task_id().ok_or(SyscallError::PermissionDenied)?;
281    let queue = get_queue(addr);
282
283    let saved_deadline = if timeout_ns != 0 {
284        let deadline = crate::syscall::time::current_time_ns().saturating_add(timeout_ns);
285        if let Some(task) = crate::process::get_task_by_id(id) {
286            task.wake_deadline_ns.store(deadline, Ordering::Relaxed);
287        }
288        deadline
289    } else {
290        0
291    };
292
293    // Hold the futex queue lock while validating the futex word and enqueuing.
294    {
295        let mut waiters = queue.waiters.lock();
296        let cur = read_u32(addr)?;
297        if cur != val {
298            if let Some(task) = crate::process::get_task_by_id(id) {
299                task.wake_deadline_ns.store(0, Ordering::Relaxed);
300            }
301            return Err(SyscallError::Again);
302        }
303        waiters.push_back(id);
304    }
305
306    block_current_task();
307
308    queue.remove_waiter(id);
309    try_gc_queue(addr, &queue);
310
311    if let Some(task) = crate::process::get_task_by_id(id) {
312        task.wake_deadline_ns.store(0, Ordering::Relaxed);
313    }
314    if timeout_ns != 0 && saved_deadline != 0 {
315        let now = crate::syscall::time::current_time_ns();
316        if now >= saved_deadline {
317            return Err(SyscallError::TimedOut);
318        }
319    }
320
321    Ok(0)
322}
323
324/// SYS_FUTEX_WAKE: Wake waiters on a futex
325pub fn sys_futex_wake(_addr: u64, _max_wake: u32) -> Result<u64, SyscallError> {
326    let addr = _addr;
327    let max_wake = _max_wake;
328    let queue = {
329        let map = FUTEX_QUEUES.lock();
330        map.get(&addr).cloned()
331    };
332
333    let Some(queue) = queue else {
334        return Ok(0);
335    };
336
337    let mut woke = 0u64;
338    while woke < max_wake as u64 {
339        if let Some(id) = queue.pop_waiter() {
340            if wake_task(id) {
341                woke += 1;
342            }
343        } else {
344            break;
345        }
346    }
347
348    try_gc_queue(addr, &queue);
349
350    Ok(woke)
351}
352
353/// SYS_FUTEX_REQUEUE: Requeue waiters from one futex to another
354pub fn sys_futex_requeue(
355    _addr1: u64,
356    _max_wake: u32,
357    _max_requeue: u32,
358    _addr2: u64,
359) -> Result<u64, SyscallError> {
360    do_requeue(_addr1, _max_wake, _max_requeue, _addr2)
361}
362
363/// SYS_FUTEX_CMP_REQUEUE: Conditional requeue
364pub fn sys_futex_cmp_requeue(
365    _addr1: u64,
366    _max_wake: u32,
367    _max_requeue: u32,
368    _addr2: u64,
369    _expected_val: u32,
370) -> Result<u64, SyscallError> {
371    if _addr1 == _addr2 {
372        let cur = read_u32(_addr1)?;
373        if cur != _expected_val {
374            return Err(SyscallError::Again);
375        }
376        return sys_futex_wake(_addr1, _max_wake);
377    }
378
379    let queue1 = {
380        let map = FUTEX_QUEUES.lock();
381        map.get(&_addr1).cloned()
382    };
383    let Some(queue1) = queue1 else {
384        let cur = read_u32(_addr1)?;
385        if cur != _expected_val {
386            return Err(SyscallError::Again);
387        }
388        return Ok(0);
389    };
390
391    let queue2 = get_queue(_addr2);
392    let mut woke = 0u64;
393    let mut requeued = 0u64;
394
395    {
396        let (mut w1, mut w2) = lock_two_queues(_addr1, &queue1, _addr2, &queue2);
397        let cur = read_u32(_addr1)?;
398        if cur != _expected_val {
399            return Err(SyscallError::Again);
400        }
401        while woke < _max_wake as u64 {
402            if let Some(id) = w1.pop_front() {
403                if wake_task(id) {
404                    woke += 1;
405                }
406            } else {
407                break;
408            }
409        }
410        while requeued < _max_requeue as u64 {
411            if let Some(id) = w1.pop_front() {
412                w2.push_back(id);
413                requeued += 1;
414            } else {
415                break;
416            }
417        }
418    }
419
420    try_gc_queue(_addr1, &queue1);
421    try_gc_queue(_addr2, &queue2);
422
423    Ok(woke + requeued)
424}
425
426/// SYS_FUTEX_WAKE_OP: Wake with atomic operation
427pub fn sys_futex_wake_op(
428    _addr1: u64,
429    _max_wake1: u32,
430    _max_wake2: u32,
431    _addr2: u64,
432    _op: u32,
433) -> Result<u64, SyscallError> {
434    let addr1 = _addr1;
435    let addr2 = _addr2;
436    let max_wake1 = _max_wake1;
437    let max_wake2 = _max_wake2;
438    let wake_op = FutexWakeOpEncode::decode(_op)?;
439
440    // Materialize queues so wake and wait operations serialize on queue locks.
441    let q1 = get_queue(addr1);
442    let q2 = get_queue(addr2);
443
444    let woke = if addr1 == addr2 {
445        let mut waiters = q1.waiters.lock();
446        let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
447        let mut woke = wake_from_waiters(&mut waiters, max_wake1);
448        if wake_op.should_wake(old) {
449            woke += wake_from_waiters(&mut waiters, max_wake2);
450        }
451        woke
452    } else {
453        let (mut w1, mut w2) = lock_two_queues(addr1, &q1, addr2, &q2);
454        let old = atomic_fetch_update_u32(addr2, |v| wake_op.calculate_new_val(v))?;
455        let mut woke = wake_from_waiters(&mut w1, max_wake1);
456        if wake_op.should_wake(old) {
457            woke += wake_from_waiters(&mut w2, max_wake2);
458        }
459        woke
460    };
461    try_gc_queue(addr1, &q1);
462    try_gc_queue(addr2, &q2);
463    Ok(woke)
464}