diff options
| author | Matthew Lugg <mlugg@mlugg.co.uk> | 2025-12-20 13:05:35 +0000 |
|---|---|---|
| committer | Matthew Lugg <mlugg@mlugg.co.uk> | 2025-12-21 13:07:03 +0000 |
| commit | 6ed5b620507aa4589d6c6fe3a5e48e166d48b3d2 (patch) | |
| tree | b1293f8558cd79837c8a7a699020f75af0e51977 /lib/std/Io/Threaded.zig | |
| parent | 330e295bc46a074cac4b9aa1f42f4678426936c5 (diff) | |
| download | zig-6ed5b620507aa4589d6c6fe3a5e48e166d48b3d2.tar.gz zig-6ed5b620507aa4589d6c6fe3a5e48e166d48b3d2.zip | |
std.Io: introduce futex primitives
Co-authored-by: Andrew Kelley <andrew@ziglang.org>
Diffstat (limited to 'lib/std/Io/Threaded.zig')
| -rw-r--r-- | lib/std/Io/Threaded.zig | 527 |
1 files changed, 265 insertions, 262 deletions
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index b9c7f33cee..e71aeb256c 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -155,6 +155,220 @@ const Thread = struct { fn currentSignalId() SignaleeId { return if (std.Thread.use_pthreads) std.c.pthread_self() else std.Thread.getCurrentId(); } + + fn futexWaitUncancelable(ptr: *const u32, expect: u32) void { + return Thread.futexWaitTimed(null, ptr, expect, null) catch unreachable; + } + + fn futexWait(thread: *Thread, ptr: *const u32, expect: u32) Io.Cancelable!void { + return Thread.futexWaitTimed(thread, ptr, expect, null) catch |err| switch (err) { + error.Canceled => return error.Canceled, + error.Timeout => unreachable, + }; + } + + fn futexWaitTimed(thread: ?*Thread, ptr: *const u32, expect: u32, timeout_ns: ?u64) Io.Cancelable!void { + @branchHint(.cold); + + if (builtin.single_threaded) unreachable; // nobody would ever wake us + + if (builtin.cpu.arch.isWasm()) { + comptime assert(builtin.cpu.has(.wasm, .atomics)); + if (thread) |t| try t.checkCancel(); + const to: i64 = if (timeout_ns) |ns| ns else -1; + const signed_expect: i32 = @bitCast(expect); + const result = asm volatile ( + \\local.get %[ptr] + \\local.get %[expected] + \\local.get %[timeout] + \\memory.atomic.wait32 0 + \\local.set %[ret] + : [ret] "=r" (-> u32), + : [ptr] "r" (ptr), + [expected] "r" (signed_expect), + [timeout] "r" (to), + ); + switch (result) { + 0 => {}, // ok + 1 => {}, // expected != loaded + 2 => {}, // timeout + else => assert(!is_debug), + } + } else switch (native_os) { + .linux => { + const linux = std.os.linux; + var ts_buffer: linux.timespec = undefined; + const ts: ?*linux.timespec = if (timeout_ns) |ns| ts: { + ts_buffer = timestampToPosix(ns); + break :ts &ts_buffer; + } else null; + if (thread) |t| try t.beginSyscall(); + const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, ts); + if (thread) |t| t.endSyscall(); + switch (linux.errno(rc)) { + .SUCCESS => {}, // notified by `wake()` + .INTR => {}, // caller's responsibility to retry + .AGAIN => {}, // ptr.* != expect + .INVAL => {}, // possibly timeout overflow + .TIMEDOUT => {}, // timeout + .FAULT => recoverableOsBugDetected(), // ptr was invalid + else => recoverableOsBugDetected(), + } + }, + .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { + const c = std.c; + const flags: c.UL = .{ + .op = .COMPARE_AND_WAIT, + .NO_ERRNO = true, + }; + if (thread) |t| try t.beginSyscall(); + const status = switch (darwin_supports_ulock_wait2) { + true => c.__ulock_wait2(flags, ptr, expect, ns: { + const ns = timeout_ns orelse break :ns 0; + if (ns == 0) break :ns 1; + break :ns ns; + }, 0), + false => c.__ulock_wait(flags, ptr, expect, us: { + const ns = timeout_ns orelse break :us 0; + const us = std.math.lossyCast(u32, ns / std.time.ns_per_us); + if (us == 0) break :us 1; + break :us us; + }), + }; + if (thread) |t| t.endSyscall(); + if (status >= 0) return; + switch (@as(c.E, @enumFromInt(-status))) { + .INTR => {}, // spurious wake + // Address of the futex was paged out. This is unlikely, but possible in theory, and + // pthread/libdispatch on darwin bother to handle it. In this case we'll return + // without waiting, but the caller should retry anyway. + .FAULT => {}, + .TIMEDOUT => {}, // timeout + else => recoverableOsBugDetected(), + } + }, + .windows => { + var timeout_value: windows.LARGE_INTEGER = undefined; + var timeout_ptr: ?*const windows.LARGE_INTEGER = null; + // NTDLL functions work with time in units of 100 nanoseconds. + // Positive values are absolute deadlines while negative values are relative durations. + if (timeout_ns) |delay| { + timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100)); + timeout_value = -timeout_value; + timeout_ptr = &timeout_value; + } + if (thread) |t| try t.checkCancel(); + switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), timeout_ptr)) { + .SUCCESS => {}, + .CANCELLED => {}, + .TIMEOUT => {}, // timeout + else => recoverableOsBugDetected(), + } + }, + .freebsd => { + const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE); + var tm_size: usize = 0; + var tm: std.c._umtx_time = undefined; + var tm_ptr: ?*const std.c._umtx_time = null; + if (timeout_ns) |ns| { + tm_ptr = &tm; + tm_size = @sizeOf(@TypeOf(tm)); + tm.flags = 0; // use relative time not UMTX_ABSTIME + tm.clockid = .MONOTONIC; + tm.timeout = timestampToPosix(ns); + } + if (thread) |t| try t.beginSyscall(); + const rc = std.c._umtx_op(@intFromPtr(ptr), flags, @as(c_ulong, expect), tm_size, @intFromPtr(tm_ptr)); + if (thread) |t| t.endSyscall(); + if (is_debug) switch (posix.errno(rc)) { + .SUCCESS => {}, + .FAULT => unreachable, // one of the args points to invalid memory + .INVAL => unreachable, // arguments should be correct + .TIMEDOUT => {}, // timeout + .INTR => {}, // spurious wake + else => unreachable, + }; + }, + else => @compileError("unimplemented: futexWait"), + } + } + + fn futexWake(ptr: *const u32, max_waiters: u32) void { + @branchHint(.cold); + + if (builtin.single_threaded) return; // nothing to wake up + + if (builtin.cpu.arch.isWasm()) { + comptime assert(builtin.cpu.has(.wasm, .atomics)); + assert(max_waiters != 0); + const woken_count = asm volatile ( + \\local.get %[ptr] + \\local.get %[waiters] + \\memory.atomic.notify 0 + \\local.set %[ret] + : [ret] "=r" (-> u32), + : [ptr] "r" (ptr), + [waiters] "r" (max_waiters), + ); + _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled + } else switch (native_os) { + .linux => { + const linux = std.os.linux; + switch (linux.errno(linux.futex_3arg( + ptr, + .{ .cmd = .WAKE, .private = true }, + @min(max_waiters, std.math.maxInt(i32)), + ))) { + .SUCCESS => return, // successful wake up + .INVAL => return, // invalid futex_wait() on ptr done elsewhere + .FAULT => return, // pointer became invalid while doing the wake + else => return recoverableOsBugDetected(), // deadlock due to operating system bug + } + }, + .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { + const c = std.c; + const flags: c.UL = .{ + .op = .COMPARE_AND_WAIT, + .NO_ERRNO = true, + .WAKE_ALL = max_waiters > 1, + }; + while (true) { + const status = c.__ulock_wake(flags, ptr, 0); + if (status >= 0) return; + switch (@as(c.E, @enumFromInt(-status))) { + .INTR, .CANCELED => continue, // spurious wake() + .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t + .NOENT => return, // nothing was woken up + .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD + else => unreachable, // deadlock due to operating system bug + } + } + }, + .windows => { + assert(max_waiters != 0); + switch (max_waiters) { + 1 => windows.ntdll.RtlWakeAddressSingle(ptr), + else => windows.ntdll.RtlWakeAddressAll(ptr), + } + }, + .freebsd => { + const rc = std.c._umtx_op( + @intFromPtr(ptr), + @intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE), + @as(c_ulong, max_waiters), + 0, // there is no timeout struct + 0, // there is no timeout struct pointer + ); + switch (posix.errno(rc)) { + .SUCCESS => {}, + .FAULT => {}, // it's ok if the ptr doesn't point to valid memory + .INVAL => unreachable, // arguments should be correct + else => unreachable, // deadlock due to operating system bug + } + }, + else => @compileError("unimplemented: futexWake"), + } + } }; const max_iovecs_len = 8; @@ -403,6 +617,10 @@ pub fn io(t: *Threaded) Io { .groupWait = groupWait, .groupCancel = groupCancel, + .futexWait = futexWait, + .futexWaitUncancelable = futexWaitUncancelable, + .futexWake = futexWake, + .mutexLock = mutexLock, .mutexLockUncancelable = mutexLockUncancelable, .mutexUnlock = mutexUnlock, @@ -499,6 +717,10 @@ pub fn ioBasic(t: *Threaded) Io { .groupWait = groupWait, .groupCancel = groupCancel, + .futexWait = futexWait, + .futexWaitUncancelable = futexWaitUncancelable, + .futexWake = futexWake, + .mutexLock = mutexLock, .mutexLockUncancelable = mutexLockUncancelable, .mutexUnlock = mutexUnlock, @@ -1020,6 +1242,38 @@ fn cancel( ac.waitAndDeinit(t, result); } +fn futexWait(userdata: ?*anyopaque, ptr: *const u32, expected: u32, timeout: Io.Timeout) Io.Cancelable!void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const current_thread = Thread.getCurrent(t); + const t_io = ioBasic(t); + const timeout_ns: ?u64 = ns: { + const d = (timeout.toDurationFromNow(t_io) catch break :ns 10) orelse break :ns null; + break :ns std.math.lossyCast(u64, d.raw.toNanoseconds()); + }; + switch (native_os) { + .illumos, .netbsd, .openbsd => @panic("TODO"), + else => try current_thread.futexWaitTimed(ptr, expected, timeout_ns), + } +} + +fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32) void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; + switch (native_os) { + .illumos, .netbsd, .openbsd => @panic("TODO"), + else => Thread.futexWaitUncancelable(ptr, expected), + } +} + +fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + _ = t; + switch (native_os) { + .illumos, .netbsd, .openbsd => @panic("TODO"), + else => Thread.futexWake(ptr, max_waiters), + } +} + fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void { if (builtin.single_threaded) unreachable; // Interface should have prevented this. if (native_os == .netbsd) @panic("TODO"); @@ -1027,10 +1281,10 @@ fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex const t: *Threaded = @ptrCast(@alignCast(userdata)); const current_thread = Thread.getCurrent(t); if (prev_state == .contended) { - try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { - try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } } @@ -1040,10 +1294,10 @@ fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mute if (native_os == .openbsd) @panic("TODO"); _ = userdata; if (prev_state == .contended) { - futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) { - futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); + Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended)); } } @@ -1054,7 +1308,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut _ = userdata; _ = prev_state; if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) { - futexWake(@ptrCast(&mutex.state), 1); + Thread.futexWake(@ptrCast(&mutex.state), 1); } } @@ -1081,7 +1335,7 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: defer mutex.lockUncancelable(t_io); while (true) { - futexWaitUncancelable(cond_epoch, epoch); + Thread.futexWaitUncancelable(@ptrCast(cond_epoch), epoch); epoch = cond_epoch.load(.acquire); state = cond_state.load(.monotonic); while (state & signal_mask != 0) { @@ -1125,7 +1379,7 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I defer mutex.lockUncancelable(t_io); while (true) { - try futexWait(current_thread, cond_epoch, epoch); + try current_thread.futexWait(@ptrCast(cond_epoch), epoch); epoch = cond_epoch.load(.acquire); state = cond_state.load(.monotonic); @@ -1198,7 +1452,7 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition. _ = cond_epoch.fetchAdd(1, .release); if (native_os == .netbsd) @panic("TODO"); if (native_os == .openbsd) @panic("TODO"); - futexWake(cond_epoch, to_wake); + Thread.futexWake(@ptrCast(cond_epoch), to_wake); return; }; } @@ -6612,257 +6866,6 @@ fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) Hos /// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention) const darwin_supports_ulock_wait2 = builtin.os.version_range.semver.min.major >= 11; -fn futexWait(current_thread: *Thread, ptr: *const std.atomic.Value(u32), expect: u32) Io.Cancelable!void { - @branchHint(.cold); - - if (builtin.cpu.arch.isWasm()) { - comptime assert(builtin.cpu.has(.wasm, .atomics)); - try current_thread.checkCancel(); - const timeout: i64 = -1; - const signed_expect: i32 = @bitCast(expect); - const result = asm volatile ( - \\local.get %[ptr] - \\local.get %[expected] - \\local.get %[timeout] - \\memory.atomic.wait32 0 - \\local.set %[ret] - : [ret] "=r" (-> u32), - : [ptr] "r" (&ptr.raw), - [expected] "r" (signed_expect), - [timeout] "r" (timeout), - ); - switch (result) { - 0 => {}, // ok - 1 => {}, // expected != loaded - 2 => assert(!is_debug), // timeout - else => assert(!is_debug), - } - } else switch (native_os) { - .linux => { - const linux = std.os.linux; - try current_thread.beginSyscall(); - const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null); - current_thread.endSyscall(); - switch (linux.errno(rc)) { - .SUCCESS => {}, // notified by `wake()` - .INTR => {}, // caller's responsibility to retry - .AGAIN => {}, // ptr.* != expect - .INVAL => {}, // possibly timeout overflow - .TIMEDOUT => recoverableOsBugDetected(), - .FAULT => recoverableOsBugDetected(), // ptr was invalid - else => recoverableOsBugDetected(), - } - }, - .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { - const c = std.c; - const flags: c.UL = .{ - .op = .COMPARE_AND_WAIT, - .NO_ERRNO = true, - }; - try current_thread.beginSyscall(); - const status = if (darwin_supports_ulock_wait2) - c.__ulock_wait2(flags, ptr, expect, 0, 0) - else - c.__ulock_wait(flags, ptr, expect, 0); - current_thread.endSyscall(); - - if (status >= 0) return; - - if (is_debug) switch (@as(c.E, @enumFromInt(-status))) { - .INTR => {}, // spurious wake - // Address of the futex was paged out. This is unlikely, but possible in theory, and - // pthread/libdispatch on darwin bother to handle it. In this case we'll return - // without waiting, but the caller should retry anyway. - .FAULT => {}, - .TIMEDOUT => unreachable, - else => unreachable, - }; - }, - .windows => { - try current_thread.checkCancel(); - switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) { - .SUCCESS => {}, - .CANCELLED => return error.Canceled, - else => recoverableOsBugDetected(), - } - }, - .freebsd => { - const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE); - try current_thread.beginSyscall(); - const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0); - current_thread.endSyscall(); - if (is_debug) switch (posix.errno(rc)) { - .SUCCESS => {}, - .FAULT => unreachable, // one of the args points to invalid memory - .INVAL => unreachable, // arguments should be correct - .TIMEDOUT => unreachable, // no timeout provided - .INTR => {}, // spurious wake - else => unreachable, - }; - }, - else => @compileError("unimplemented: futexWait"), - } -} - -pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) void { - @branchHint(.cold); - - if (builtin.cpu.arch.isWasm()) { - comptime assert(builtin.cpu.has(.wasm, .atomics)); - const timeout: i64 = -1; - const signed_expect: i32 = @bitCast(expect); - const result = asm volatile ( - \\local.get %[ptr] - \\local.get %[expected] - \\local.get %[timeout] - \\memory.atomic.wait32 0 - \\local.set %[ret] - : [ret] "=r" (-> u32), - : [ptr] "r" (&ptr.raw), - [expected] "r" (signed_expect), - [timeout] "r" (timeout), - ); - switch (result) { - 0 => {}, // ok - 1 => {}, // expected != loaded - 2 => recoverableOsBugDetected(), // timeout - else => recoverableOsBugDetected(), - } - } else switch (native_os) { - .linux => { - const linux = std.os.linux; - const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null); - switch (linux.errno(rc)) { - .SUCCESS => {}, // notified by `wake()` - .INTR => {}, // caller's responsibility to repeat - .AGAIN => {}, // ptr.* != expect - .INVAL => {}, // possibly timeout overflow - .TIMEDOUT => recoverableOsBugDetected(), - .FAULT => recoverableOsBugDetected(), // ptr was invalid - else => recoverableOsBugDetected(), - } - }, - .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { - const c = std.c; - const flags: c.UL = .{ - .op = .COMPARE_AND_WAIT, - .NO_ERRNO = true, - }; - const status = if (darwin_supports_ulock_wait2) - c.__ulock_wait2(flags, ptr, expect, 0, 0) - else - c.__ulock_wait(flags, ptr, expect, 0); - - if (status >= 0) return; - - switch (@as(c.E, @enumFromInt(-status))) { - // Wait was interrupted by the OS or other spurious signalling. - .INTR => {}, - // Address of the futex was paged out. This is unlikely, but possible in theory, and - // pthread/libdispatch on darwin bother to handle it. In this case we'll return - // without waiting, but the caller should retry anyway. - .FAULT => {}, - .TIMEDOUT => recoverableOsBugDetected(), - else => recoverableOsBugDetected(), - } - }, - .windows => { - switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) { - .SUCCESS, .CANCELLED => {}, - else => recoverableOsBugDetected(), - } - }, - .freebsd => { - const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE); - const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0); - switch (posix.errno(rc)) { - .SUCCESS => {}, - .INTR => {}, // spurious wake - .FAULT => recoverableOsBugDetected(), // one of the args points to invalid memory - .INVAL => recoverableOsBugDetected(), // arguments should be correct - .TIMEDOUT => recoverableOsBugDetected(), // no timeout provided - else => recoverableOsBugDetected(), - } - }, - else => @compileError("unimplemented: futexWaitUncancelable"), - } -} - -pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void { - @branchHint(.cold); - - if (builtin.cpu.arch.isWasm()) { - comptime assert(builtin.cpu.has(.wasm, .atomics)); - assert(max_waiters != 0); - const woken_count = asm volatile ( - \\local.get %[ptr] - \\local.get %[waiters] - \\memory.atomic.notify 0 - \\local.set %[ret] - : [ret] "=r" (-> u32), - : [ptr] "r" (&ptr.raw), - [waiters] "r" (max_waiters), - ); - _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled - } else switch (native_os) { - .linux => { - const linux = std.os.linux; - switch (linux.errno(linux.futex_3arg( - &ptr.raw, - .{ .cmd = .WAKE, .private = true }, - @min(max_waiters, std.math.maxInt(i32)), - ))) { - .SUCCESS => return, // successful wake up - .INVAL => return, // invalid futex_wait() on ptr done elsewhere - .FAULT => return, // pointer became invalid while doing the wake - else => return recoverableOsBugDetected(), // deadlock due to operating system bug - } - }, - .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { - const c = std.c; - const flags: c.UL = .{ - .op = .COMPARE_AND_WAIT, - .NO_ERRNO = true, - .WAKE_ALL = max_waiters > 1, - }; - while (true) { - const status = c.__ulock_wake(flags, ptr, 0); - if (status >= 0) return; - switch (@as(c.E, @enumFromInt(-status))) { - .INTR, .CANCELED => continue, // spurious wake() - .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t - .NOENT => return, // nothing was woken up - .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD - else => unreachable, // deadlock due to operating system bug - } - } - }, - .windows => { - assert(max_waiters != 0); - switch (max_waiters) { - 1 => windows.ntdll.RtlWakeAddressSingle(ptr), - else => windows.ntdll.RtlWakeAddressAll(ptr), - } - }, - .freebsd => { - const rc = std.c._umtx_op( - @intFromPtr(&ptr.raw), - @intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE), - @as(c_ulong, max_waiters), - 0, // there is no timeout struct - 0, // there is no timeout struct pointer - ); - switch (posix.errno(rc)) { - .SUCCESS => {}, - .FAULT => {}, // it's ok if the ptr doesn't point to valid memory - .INVAL => unreachable, // arguments should be correct - else => unreachable, // deadlock due to operating system bug - } - }, - else => @compileError("unimplemented: futexWake"), - } -} - /// A thread-safe logical boolean value which can be `set` and `unset`. /// /// It can also block threads until the value is set with cancelation via timed @@ -6919,7 +6922,7 @@ const ResetEventFutex = enum(u32) { } const current_thread = Thread.getCurrent(t); while (state == .waiting) { - try futexWait(current_thread, @ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); + try current_thread.futexWait(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); state = @atomicLoad(ResetEventFutex, ref, .acquire); } assert(state == .is_set); @@ -6944,7 +6947,7 @@ const ResetEventFutex = enum(u32) { state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting; } while (state == .waiting) { - futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); + Thread.futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting)); state = @atomicLoad(ResetEventFutex, ref, .acquire); } assert(state == .is_set); @@ -6964,7 +6967,7 @@ const ResetEventFutex = enum(u32) { return; } if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) { - futexWake(@ptrCast(ref), std.math.maxInt(u32)); + Thread.futexWake(@ptrCast(ref), std.math.maxInt(u32)); } } |
