aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Io/Threaded.zig
diff options
context:
space:
mode:
authorMatthew Lugg <mlugg@mlugg.co.uk>2025-12-20 13:05:35 +0000
committerMatthew Lugg <mlugg@mlugg.co.uk>2025-12-21 13:07:03 +0000
commit6ed5b620507aa4589d6c6fe3a5e48e166d48b3d2 (patch)
treeb1293f8558cd79837c8a7a699020f75af0e51977 /lib/std/Io/Threaded.zig
parent330e295bc46a074cac4b9aa1f42f4678426936c5 (diff)
downloadzig-6ed5b620507aa4589d6c6fe3a5e48e166d48b3d2.tar.gz
zig-6ed5b620507aa4589d6c6fe3a5e48e166d48b3d2.zip
std.Io: introduce futex primitives
Co-authored-by: Andrew Kelley <andrew@ziglang.org>
Diffstat (limited to 'lib/std/Io/Threaded.zig')
-rw-r--r--lib/std/Io/Threaded.zig527
1 files changed, 265 insertions, 262 deletions
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index b9c7f33cee..e71aeb256c 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -155,6 +155,220 @@ const Thread = struct {
fn currentSignalId() SignaleeId {
return if (std.Thread.use_pthreads) std.c.pthread_self() else std.Thread.getCurrentId();
}
+
+ fn futexWaitUncancelable(ptr: *const u32, expect: u32) void {
+ return Thread.futexWaitTimed(null, ptr, expect, null) catch unreachable;
+ }
+
+ fn futexWait(thread: *Thread, ptr: *const u32, expect: u32) Io.Cancelable!void {
+ return Thread.futexWaitTimed(thread, ptr, expect, null) catch |err| switch (err) {
+ error.Canceled => return error.Canceled,
+ error.Timeout => unreachable,
+ };
+ }
+
+ fn futexWaitTimed(thread: ?*Thread, ptr: *const u32, expect: u32, timeout_ns: ?u64) Io.Cancelable!void {
+ @branchHint(.cold);
+
+ if (builtin.single_threaded) unreachable; // nobody would ever wake us
+
+ if (builtin.cpu.arch.isWasm()) {
+ comptime assert(builtin.cpu.has(.wasm, .atomics));
+ if (thread) |t| try t.checkCancel();
+ const to: i64 = if (timeout_ns) |ns| ns else -1;
+ const signed_expect: i32 = @bitCast(expect);
+ const result = asm volatile (
+ \\local.get %[ptr]
+ \\local.get %[expected]
+ \\local.get %[timeout]
+ \\memory.atomic.wait32 0
+ \\local.set %[ret]
+ : [ret] "=r" (-> u32),
+ : [ptr] "r" (ptr),
+ [expected] "r" (signed_expect),
+ [timeout] "r" (to),
+ );
+ switch (result) {
+ 0 => {}, // ok
+ 1 => {}, // expected != loaded
+ 2 => {}, // timeout
+ else => assert(!is_debug),
+ }
+ } else switch (native_os) {
+ .linux => {
+ const linux = std.os.linux;
+ var ts_buffer: linux.timespec = undefined;
+ const ts: ?*linux.timespec = if (timeout_ns) |ns| ts: {
+ ts_buffer = timestampToPosix(ns);
+ break :ts &ts_buffer;
+ } else null;
+ if (thread) |t| try t.beginSyscall();
+ const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, ts);
+ if (thread) |t| t.endSyscall();
+ switch (linux.errno(rc)) {
+ .SUCCESS => {}, // notified by `wake()`
+ .INTR => {}, // caller's responsibility to retry
+ .AGAIN => {}, // ptr.* != expect
+ .INVAL => {}, // possibly timeout overflow
+ .TIMEDOUT => {}, // timeout
+ .FAULT => recoverableOsBugDetected(), // ptr was invalid
+ else => recoverableOsBugDetected(),
+ }
+ },
+ .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
+ const c = std.c;
+ const flags: c.UL = .{
+ .op = .COMPARE_AND_WAIT,
+ .NO_ERRNO = true,
+ };
+ if (thread) |t| try t.beginSyscall();
+ const status = switch (darwin_supports_ulock_wait2) {
+ true => c.__ulock_wait2(flags, ptr, expect, ns: {
+ const ns = timeout_ns orelse break :ns 0;
+ if (ns == 0) break :ns 1;
+ break :ns ns;
+ }, 0),
+ false => c.__ulock_wait(flags, ptr, expect, us: {
+ const ns = timeout_ns orelse break :us 0;
+ const us = std.math.lossyCast(u32, ns / std.time.ns_per_us);
+ if (us == 0) break :us 1;
+ break :us us;
+ }),
+ };
+ if (thread) |t| t.endSyscall();
+ if (status >= 0) return;
+ switch (@as(c.E, @enumFromInt(-status))) {
+ .INTR => {}, // spurious wake
+ // Address of the futex was paged out. This is unlikely, but possible in theory, and
+ // pthread/libdispatch on darwin bother to handle it. In this case we'll return
+ // without waiting, but the caller should retry anyway.
+ .FAULT => {},
+ .TIMEDOUT => {}, // timeout
+ else => recoverableOsBugDetected(),
+ }
+ },
+ .windows => {
+ var timeout_value: windows.LARGE_INTEGER = undefined;
+ var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
+ // NTDLL functions work with time in units of 100 nanoseconds.
+ // Positive values are absolute deadlines while negative values are relative durations.
+ if (timeout_ns) |delay| {
+ timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100));
+ timeout_value = -timeout_value;
+ timeout_ptr = &timeout_value;
+ }
+ if (thread) |t| try t.checkCancel();
+ switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), timeout_ptr)) {
+ .SUCCESS => {},
+ .CANCELLED => {},
+ .TIMEOUT => {}, // timeout
+ else => recoverableOsBugDetected(),
+ }
+ },
+ .freebsd => {
+ const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
+ var tm_size: usize = 0;
+ var tm: std.c._umtx_time = undefined;
+ var tm_ptr: ?*const std.c._umtx_time = null;
+ if (timeout_ns) |ns| {
+ tm_ptr = &tm;
+ tm_size = @sizeOf(@TypeOf(tm));
+ tm.flags = 0; // use relative time not UMTX_ABSTIME
+ tm.clockid = .MONOTONIC;
+ tm.timeout = timestampToPosix(ns);
+ }
+ if (thread) |t| try t.beginSyscall();
+ const rc = std.c._umtx_op(@intFromPtr(ptr), flags, @as(c_ulong, expect), tm_size, @intFromPtr(tm_ptr));
+ if (thread) |t| t.endSyscall();
+ if (is_debug) switch (posix.errno(rc)) {
+ .SUCCESS => {},
+ .FAULT => unreachable, // one of the args points to invalid memory
+ .INVAL => unreachable, // arguments should be correct
+ .TIMEDOUT => {}, // timeout
+ .INTR => {}, // spurious wake
+ else => unreachable,
+ };
+ },
+ else => @compileError("unimplemented: futexWait"),
+ }
+ }
+
+ fn futexWake(ptr: *const u32, max_waiters: u32) void {
+ @branchHint(.cold);
+
+ if (builtin.single_threaded) return; // nothing to wake up
+
+ if (builtin.cpu.arch.isWasm()) {
+ comptime assert(builtin.cpu.has(.wasm, .atomics));
+ assert(max_waiters != 0);
+ const woken_count = asm volatile (
+ \\local.get %[ptr]
+ \\local.get %[waiters]
+ \\memory.atomic.notify 0
+ \\local.set %[ret]
+ : [ret] "=r" (-> u32),
+ : [ptr] "r" (ptr),
+ [waiters] "r" (max_waiters),
+ );
+ _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
+ } else switch (native_os) {
+ .linux => {
+ const linux = std.os.linux;
+ switch (linux.errno(linux.futex_3arg(
+ ptr,
+ .{ .cmd = .WAKE, .private = true },
+ @min(max_waiters, std.math.maxInt(i32)),
+ ))) {
+ .SUCCESS => return, // successful wake up
+ .INVAL => return, // invalid futex_wait() on ptr done elsewhere
+ .FAULT => return, // pointer became invalid while doing the wake
+ else => return recoverableOsBugDetected(), // deadlock due to operating system bug
+ }
+ },
+ .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
+ const c = std.c;
+ const flags: c.UL = .{
+ .op = .COMPARE_AND_WAIT,
+ .NO_ERRNO = true,
+ .WAKE_ALL = max_waiters > 1,
+ };
+ while (true) {
+ const status = c.__ulock_wake(flags, ptr, 0);
+ if (status >= 0) return;
+ switch (@as(c.E, @enumFromInt(-status))) {
+ .INTR, .CANCELED => continue, // spurious wake()
+ .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
+ .NOENT => return, // nothing was woken up
+ .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
+ else => unreachable, // deadlock due to operating system bug
+ }
+ }
+ },
+ .windows => {
+ assert(max_waiters != 0);
+ switch (max_waiters) {
+ 1 => windows.ntdll.RtlWakeAddressSingle(ptr),
+ else => windows.ntdll.RtlWakeAddressAll(ptr),
+ }
+ },
+ .freebsd => {
+ const rc = std.c._umtx_op(
+ @intFromPtr(ptr),
+ @intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE),
+ @as(c_ulong, max_waiters),
+ 0, // there is no timeout struct
+ 0, // there is no timeout struct pointer
+ );
+ switch (posix.errno(rc)) {
+ .SUCCESS => {},
+ .FAULT => {}, // it's ok if the ptr doesn't point to valid memory
+ .INVAL => unreachable, // arguments should be correct
+ else => unreachable, // deadlock due to operating system bug
+ }
+ },
+ else => @compileError("unimplemented: futexWake"),
+ }
+ }
};
const max_iovecs_len = 8;
@@ -403,6 +617,10 @@ pub fn io(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
+ .futexWait = futexWait,
+ .futexWaitUncancelable = futexWaitUncancelable,
+ .futexWake = futexWake,
+
.mutexLock = mutexLock,
.mutexLockUncancelable = mutexLockUncancelable,
.mutexUnlock = mutexUnlock,
@@ -499,6 +717,10 @@ pub fn ioBasic(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
+ .futexWait = futexWait,
+ .futexWaitUncancelable = futexWaitUncancelable,
+ .futexWake = futexWake,
+
.mutexLock = mutexLock,
.mutexLockUncancelable = mutexLockUncancelable,
.mutexUnlock = mutexUnlock,
@@ -1020,6 +1242,38 @@ fn cancel(
ac.waitAndDeinit(t, result);
}
+fn futexWait(userdata: ?*anyopaque, ptr: *const u32, expected: u32, timeout: Io.Timeout) Io.Cancelable!void {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const current_thread = Thread.getCurrent(t);
+ const t_io = ioBasic(t);
+ const timeout_ns: ?u64 = ns: {
+ const d = (timeout.toDurationFromNow(t_io) catch break :ns 10) orelse break :ns null;
+ break :ns std.math.lossyCast(u64, d.raw.toNanoseconds());
+ };
+ switch (native_os) {
+ .illumos, .netbsd, .openbsd => @panic("TODO"),
+ else => try current_thread.futexWaitTimed(ptr, expected, timeout_ns),
+ }
+}
+
+fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32) void {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
+ switch (native_os) {
+ .illumos, .netbsd, .openbsd => @panic("TODO"),
+ else => Thread.futexWaitUncancelable(ptr, expected),
+ }
+}
+
+fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
+ switch (native_os) {
+ .illumos, .netbsd, .openbsd => @panic("TODO"),
+ else => Thread.futexWake(ptr, max_waiters),
+ }
+}
+
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
if (builtin.single_threaded) unreachable; // Interface should have prevented this.
if (native_os == .netbsd) @panic("TODO");
@@ -1027,10 +1281,10 @@ fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex
const t: *Threaded = @ptrCast(@alignCast(userdata));
const current_thread = Thread.getCurrent(t);
if (prev_state == .contended) {
- try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
+ try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
- try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
+ try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
}
@@ -1040,10 +1294,10 @@ fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mute
if (native_os == .openbsd) @panic("TODO");
_ = userdata;
if (prev_state == .contended) {
- futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
+ Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
- futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
+ Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
}
@@ -1054,7 +1308,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
_ = userdata;
_ = prev_state;
if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) {
- futexWake(@ptrCast(&mutex.state), 1);
+ Thread.futexWake(@ptrCast(&mutex.state), 1);
}
}
@@ -1081,7 +1335,7 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex:
defer mutex.lockUncancelable(t_io);
while (true) {
- futexWaitUncancelable(cond_epoch, epoch);
+ Thread.futexWaitUncancelable(@ptrCast(cond_epoch), epoch);
epoch = cond_epoch.load(.acquire);
state = cond_state.load(.monotonic);
while (state & signal_mask != 0) {
@@ -1125,7 +1379,7 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I
defer mutex.lockUncancelable(t_io);
while (true) {
- try futexWait(current_thread, cond_epoch, epoch);
+ try current_thread.futexWait(@ptrCast(cond_epoch), epoch);
epoch = cond_epoch.load(.acquire);
state = cond_state.load(.monotonic);
@@ -1198,7 +1452,7 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.
_ = cond_epoch.fetchAdd(1, .release);
if (native_os == .netbsd) @panic("TODO");
if (native_os == .openbsd) @panic("TODO");
- futexWake(cond_epoch, to_wake);
+ Thread.futexWake(@ptrCast(cond_epoch), to_wake);
return;
};
}
@@ -6612,257 +6866,6 @@ fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) Hos
/// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
const darwin_supports_ulock_wait2 = builtin.os.version_range.semver.min.major >= 11;
-fn futexWait(current_thread: *Thread, ptr: *const std.atomic.Value(u32), expect: u32) Io.Cancelable!void {
- @branchHint(.cold);
-
- if (builtin.cpu.arch.isWasm()) {
- comptime assert(builtin.cpu.has(.wasm, .atomics));
- try current_thread.checkCancel();
- const timeout: i64 = -1;
- const signed_expect: i32 = @bitCast(expect);
- const result = asm volatile (
- \\local.get %[ptr]
- \\local.get %[expected]
- \\local.get %[timeout]
- \\memory.atomic.wait32 0
- \\local.set %[ret]
- : [ret] "=r" (-> u32),
- : [ptr] "r" (&ptr.raw),
- [expected] "r" (signed_expect),
- [timeout] "r" (timeout),
- );
- switch (result) {
- 0 => {}, // ok
- 1 => {}, // expected != loaded
- 2 => assert(!is_debug), // timeout
- else => assert(!is_debug),
- }
- } else switch (native_os) {
- .linux => {
- const linux = std.os.linux;
- try current_thread.beginSyscall();
- const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
- current_thread.endSyscall();
- switch (linux.errno(rc)) {
- .SUCCESS => {}, // notified by `wake()`
- .INTR => {}, // caller's responsibility to retry
- .AGAIN => {}, // ptr.* != expect
- .INVAL => {}, // possibly timeout overflow
- .TIMEDOUT => recoverableOsBugDetected(),
- .FAULT => recoverableOsBugDetected(), // ptr was invalid
- else => recoverableOsBugDetected(),
- }
- },
- .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
- const c = std.c;
- const flags: c.UL = .{
- .op = .COMPARE_AND_WAIT,
- .NO_ERRNO = true,
- };
- try current_thread.beginSyscall();
- const status = if (darwin_supports_ulock_wait2)
- c.__ulock_wait2(flags, ptr, expect, 0, 0)
- else
- c.__ulock_wait(flags, ptr, expect, 0);
- current_thread.endSyscall();
-
- if (status >= 0) return;
-
- if (is_debug) switch (@as(c.E, @enumFromInt(-status))) {
- .INTR => {}, // spurious wake
- // Address of the futex was paged out. This is unlikely, but possible in theory, and
- // pthread/libdispatch on darwin bother to handle it. In this case we'll return
- // without waiting, but the caller should retry anyway.
- .FAULT => {},
- .TIMEDOUT => unreachable,
- else => unreachable,
- };
- },
- .windows => {
- try current_thread.checkCancel();
- switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) {
- .SUCCESS => {},
- .CANCELLED => return error.Canceled,
- else => recoverableOsBugDetected(),
- }
- },
- .freebsd => {
- const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
- try current_thread.beginSyscall();
- const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0);
- current_thread.endSyscall();
- if (is_debug) switch (posix.errno(rc)) {
- .SUCCESS => {},
- .FAULT => unreachable, // one of the args points to invalid memory
- .INVAL => unreachable, // arguments should be correct
- .TIMEDOUT => unreachable, // no timeout provided
- .INTR => {}, // spurious wake
- else => unreachable,
- };
- },
- else => @compileError("unimplemented: futexWait"),
- }
-}
-
-pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) void {
- @branchHint(.cold);
-
- if (builtin.cpu.arch.isWasm()) {
- comptime assert(builtin.cpu.has(.wasm, .atomics));
- const timeout: i64 = -1;
- const signed_expect: i32 = @bitCast(expect);
- const result = asm volatile (
- \\local.get %[ptr]
- \\local.get %[expected]
- \\local.get %[timeout]
- \\memory.atomic.wait32 0
- \\local.set %[ret]
- : [ret] "=r" (-> u32),
- : [ptr] "r" (&ptr.raw),
- [expected] "r" (signed_expect),
- [timeout] "r" (timeout),
- );
- switch (result) {
- 0 => {}, // ok
- 1 => {}, // expected != loaded
- 2 => recoverableOsBugDetected(), // timeout
- else => recoverableOsBugDetected(),
- }
- } else switch (native_os) {
- .linux => {
- const linux = std.os.linux;
- const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
- switch (linux.errno(rc)) {
- .SUCCESS => {}, // notified by `wake()`
- .INTR => {}, // caller's responsibility to repeat
- .AGAIN => {}, // ptr.* != expect
- .INVAL => {}, // possibly timeout overflow
- .TIMEDOUT => recoverableOsBugDetected(),
- .FAULT => recoverableOsBugDetected(), // ptr was invalid
- else => recoverableOsBugDetected(),
- }
- },
- .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
- const c = std.c;
- const flags: c.UL = .{
- .op = .COMPARE_AND_WAIT,
- .NO_ERRNO = true,
- };
- const status = if (darwin_supports_ulock_wait2)
- c.__ulock_wait2(flags, ptr, expect, 0, 0)
- else
- c.__ulock_wait(flags, ptr, expect, 0);
-
- if (status >= 0) return;
-
- switch (@as(c.E, @enumFromInt(-status))) {
- // Wait was interrupted by the OS or other spurious signalling.
- .INTR => {},
- // Address of the futex was paged out. This is unlikely, but possible in theory, and
- // pthread/libdispatch on darwin bother to handle it. In this case we'll return
- // without waiting, but the caller should retry anyway.
- .FAULT => {},
- .TIMEDOUT => recoverableOsBugDetected(),
- else => recoverableOsBugDetected(),
- }
- },
- .windows => {
- switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) {
- .SUCCESS, .CANCELLED => {},
- else => recoverableOsBugDetected(),
- }
- },
- .freebsd => {
- const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
- const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0);
- switch (posix.errno(rc)) {
- .SUCCESS => {},
- .INTR => {}, // spurious wake
- .FAULT => recoverableOsBugDetected(), // one of the args points to invalid memory
- .INVAL => recoverableOsBugDetected(), // arguments should be correct
- .TIMEDOUT => recoverableOsBugDetected(), // no timeout provided
- else => recoverableOsBugDetected(),
- }
- },
- else => @compileError("unimplemented: futexWaitUncancelable"),
- }
-}
-
-pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void {
- @branchHint(.cold);
-
- if (builtin.cpu.arch.isWasm()) {
- comptime assert(builtin.cpu.has(.wasm, .atomics));
- assert(max_waiters != 0);
- const woken_count = asm volatile (
- \\local.get %[ptr]
- \\local.get %[waiters]
- \\memory.atomic.notify 0
- \\local.set %[ret]
- : [ret] "=r" (-> u32),
- : [ptr] "r" (&ptr.raw),
- [waiters] "r" (max_waiters),
- );
- _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
- } else switch (native_os) {
- .linux => {
- const linux = std.os.linux;
- switch (linux.errno(linux.futex_3arg(
- &ptr.raw,
- .{ .cmd = .WAKE, .private = true },
- @min(max_waiters, std.math.maxInt(i32)),
- ))) {
- .SUCCESS => return, // successful wake up
- .INVAL => return, // invalid futex_wait() on ptr done elsewhere
- .FAULT => return, // pointer became invalid while doing the wake
- else => return recoverableOsBugDetected(), // deadlock due to operating system bug
- }
- },
- .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
- const c = std.c;
- const flags: c.UL = .{
- .op = .COMPARE_AND_WAIT,
- .NO_ERRNO = true,
- .WAKE_ALL = max_waiters > 1,
- };
- while (true) {
- const status = c.__ulock_wake(flags, ptr, 0);
- if (status >= 0) return;
- switch (@as(c.E, @enumFromInt(-status))) {
- .INTR, .CANCELED => continue, // spurious wake()
- .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
- .NOENT => return, // nothing was woken up
- .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
- else => unreachable, // deadlock due to operating system bug
- }
- }
- },
- .windows => {
- assert(max_waiters != 0);
- switch (max_waiters) {
- 1 => windows.ntdll.RtlWakeAddressSingle(ptr),
- else => windows.ntdll.RtlWakeAddressAll(ptr),
- }
- },
- .freebsd => {
- const rc = std.c._umtx_op(
- @intFromPtr(&ptr.raw),
- @intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE),
- @as(c_ulong, max_waiters),
- 0, // there is no timeout struct
- 0, // there is no timeout struct pointer
- );
- switch (posix.errno(rc)) {
- .SUCCESS => {},
- .FAULT => {}, // it's ok if the ptr doesn't point to valid memory
- .INVAL => unreachable, // arguments should be correct
- else => unreachable, // deadlock due to operating system bug
- }
- },
- else => @compileError("unimplemented: futexWake"),
- }
-}
-
/// A thread-safe logical boolean value which can be `set` and `unset`.
///
/// It can also block threads until the value is set with cancelation via timed
@@ -6919,7 +6922,7 @@ const ResetEventFutex = enum(u32) {
}
const current_thread = Thread.getCurrent(t);
while (state == .waiting) {
- try futexWait(current_thread, @ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
+ try current_thread.futexWait(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
state = @atomicLoad(ResetEventFutex, ref, .acquire);
}
assert(state == .is_set);
@@ -6944,7 +6947,7 @@ const ResetEventFutex = enum(u32) {
state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
}
while (state == .waiting) {
- futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
+ Thread.futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
state = @atomicLoad(ResetEventFutex, ref, .acquire);
}
assert(state == .is_set);
@@ -6964,7 +6967,7 @@ const ResetEventFutex = enum(u32) {
return;
}
if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) {
- futexWake(@ptrCast(ref), std.math.maxInt(u32));
+ Thread.futexWake(@ptrCast(ref), std.math.maxInt(u32));
}
}