path: root/lib/std/Thread/Futex.zig
author    Andrew Kelley <andrew@ziglang.org>  2022-07-01 15:52:54 -0700
committer Andrew Kelley <andrew@ziglang.org>  2022-07-01 15:52:54 -0700
commit    c89dd15e1be4959800dc7092d7dd4375253db7bc (patch)
tree      ca184ae53592efa21e67128a5f891d642d7f1118 /lib/std/Thread/Futex.zig
parent    5466e87fce581f2ef90ac23bb80b1dbc05836fc6 (diff)
parent    2360f8c490f3ec684ed64ff28e8c1fade249070b (diff)
download  zig-c89dd15e1be4959800dc7092d7dd4375253db7bc.tar.gz
          zig-c89dd15e1be4959800dc7092d7dd4375253db7bc.zip
Merge remote-tracking branch 'origin/master' into llvm14
Diffstat (limited to 'lib/std/Thread/Futex.zig')
-rw-r--r--  lib/std/Thread/Futex.zig | 1159
1 file changed, 802 insertions(+), 357 deletions(-)
diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig
index c9a19b4cd9..0d9ccc8969 100644
--- a/lib/std/Thread/Futex.zig
+++ b/lib/std/Thread/Futex.zig
@@ -1,180 +1,168 @@
//! Futex is a mechanism used to block (`wait`) and unblock (`wake`) threads using a 32-bit memory address as a hint.
//! Blocking a thread is acknowledged only if the 32-bit memory address is equal to a given value.
//! This check helps avoid block/unblock deadlocks which occur if a `wake()` happens before a `wait()`.
-//! Using Futex, other Thread synchronization primitives can be built which efficiently wait for cross-thread events or signals.
+//! Using Futex, other Thread synchronization primitives can be built which efficiently wait for cross-thread events or signals.
const std = @import("../std.zig");
const builtin = @import("builtin");
const Futex = @This();
-const target = builtin.target;
-const single_threaded = builtin.single_threaded;
-
+const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
-
const Atomic = std.atomic.Atomic;
-const spinLoopHint = std.atomic.spinLoopHint;
/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
-/// - The caller is unblocked spuriously by an arbitrary internal signal.
-///
-/// If `timeout` is provided, and the caller is blocked for longer than `timeout` nanoseconds`, `error.TimedOut` is returned.
+/// - The caller is unblocked spuriously ("at random").
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
-pub fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
- if (single_threaded) {
- // check whether the caller should block
- if (ptr.loadUnchecked() != expect) {
- return;
- }
+pub fn wait(ptr: *const Atomic(u32), expect: u32) void {
+ @setCold(true);
- // There are no other threads which could notify the caller on single_threaded.
- // Therefor a wait() without a timeout would block indefinitely.
- const timeout_ns = timeout orelse {
- @panic("deadlock");
- };
+ Impl.wait(ptr, expect, null) catch |err| switch (err) {
+ error.Timeout => unreachable, // null timeout meant to wait forever
+ };
+}
- // Simulate blocking with the timeout knowing that:
- // - no other thread can change the ptr value
- // - no other thread could unblock us if we waiting on the ptr
- std.time.sleep(timeout_ns);
- return error.TimedOut;
- }
+/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
+/// - The value at `ptr` is no longer equal to `expect`.
+/// - The caller is unblocked by a matching `wake()`.
+/// - The caller is unblocked spuriously ("at random").
+/// - The caller blocks for longer than the given timeout, in which case `error.Timeout` is returned.
+///
+/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
+/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
+pub fn timedWait(ptr: *const Atomic(u32), expect: u32, timeout_ns: u64) error{Timeout}!void {
+ @setCold(true);
- // Avoid calling into the OS for no-op waits()
- if (timeout) |timeout_ns| {
- if (timeout_ns == 0) {
- if (ptr.load(.SeqCst) != expect) return;
- return error.TimedOut;
- }
+ // Avoid calling into the OS for no-op timeouts.
+ if (timeout_ns == 0) {
+ if (ptr.load(.SeqCst) != expect) return;
+ return error.Timeout;
}
- return OsFutex.wait(ptr, expect, timeout);
+ return Impl.wait(ptr, expect, timeout_ns);
}
-/// Unblocks at most `num_waiters` callers blocked in a `wait()` call on `ptr`.
-/// `num_waiters` of 1 unblocks at most one `wait(ptr, ...)` and `maxInt(u32)` unblocks effectively all `wait(ptr, ...)`.
-pub fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
- if (single_threaded) return;
- if (num_waiters == 0) return;
+/// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`.
+pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ @setCold(true);
- return OsFutex.wake(ptr, num_waiters);
+ // Avoid calling into the OS if there's nothing to wake up.
+ if (max_waiters == 0) {
+ return;
+ }
+
+ Impl.wake(ptr, max_waiters);
}
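
Usage sketch (not part of the patch; the OneShotEvent name is hypothetical): waits can wake spuriously, so the API above is meant to be wrapped in a check-then-block loop, as the tests at the bottom of this diff also do:

    const std = @import("std");
    const Atomic = std.atomic.Atomic;
    const Futex = std.Thread.Futex;

    const OneShotEvent = struct {
        state: Atomic(u32) = Atomic(u32).init(0),

        fn wait(self: *OneShotEvent) void {
            // Re-check the futex word after every wakeup; wait() may return spuriously.
            while (self.state.load(.Acquire) == 0) {
                Futex.wait(&self.state, 0);
            }
        }

        fn set(self: *OneShotEvent) void {
            self.state.store(1, .Release);
            Futex.wake(&self.state, std.math.maxInt(u32));
        }
    };
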
-const OsFutex = if (target.os.tag == .windows)
- WindowsFutex
-else if (target.os.tag == .linux)
- LinuxFutex
-else if (target.isDarwin())
- DarwinFutex
-else if (builtin.link_libc)
- PosixFutex
+const Impl = if (builtin.single_threaded)
+ SingleThreadedImpl
+else if (builtin.os.tag == .windows)
+ WindowsImpl
+else if (builtin.os.tag.isDarwin())
+ DarwinImpl
+else if (builtin.os.tag == .linux)
+ LinuxImpl
+else if (builtin.os.tag == .freebsd)
+ FreebsdImpl
+else if (builtin.os.tag == .openbsd)
+ OpenbsdImpl
+else if (builtin.os.tag == .dragonfly)
+ DragonflyImpl
+else if (std.Thread.use_pthreads)
+ PosixImpl
else
- UnsupportedFutex;
+ UnsupportedImpl;
-const UnsupportedFutex = struct {
- fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
+/// We can't do @compileError() in the `Impl` switch statement above as it's eagerly evaluated.
+/// So instead, we @compileError() on the methods themselves for platforms which don't support futex.
+const UnsupportedImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
return unsupported(.{ ptr, expect, timeout });
}
- fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
- return unsupported(.{ ptr, num_waiters });
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ return unsupported(.{ ptr, max_waiters });
}
fn unsupported(unused: anytype) noreturn {
- @compileLog("Unsupported operating system", target.os.tag);
_ = unused;
- unreachable;
+ @compileError("Unsupported operating system " ++ @tagName(builtin.target.os.tag));
}
};
-const WindowsFutex = struct {
- const windows = std.os.windows;
+const SingleThreadedImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
+ if (ptr.loadUnchecked() != expect) {
+ return;
+ }
+
+ // There are no threads to wake us up.
+ // So if we wait without a timeout we would never wake up.
+ const delay = timeout orelse {
+ unreachable; // deadlock detected
+ };
+
+ std.time.sleep(delay);
+ return error.Timeout;
+ }
+
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ // There are no other threads to possibly wake up
+ _ = ptr;
+ _ = max_waiters;
+ }
+};
- fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
- var timeout_value: windows.LARGE_INTEGER = undefined;
- var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
+// We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll
+// as it's generally already a linked target and is autoloaded into all processes anyway.
+const WindowsImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
+ var timeout_value: os.windows.LARGE_INTEGER = undefined;
+ var timeout_ptr: ?*const os.windows.LARGE_INTEGER = null;
// NTDLL functions work with time in units of 100 nanoseconds.
- // Positive values for timeouts are absolute time while negative is relative.
- if (timeout) |timeout_ns| {
+ // Positive values are absolute deadlines while negative values are relative durations.
+ if (timeout) |delay| {
+ timeout_value = @intCast(os.windows.LARGE_INTEGER, delay / 100);
+ timeout_value = -timeout_value;
timeout_ptr = &timeout_value;
- timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
}
- switch (windows.ntdll.RtlWaitOnAddress(
+ const rc = os.windows.ntdll.RtlWaitOnAddress(
@ptrCast(?*const anyopaque, ptr),
@ptrCast(?*const anyopaque, &expect),
@sizeOf(@TypeOf(expect)),
timeout_ptr,
- )) {
+ );
+
+ switch (rc) {
.SUCCESS => {},
- .TIMEOUT => return error.TimedOut,
+ .TIMEOUT => {
+ assert(timeout != null);
+ return error.Timeout;
+ },
else => unreachable,
}
}
- fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
const address = @ptrCast(?*const anyopaque, ptr);
- switch (num_waiters) {
- 1 => windows.ntdll.RtlWakeAddressSingle(address),
- else => windows.ntdll.RtlWakeAddressAll(address),
- }
- }
-};
+ assert(max_waiters != 0);
-const LinuxFutex = struct {
- const linux = std.os.linux;
-
- fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
- var ts: std.os.timespec = undefined;
- var ts_ptr: ?*std.os.timespec = null;
-
- // Futex timespec timeout is already in relative time.
- if (timeout) |timeout_ns| {
- ts_ptr = &ts;
- ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
- ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
- }
-
- switch (linux.getErrno(linux.futex_wait(
- @ptrCast(*const i32, ptr),
- linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
- @bitCast(i32, expect),
- ts_ptr,
- ))) {
- .SUCCESS => {}, // notified by `wake()`
- .INTR => {}, // spurious wakeup
- .AGAIN => {}, // ptr.* != expect
- .TIMEDOUT => return error.TimedOut,
- .INVAL => {}, // possibly timeout overflow
- .FAULT => unreachable,
- else => unreachable,
- }
- }
-
- fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
- switch (linux.getErrno(linux.futex_wake(
- @ptrCast(*const i32, ptr),
- linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
- std.math.cast(i32, num_waiters) catch std.math.maxInt(i32),
- ))) {
- .SUCCESS => {}, // successful wake up
- .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
- .FAULT => {}, // pointer became invalid while doing the wake
- else => unreachable,
+ switch (max_waiters) {
+ 1 => os.windows.ntdll.RtlWakeAddressSingle(address),
+ else => os.windows.ntdll.RtlWakeAddressAll(address),
}
}
};
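
To make the NTDLL timeout convention concrete, here is a small sketch (helper name hypothetical) of the relative-duration encoding that RtlWaitOnAddress expects:

    const std = @import("std");
    const windows = std.os.windows;

    // NTDLL counts time in 100ns units; negative LARGE_INTEGER values are
    // relative durations, positive values are absolute deadlines.
    fn relativeNtTimeout(timeout_ns: u64) windows.LARGE_INTEGER {
        // Truncating division may shorten the wait by up to 99ns, which is fine
        // since callers must tolerate spurious wakeups anyway.
        return -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
    }
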
-const DarwinFutex = struct {
- const darwin = std.os.darwin;
-
- fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
+const DarwinImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
// Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
// https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
//
@@ -183,58 +171,67 @@ const DarwinFutex = struct {
//
// ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout
// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
+ const supports_ulock_wait2 = builtin.target.os.version_range.semver.min.major >= 11;
+
var timeout_ns: u64 = 0;
- if (timeout) |timeout_value| {
- // This should be checked by the caller.
- assert(timeout_value != 0);
- timeout_ns = timeout_value;
+ if (timeout) |delay| {
+ assert(delay != 0); // handled by timedWait()
+ timeout_ns = delay;
}
- const addr = @ptrCast(*const anyopaque, ptr);
- const flags = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO;
+
// If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of
// micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users
// should handle spurious wakeups), but we need to remember that we did so, so that
- // we don't return `TimedOut` incorrectly. If that happens, we set this variable to
+ // we don't return `Timeout` incorrectly. If that happens, we set this variable to
// true so that we know to ignore the ETIMEDOUT result.
var timeout_overflowed = false;
+
+ const addr = @ptrCast(*const anyopaque, ptr);
+ const flags = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO;
const status = blk: {
- if (target.os.version_range.semver.min.major >= 11) {
- break :blk darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
- } else {
- const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) catch overflow: {
- timeout_overflowed = true;
- break :overflow std.math.maxInt(u32);
- };
- break :blk darwin.__ulock_wait(flags, addr, expect, timeout_us);
+ if (supports_ulock_wait2) {
+ break :blk os.darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
}
+
+ const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: {
+ timeout_overflowed = true;
+ break :overflow std.math.maxInt(u32);
+ };
+
+ break :blk os.darwin.__ulock_wait(flags, addr, expect, timeout_us);
};
if (status >= 0) return;
switch (@intToEnum(std.os.E, -status)) {
+ // Wait was interrupted by the OS or other spurious signalling.
.INTR => {},
- // Address of the futex is paged out. This is unlikely, but possible in theory, and
+ // Address of the futex was paged out. This is unlikely, but possible in theory, and
// pthread/libdispatch on darwin bother to handle it. In this case we'll return
// without waiting, but the caller should retry anyway.
.FAULT => {},
- .TIMEDOUT => if (!timeout_overflowed) return error.TimedOut,
+ // Only report Timeout if we didn't have to cap the timeout
+ .TIMEDOUT => {
+ assert(timeout != null);
+ if (!timeout_overflowed) return error.Timeout;
+ },
else => unreachable,
}
}
- fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
- var flags: u32 = darwin.UL_COMPARE_AND_WAIT | darwin.ULF_NO_ERRNO;
- if (num_waiters > 1) {
- flags |= darwin.ULF_WAKE_ALL;
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ var flags: u32 = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO;
+ if (max_waiters > 1) {
+ flags |= os.darwin.ULF_WAKE_ALL;
}
while (true) {
const addr = @ptrCast(*const anyopaque, ptr);
- const status = darwin.__ulock_wake(flags, addr, 0);
+ const status = os.darwin.__ulock_wake(flags, addr, 0);
if (status >= 0) return;
switch (@intToEnum(std.os.E, -status)) {
.INTR => continue, // spurious wake()
- .FAULT => continue, // address of the lock was paged out
+ .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
.NOENT => return, // nothing was woken up
.ALREADY => unreachable, // only for ULF_WAKE_THREAD
else => unreachable,
@@ -243,332 +240,780 @@ const DarwinFutex = struct {
}
};
-const PosixFutex = struct {
- fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{TimedOut}!void {
- const address = @ptrToInt(ptr);
- const bucket = Bucket.from(address);
- var waiter: List.Node = undefined;
+// https://man7.org/linux/man-pages/man2/futex.2.html
+const LinuxImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
+ var ts: os.timespec = undefined;
+ if (timeout) |timeout_ns| {
+ ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
+ ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
+ }
- {
- assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+ const rc = os.linux.futex_wait(
+ @ptrCast(*const i32, &ptr.value),
+ os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAIT,
+ @bitCast(i32, expect),
+ if (timeout != null) &ts else null,
+ );
- if (ptr.load(.SeqCst) != expect) {
- return;
- }
+ switch (os.linux.getErrno(rc)) {
+ .SUCCESS => {}, // notified by `wake()`
+ .INTR => {}, // spurious wakeup
+ .AGAIN => {}, // ptr.* != expect
+ .TIMEDOUT => {
+ assert(timeout != null);
+ return error.Timeout;
+ },
+ .INVAL => {}, // possibly timeout overflow
+ .FAULT => unreachable, // ptr was invalid
+ else => unreachable,
+ }
+ }
- waiter.data = .{ .address = address };
- bucket.list.prepend(&waiter);
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ const rc = os.linux.futex_wake(
+ @ptrCast(*const i32, &ptr.value),
+ os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAKE,
+ std.math.cast(i32, max_waiters) orelse std.math.maxInt(i32),
+ );
+
+ switch (os.linux.getErrno(rc)) {
+ .SUCCESS => {}, // successful wake up
+ .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
+ .FAULT => {}, // pointer became invalid while doing the wake
+ else => unreachable,
}
+ }
+};
- var timed_out = false;
- waiter.data.wait(timeout) catch {
- defer if (!timed_out) {
- waiter.data.wait(null) catch unreachable;
- };
+// https://www.freebsd.org/cgi/man.cgi?query=_umtx_op&sektion=2&n=1
+const FreebsdImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
+ var tm_size: usize = 0;
+ var tm: os.freebsd._umtx_time = undefined;
+ var tm_ptr: ?*const os.freebsd._umtx_time = null;
- assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+ if (timeout) |timeout_ns| {
+ tm_ptr = &tm;
+ tm_size = @sizeOf(@TypeOf(tm));
- if (waiter.data.address == address) {
- timed_out = true;
- bucket.list.remove(&waiter);
- }
- };
+ tm._flags = 0; // use relative time not UMTX_ABSTIME
+ tm._clockid = os.CLOCK.MONOTONIC;
+ tm._timeout.tv_sec = @intCast(@TypeOf(tm._timeout.tv_sec), timeout_ns / std.time.ns_per_s);
+ tm._timeout.tv_nsec = @intCast(@TypeOf(tm._timeout.tv_nsec), timeout_ns % std.time.ns_per_s);
+ }
- waiter.data.deinit();
- if (timed_out) {
- return error.TimedOut;
+ const rc = os.freebsd._umtx_op(
+ @ptrToInt(&ptr.value),
+ @enumToInt(os.freebsd.UMTX_OP.WAIT_UINT_PRIVATE),
+ @as(c_ulong, expect),
+ tm_size,
+ @ptrToInt(tm_ptr),
+ );
+
+ switch (os.errno(rc)) {
+ .SUCCESS => {},
+ .FAULT => unreachable, // one of the args points to invalid memory
+ .INVAL => unreachable, // arguments should be correct
+ .TIMEDOUT => {
+ assert(timeout != null);
+ return error.Timeout;
+ },
+ .INTR => {}, // spurious wake
+ else => unreachable,
}
}
- fn wake(ptr: *const Atomic(u32), num_waiters: u32) void {
- const address = @ptrToInt(ptr);
- const bucket = Bucket.from(address);
- var can_notify = num_waiters;
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ const rc = os.freebsd._umtx_op(
+ @ptrToInt(&ptr.value),
+ @enumToInt(os.freebsd.UMTX_OP.WAKE_PRIVATE),
+ @as(c_ulong, max_waiters),
+ 0, // there is no timeout struct
+ 0, // there is no timeout struct pointer
+ );
- var notified = List{};
- defer while (notified.popFirst()) |waiter| {
- waiter.data.notify();
- };
+ switch (os.errno(rc)) {
+ .SUCCESS => {},
+ .FAULT => {}, // it's ok if the ptr doesn't point to valid memory
+ .INVAL => unreachable, // arguments should be correct
+ else => unreachable,
+ }
+ }
+};
- assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+// https://man.openbsd.org/futex.2
+const OpenbsdImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
+ var ts: os.timespec = undefined;
+ if (timeout) |timeout_ns| {
+ ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
+ ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
+ }
- var waiters = bucket.list.first;
- while (waiters) |waiter| {
- assert(waiter.data.address != null);
- waiters = waiter.next;
+ const rc = os.openbsd.futex(
+ @ptrCast(*const volatile u32, &ptr.value),
+ os.openbsd.FUTEX_WAIT | os.openbsd.FUTEX_PRIVATE_FLAG,
+ @bitCast(c_int, expect),
+ if (timeout != null) &ts else null,
+ null, // FUTEX_WAIT takes no requeue address
+ );
+
+ switch (os.errno(rc)) {
+ .SUCCESS => {}, // woken up by wake
+ .NOSYS => unreachable, // the futex operation shouldn't be invalid
+ .FAULT => unreachable, // ptr was invalid
+ .AGAIN => {}, // ptr != expect
+ .INVAL => unreachable, // invalid timeout
+ .TIMEDOUT => {
+ assert(timeout != null);
+ return error.Timeout;
+ },
+ .INTR => {}, // spurious wake from signal
+ .CANCELED => {}, // spurious wake from signal with SA_RESTART
+ else => unreachable,
+ }
+ }
- if (waiter.data.address != address) continue;
- if (can_notify == 0) break;
- can_notify -= 1;
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ const rc = os.openbsd.futex(
+ @ptrCast(*const volatile u32, &ptr.value),
+ os.openbsd.FUTEX_WAKE | os.openbsd.FUTEX_PRIVATE_FLAG,
+ std.math.cast(c_int, max_waiters) orelse std.math.maxInt(c_int),
+ null, // FUTEX_WAKE takes no timeout ptr
+ null, // FUTEX_WAKE takes no requeue address
+ );
+
+ // returns number of threads woken up.
+ assert(rc >= 0);
+ }
+};
- bucket.list.remove(waiter);
- waiter.data.address = null;
- notified.prepend(waiter);
+// https://man.dragonflybsd.org/?command=umtx&section=2
+const DragonflyImpl = struct {
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
+ // Dragonfly uses a scheme where 0 timeout means wait until signaled or spurious wake.
+ // Its reporting of timeouts is also unreliable, so we use an external timing source (Timer) instead.
+ var timeout_us: c_int = 0;
+ var timeout_overflowed = false;
+ var sleep_timer: std.time.Timer = undefined;
+
+ if (timeout) |delay| {
+ assert(delay != 0); // handled by timedWait().
+ timeout_us = std.math.cast(c_int, delay / std.time.ns_per_us) orelse blk: {
+ timeout_overflowed = true;
+ break :blk std.math.maxInt(c_int);
+ };
+
+ // Only need to record the start time if we can provide somewhat accurate error.Timeout reporting.
+ if (!timeout_overflowed) {
+ sleep_timer = std.time.Timer.start() catch unreachable;
+ }
+ }
+
+ const value = @bitCast(c_int, expect);
+ const addr = @ptrCast(*const volatile c_int, &ptr.value);
+ const rc = os.dragonfly.umtx_sleep(addr, value, timeout_us);
+
+ switch (os.errno(rc)) {
+ .SUCCESS => {},
+ .BUSY => {}, // ptr != expect
+ .AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh
+ if (timeout) |timeout_ns| {
+ // Report error.Timeout only if we know the timeout duration has passed.
+ // If not, there's not much choice other than treating it as a spurious wake.
+ if (!timeout_overflowed and sleep_timer.read() >= timeout_ns) {
+ return error.Timeout;
+ }
+ }
+ },
+ .INTR => {}, // spurious wake
+ .INVAL => unreachable, // invalid timeout
+ else => unreachable,
}
}
- const Bucket = struct {
- mutex: std.c.pthread_mutex_t = .{},
- list: List = .{},
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ // A count of zero means wake all waiters.
+ assert(max_waiters != 0);
+ const to_wake = std.math.cast(c_int, max_waiters) orelse 0;
- var buckets = [_]Bucket{.{}} ** 64;
+ // https://man.dragonflybsd.org/?command=umtx&section=2
+ // > umtx_wakeup() will generally return 0 unless the address is bad.
+ // We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore)
+ const addr = @ptrCast(*const volatile c_int, &ptr.value);
+ _ = os.dragonfly.umtx_wakeup(addr, to_wake);
+ }
+};
- fn from(address: usize) *Bucket {
- return &buckets[address % buckets.len];
+/// Modified version of linux's futex and Go's sema to implement userspace wait queues with pthread:
+/// https://code.woboq.org/linux/linux/kernel/futex.c.html
+/// https://go.dev/src/runtime/sema.go
+const PosixImpl = struct {
+ const Event = struct {
+ cond: std.c.pthread_cond_t,
+ mutex: std.c.pthread_mutex_t,
+ state: enum { empty, waiting, notified },
+
+ fn init(self: *Event) void {
+ // Use static init instead of pthread_cond/mutex_init() since this is generally faster.
+ self.cond = .{};
+ self.mutex = .{};
+ self.state = .empty;
}
- };
- const List = std.TailQueue(struct {
- address: ?usize,
- state: State = .empty,
- cond: std.c.pthread_cond_t = .{},
- mutex: std.c.pthread_mutex_t = .{},
-
- const Self = @This();
- const State = enum {
- empty,
- waiting,
- notified,
- };
+ fn deinit(self: *Event) void {
+ // Some platforms reportedly give EINVAL for statically initialized pthread types.
+ const rc = std.c.pthread_cond_destroy(&self.cond);
+ assert(rc == .SUCCESS or rc == .INVAL);
+
+ const rm = std.c.pthread_mutex_destroy(&self.mutex);
+ assert(rm == .SUCCESS or rm == .INVAL);
- fn deinit(self: *Self) void {
- _ = std.c.pthread_cond_destroy(&self.cond);
- _ = std.c.pthread_mutex_destroy(&self.mutex);
+ self.* = undefined;
}
- fn wait(self: *Self, timeout: ?u64) error{TimedOut}!void {
+ fn wait(self: *Event, timeout: ?u64) error{Timeout}!void {
assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
- switch (self.state) {
- .empty => self.state = .waiting,
- .waiting => unreachable,
- .notified => return,
+ // Early return if the event was already set.
+ if (self.state == .notified) {
+ return;
}
- var ts: std.os.timespec = undefined;
- var ts_ptr: ?*const std.os.timespec = null;
+ // Compute the absolute timeout if one was specified.
+ // POSIX requires that REALTIME is used by default for the pthread timedwait functions.
+ // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere.
+ var ts: os.timespec = undefined;
if (timeout) |timeout_ns| {
- ts_ptr = &ts;
- std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable;
- ts.tv_sec += @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
+ os.clock_gettime(os.CLOCK.REALTIME, &ts) catch unreachable;
+ ts.tv_sec +|= @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
+
if (ts.tv_nsec >= std.time.ns_per_s) {
- ts.tv_sec += 1;
+ ts.tv_sec +|= 1;
ts.tv_nsec -= std.time.ns_per_s;
}
}
- while (true) {
- switch (self.state) {
- .empty => unreachable,
- .waiting => {},
- .notified => return,
- }
+ // Start waiting on the event - there can be only one thread waiting.
+ assert(self.state == .empty);
+ self.state = .waiting;
- const ts_ref = ts_ptr orelse {
- assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == .SUCCESS);
- continue;
+ while (true) {
+ // Block using either pthread_cond_wait, or pthread_cond_timedwait if there's an absolute timeout.
+ const rc = blk: {
+ if (timeout == null) break :blk std.c.pthread_cond_wait(&self.cond, &self.mutex);
+ break :blk std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
};
- const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, ts_ref);
+ // After waking up, check if the event was set.
+ if (self.state == .notified) {
+ return;
+ }
+
+ assert(self.state == .waiting);
switch (rc) {
.SUCCESS => {},
.TIMEDOUT => {
+ // If timed out, reset the event to avoid the set() thread doing an unnecessary signal().
self.state = .empty;
- return error.TimedOut;
+ return error.Timeout;
},
+ .INVAL => unreachable, // cond, mutex, and potentially ts should all be valid
+ .PERM => unreachable, // mutex is locked when cond_*wait() functions are called
else => unreachable,
}
}
}
- fn notify(self: *Self) void {
+ fn set(self: *Event) void {
assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
- switch (self.state) {
- .empty => self.state = .notified,
- .waiting => {
- self.state = .notified;
- assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS);
- },
- .notified => unreachable,
+ // Make sure that multiple calls to set() were not done on the same Event.
+ const old_state = self.state;
+ assert(old_state != .notified);
+
+ // Mark the event as set and wake up the waiting thread if there was one.
+ // This must be done while holding the mutex, as the wait() thread could deallocate
+ // the condition variable once it observes the new state, potentially causing a UAF if done unlocked.
+ self.state = .notified;
+ if (old_state == .waiting) {
+ assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS);
}
}
- });
-};
+ };
-test "Futex - wait/wake" {
- var value = Atomic(u32).init(0);
- Futex.wait(&value, 1, null) catch unreachable;
+ const Treap = std.Treap(usize, std.math.order);
+ const Waiter = struct {
+ node: Treap.Node,
+ prev: ?*Waiter,
+ next: ?*Waiter,
+ tail: ?*Waiter,
+ is_queued: bool,
+ event: Event,
+ };
- const wait_noop_result = Futex.wait(&value, 0, 0);
- try testing.expectError(error.TimedOut, wait_noop_result);
+ // An unordered set of Waiters
+ const WaitList = struct {
+ top: ?*Waiter = null,
+ len: usize = 0,
- const wait_longer_result = Futex.wait(&value, 0, std.time.ns_per_ms);
- try testing.expectError(error.TimedOut, wait_longer_result);
+ fn push(self: *WaitList, waiter: *Waiter) void {
+ waiter.next = self.top;
+ self.top = waiter;
+ self.len += 1;
+ }
- Futex.wake(&value, 0);
- Futex.wake(&value, 1);
- Futex.wake(&value, std.math.maxInt(u32));
-}
+ fn pop(self: *WaitList) ?*Waiter {
+ const waiter = self.top orelse return null;
+ self.top = waiter.next;
+ self.len -= 1;
+ return waiter;
+ }
+ };
-test "Futex - Signal" {
- if (single_threaded) {
- return error.SkipZigTest;
- }
+ const WaitQueue = struct {
+ fn insert(treap: *Treap, address: usize, waiter: *Waiter) void {
+ // prepare the waiter to be inserted.
+ waiter.next = null;
+ waiter.is_queued = true;
+
+ // Find the wait queue entry associated with the address.
+ // If there isn't a wait queue on the address, this waiter creates the queue.
+ var entry = treap.getEntryFor(address);
+ const entry_node = entry.node orelse {
+ waiter.prev = null;
+ waiter.tail = waiter;
+ entry.set(&waiter.node);
+ return;
+ };
- const Paddle = struct {
- value: Atomic(u32) = Atomic(u32).init(0),
- current: u32 = 0,
+ // There's a wait queue on the address; get the queue head and tail.
+ const head = @fieldParentPtr(Waiter, "node", entry_node);
+ const tail = head.tail orelse unreachable;
- fn run(self: *@This(), hit_to: *@This()) !void {
- var iterations: usize = 4;
- while (iterations > 0) : (iterations -= 1) {
- var value: u32 = undefined;
- while (true) {
- value = self.value.load(.Acquire);
- if (value != self.current) break;
- Futex.wait(&self.value, self.current, null) catch unreachable;
- }
+ // Push the waiter to the tail: make it the new tail and link it after the previous tail.
+ head.tail = waiter;
+ tail.next = waiter;
+ waiter.prev = tail;
+ }
+
+ fn remove(treap: *Treap, address: usize, max_waiters: usize) WaitList {
+ // Find the wait queue associated with this address and get the head/tail if any.
+ var entry = treap.getEntryFor(address);
+ var queue_head = if (entry.node) |node| @fieldParentPtr(Waiter, "node", node) else null;
+ const queue_tail = if (queue_head) |head| head.tail else null;
+
+ // Once we're done updating the head, fix its tail pointer and update the treap's queue head as well.
+ defer entry.set(blk: {
+ const new_head = queue_head orelse break :blk null;
+ new_head.tail = queue_tail;
+ break :blk &new_head.node;
+ });
+
+ var removed = WaitList{};
+ while (removed.len < max_waiters) {
+ // dequeue and collect waiters from their wait queue.
+ const waiter = queue_head orelse break;
+ queue_head = waiter.next;
+ removed.push(waiter);
+
+ // When dequeueing, we must mark is_queued as false.
+ // This ensures that tryRemove() returns false for a waiter that was already dequeued.
+ assert(waiter.is_queued);
+ waiter.is_queued = false;
+ }
- try testing.expectEqual(value, self.current + 1);
- self.current = value;
+ return removed;
+ }
- _ = hit_to.value.fetchAdd(1, .Release);
- Futex.wake(&hit_to.value, 1);
+ fn tryRemove(treap: *Treap, address: usize, waiter: *Waiter) bool {
+ if (!waiter.is_queued) {
+ return false;
}
+
+ queue_remove: {
+ // Find the wait queue associated with the address.
+ var entry = blk: {
+ // A waiter without a previous link means it's the queue head that's in the treap so we can avoid lookup.
+ if (waiter.prev == null) {
+ assert(waiter.node.key == address);
+ break :blk treap.getEntryForExisting(&waiter.node);
+ }
+ break :blk treap.getEntryFor(address);
+ };
+
+ // The queue head and tail must exist if we're removing a queued waiter.
+ const head = @fieldParentPtr(Waiter, "node", entry.node orelse unreachable);
+ const tail = head.tail orelse unreachable;
+
+ // A waiter with a previous link is never the head of the queue.
+ if (waiter.prev) |prev| {
+ assert(waiter != head);
+ prev.next = waiter.next;
+
+ // A waiter with both a previous and next link is in the middle.
+ // We only need to update the surrounding waiter's links to remove it.
+ if (waiter.next) |next| {
+ assert(waiter != tail);
+ next.prev = waiter.prev;
+ break :queue_remove;
+ }
+
+ // A waiter with a previous but no next link means it's the tail of the queue.
+ // In that case, we need to update the head's tail reference.
+ assert(waiter == tail);
+ head.tail = waiter.prev;
+ break :queue_remove;
+ }
+
+ // A waiter with no previous link means it's the head of the queue.
+ // We must replace (or remove) the head waiter reference in the treap.
+ assert(waiter == head);
+ entry.set(blk: {
+ const new_head = waiter.next orelse break :blk null;
+ new_head.tail = head.tail;
+ break :blk &new_head.node;
+ });
+ }
+
+ // Mark the waiter as successfully removed.
+ waiter.is_queued = false;
+ return true;
}
};
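
The WaitQueue above drives std.Treap through its entry API. A small sketch of that insert/lookup/remove flow, assuming Entry behaves as the callsites in this patch imply:

    const std = @import("std");

    test "treap entry API sketch" {
        const Treap = std.Treap(usize, std.math.order);
        var treap = Treap{};
        var node: Treap.Node = undefined;

        // Looking up a key yields an entry whether or not a node exists for it.
        var entry = treap.getEntryFor(42);
        try std.testing.expect(entry.node == null);

        // Insert: what the first waiter on an address does via entry.set(&waiter.node).
        entry.set(&node);

        // Re-fetch the entry to observe the inserted node.
        entry = treap.getEntryFor(42);
        try std.testing.expect(entry.node == &node);

        // Remove: what happens when a wait queue becomes empty.
        entry.set(null);
    }
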
- var ping = Paddle{};
- var pong = Paddle{};
+ const Bucket = struct {
+ mutex: std.c.pthread_mutex_t align(std.atomic.cache_line) = .{},
+ pending: Atomic(usize) = Atomic(usize).init(0),
+ treap: Treap = .{},
- const t1 = try std.Thread.spawn(.{}, Paddle.run, .{ &ping, &pong });
- defer t1.join();
+ // Global array of buckets that addresses map to.
+ // Bucket array size is pretty much arbitrary here, but it must be a power of two for Fibonacci hashing.
+ var buckets = [_]Bucket{.{}} ** @bitSizeOf(usize);
- const t2 = try std.Thread.spawn(.{}, Paddle.run, .{ &pong, &ping });
- defer t2.join();
+ // https://github.com/Amanieu/parking_lot/blob/1cf12744d097233316afa6c8b7d37389e4211756/core/src/parking_lot.rs#L343-L353
+ fn from(address: usize) *Bucket {
+ // The upper `@bitSizeOf(usize)` bits of the fibonacci golden ratio.
+ // Hashing this via (h * k) >> (64 - b), where k=golden-ratio and b=bit-size-of-bucket-array,
+ // lays out h=hash values evenly over the bucket range even when the hash has poor entropy (identity-hash for pointers).
+ const max_multiplier_bits = @bitSizeOf(usize);
+ const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - max_multiplier_bits);
- _ = ping.value.fetchAdd(1, .Release);
- Futex.wake(&ping.value, 1);
-}
+ const max_bucket_bits = @ctz(usize, buckets.len);
+ comptime assert(std.math.isPowerOfTwo(buckets.len));
-test "Futex - Broadcast" {
- if (single_threaded) {
- return error.SkipZigTest;
- }
+ const index = (address *% fibonacci_multiplier) >> (max_multiplier_bits - max_bucket_bits);
+ return &buckets[index];
+ }
+ };
- const Context = struct {
- threads: [4]std.Thread = undefined,
- broadcast: Atomic(u32) = Atomic(u32).init(0),
- notified: Atomic(usize) = Atomic(usize).init(0),
+ const Address = struct {
+ fn from(ptr: *const Atomic(u32)) usize {
+ // Get the alignment of the pointer.
+ const alignment = @alignOf(Atomic(u32));
+ comptime assert(std.math.isPowerOfTwo(alignment));
+
+ // Make sure the pointer is aligned,
+ // then cut off the zero bits from the alignment to get the unique address.
+ const addr = @ptrToInt(ptr);
+ assert(addr & (alignment - 1) == 0);
+ return addr >> @ctz(usize, alignment);
+ }
+ };
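
A worked example of the two-step ptr-to-bucket mapping above, assuming a 64-bit target (buckets.len == @bitSizeOf(usize) == 64, so max_bucket_bits is 6):

    const std = @import("std");

    test "bucket index sketch" {
        // Atomic(u32) is 4-byte aligned, so Address.from() shifts off the two always-zero low bits.
        const ptr_addr: u64 = 0x7fffdeadbee0; // hypothetical, 4-byte-aligned address
        const address = ptr_addr >> 2;

        // Bucket.from(): multiply by the golden-ratio constant and keep the top 6 bits.
        const index = (address *% 0x9E3779B97F4A7C15) >> (64 - 6);
        try std.testing.expect(index < 64);
    }
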
- const BROADCAST_EMPTY = 0;
- const BROADCAST_SENT = 1;
- const BROADCAST_RECEIVED = 2;
+ fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
+ const address = Address.from(ptr);
+ const bucket = Bucket.from(address);
- fn runSender(self: *@This()) !void {
- self.broadcast.store(BROADCAST_SENT, .Monotonic);
- Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
+ // Announce that there's a waiter in the bucket before checking the ptr/expect condition.
+ // If the announcement is reordered after the ptr check, the waiter could deadlock:
+ //
+ // - T1: checks ptr == expect which is true
+ // - T2: updates ptr to != expect
+ // - T2: does Futex.wake(), sees no pending waiters, exits
+ // - T1: bumps pending waiters (was reordered after the ptr == expect check)
+ // - T1: goes to sleep and misses both the ptr change and T2's wake up
+ //
+ // SeqCst as Acquire barrier to ensure the announcement happens before the ptr check below.
+ // SeqCst as shared modification order to form a happens-before edge with the fence(.SeqCst)+load() in wake().
+ var pending = bucket.pending.fetchAdd(1, .SeqCst);
+ assert(pending < std.math.maxInt(usize));
+
+ // If the wait gets cancelled, remove the pending count we previously added.
+ // This is done outside the mutex lock to keep the critical section short in case of contention.
+ var cancelled = false;
+ defer if (cancelled) {
+ pending = bucket.pending.fetchSub(1, .Monotonic);
+ assert(pending > 0);
+ };
- while (true) {
- const broadcast = self.broadcast.load(.Acquire);
- if (broadcast == BROADCAST_RECEIVED) break;
- try testing.expectEqual(broadcast, BROADCAST_SENT);
- Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
+ var waiter: Waiter = undefined;
+ {
+ assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
+ defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+
+ cancelled = ptr.load(.Monotonic) != expect;
+ if (cancelled) {
+ return;
}
+
+ waiter.event.init();
+ WaitQueue.insert(&bucket.treap, address, &waiter);
}
- fn runReceiver(self: *@This()) void {
- while (true) {
- const broadcast = self.broadcast.load(.Acquire);
- if (broadcast == BROADCAST_SENT) break;
- assert(broadcast == BROADCAST_EMPTY);
- Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
+ defer {
+ assert(!waiter.is_queued);
+ waiter.event.deinit();
+ }
+
+ waiter.event.wait(timeout) catch {
+ // If we fail to cancel after a timeout, it means a wake() thread dequeued us and will wake us up.
+ // We must wait until the event is set as that's a signal that the wake() thread won't access the waiter memory anymore.
+ // If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF.
+ defer if (!cancelled) waiter.event.wait(null) catch unreachable;
+
+ assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
+ defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+
+ cancelled = WaitQueue.tryRemove(&bucket.treap, address, &waiter);
+ if (cancelled) {
+ return error.Timeout;
}
+ };
+ }
- const notified = self.notified.fetchAdd(1, .Monotonic);
- if (notified + 1 == self.threads.len) {
- self.broadcast.store(BROADCAST_RECEIVED, .Release);
- Futex.wake(&self.broadcast, 1);
+ fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
+ const address = Address.from(ptr);
+ const bucket = Bucket.from(address);
+
+ // Quick check if there's even anything to wake up.
+ // The change to the ptr's value must happen before we check for pending waiters.
+ // If not, the wake() thread could miss a sleeping waiter and have it deadlock:
+ //
+ // - T2: p = has pending waiters (reordered before the ptr update)
+ // - T1: bump pending waiters
+ // - T1: if ptr == expected: sleep()
+ // - T2: update ptr != expected
+ // - T2: p is false from earlier so doesn't wake (T1 missed ptr update and T2 missed T1 sleeping)
+ //
+ // What we really want here is a Release load, but that doesn't exist under the C11 memory model.
+ // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing,
+ // but the RMW operation unconditionally marks the cache-line as modified for others causing unnecessary fetching/contention.
+ //
+ // Instead we opt for a full fence + load, which avoids taking ownership of the cache-line.
+ // fence(SeqCst) effectively converts the ptr update to SeqCst and the pending load to SeqCst: creating a Store-Load barrier.
+ //
+ // The pending count increment in wait() must also use SeqCst so that the update and this pending load
+ // share the same modification order, since our load isn't using Release/Acquire to guarantee it.
+ bucket.pending.fence(.SeqCst);
+ if (bucket.pending.load(.Monotonic) == 0) {
+ return;
+ }
+
+ // Keep a list of all the waiters notified and wake them up outside the mutex critical section.
+ var notified = WaitList{};
+ defer if (notified.len > 0) {
+ const pending = bucket.pending.fetchSub(notified.len, .Monotonic);
+ assert(pending >= notified.len);
+
+ while (notified.pop()) |waiter| {
+ assert(!waiter.is_queued);
+ waiter.event.set();
}
+ };
+
+ assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
+ defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+
+ // Check the pending count again to avoid the WaitQueue lookup when it's not necessary.
+ if (bucket.pending.load(.Monotonic) > 0) {
+ notified = WaitQueue.remove(&bucket.treap, address, max_waiters);
}
- };
+ }
+};
+
+test "Futex - smoke test" {
+ var value = Atomic(u32).init(0);
- var ctx = Context{};
- for (ctx.threads) |*thread|
- thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx});
- defer for (ctx.threads) |thread|
- thread.join();
+ // Try waits with a mismatched expect value; they return immediately.
+ Futex.wait(&value, 0xdeadbeef);
+ Futex.timedWait(&value, 0xdeadbeef, 0) catch {};
- // Try to wait for the threads to start before running runSender().
- // NOTE: not actually needed for correctness.
- std.time.sleep(16 * std.time.ns_per_ms);
- try ctx.runSender();
+ // Try timeout waits.
+ try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, 0));
+ try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, std.time.ns_per_ms));
- const notified = ctx.notified.load(.Monotonic);
- try testing.expectEqual(notified, ctx.threads.len);
+ // Try wakes
+ Futex.wake(&value, 0);
+ Futex.wake(&value, 1);
+ Futex.wake(&value, std.math.maxInt(u32));
}
-test "Futex - Chain" {
- if (single_threaded) {
+test "Futex - signaling" {
+ // This test requires spawning threads
+ if (builtin.single_threaded) {
return error.SkipZigTest;
}
- const Signal = struct {
+ const num_threads = 4;
+ const num_iterations = 4;
+
+ const Paddle = struct {
value: Atomic(u32) = Atomic(u32).init(0),
+ current: u32 = 0,
- fn wait(self: *@This()) void {
- while (true) {
- const value = self.value.load(.Acquire);
- if (value == 1) break;
- assert(value == 0);
- Futex.wait(&self.value, 0, null) catch unreachable;
- }
+ fn hit(self: *@This()) void {
+ _ = self.value.fetchAdd(1, .Release);
+ Futex.wake(&self.value, 1);
}
- fn notify(self: *@This()) void {
- assert(self.value.load(.Unordered) == 0);
- self.value.store(1, .Release);
- Futex.wake(&self.value, 1);
+ fn run(self: *@This(), hit_to: *@This()) !void {
+ while (self.current < num_iterations) {
+ // Wait for the value to change from hit()
+ var new_value: u32 = undefined;
+ while (true) {
+ new_value = self.value.load(.Acquire);
+ if (new_value != self.current) break;
+ Futex.wait(&self.value, self.current);
+ }
+
+ // change the internal "current" value
+ try testing.expectEqual(new_value, self.current + 1);
+ self.current = new_value;
+
+ // hit the next paddle
+ hit_to.hit();
+ }
}
};
- const Context = struct {
- completed: Signal = .{},
- threads: [4]struct {
- thread: std.Thread,
- signal: Signal,
- } = undefined,
+ var paddles = [_]Paddle{.{}} ** num_threads;
+ var threads = [_]std.Thread{undefined} ** num_threads;
- fn run(self: *@This(), index: usize) void {
- const this_signal = &self.threads[index].signal;
+ // Create a circle of paddles which hit each other
+ for (threads) |*t, i| {
+ const paddle = &paddles[i];
+ const hit_to = &paddles[(i + 1) % paddles.len];
+ t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
+ }
+
+ // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
+ paddles[0].hit();
+ for (threads) |t| t.join();
+ for (paddles) |p| try testing.expectEqual(p.current, num_iterations);
+}
+
+test "Futex - broadcasting" {
+ // This test requires spawning threads
+ if (builtin.single_threaded) {
+ return error.SkipZigTest;
+ }
+
+ const num_threads = 4;
+ const num_iterations = 4;
+
+ const Barrier = struct {
+ count: Atomic(u32) = Atomic(u32).init(num_threads),
+ futex: Atomic(u32) = Atomic(u32).init(0),
+
+ fn wait(self: *@This()) !void {
+ // Decrement the counter.
+ // Release ensures stuff before this barrier.wait() happens before the last one.
+ const count = self.count.fetchSub(1, .Release);
+ try testing.expect(count <= num_threads);
+ try testing.expect(count > 0);
+
+ // First counter to reach zero wakes all other threads.
+ // Acquire for the last counter ensures stuff before previous barrier.wait()s happened before it.
+ // Release on futex update ensures stuff before all barrier.wait()'s happens before they all return.
+ if (count - 1 == 0) {
+ _ = self.count.load(.Acquire); // TODO: could be fence(Acquire) if not for TSAN
+ self.futex.store(1, .Release);
+ Futex.wake(&self.futex, num_threads - 1);
+ return;
+ }
- var next_signal = &self.completed;
- if (index + 1 < self.threads.len) {
- next_signal = &self.threads[index + 1].signal;
+ // Other threads wait until last counter wakes them up.
+ // Acquire on futex synchronizes with last barrier count to ensure stuff before all barrier.wait()'s happen before us.
+ while (self.futex.load(.Acquire) == 0) {
+ Futex.wait(&self.futex, 0);
}
+ }
+ };
+
+ const Broadcast = struct {
+ barriers: [num_iterations]Barrier = [_]Barrier{.{}} ** num_iterations,
+ threads: [num_threads]std.Thread = undefined,
- this_signal.wait();
- next_signal.notify();
+ fn run(self: *@This()) !void {
+ for (self.barriers) |*barrier| {
+ try barrier.wait();
+ }
}
};
- var ctx = Context{};
- for (ctx.threads) |*entry, index| {
- entry.signal = .{};
- entry.thread = try std.Thread.spawn(.{}, Context.run, .{ &ctx, index });
+ var broadcast = Broadcast{};
+ for (broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast});
+ for (broadcast.threads) |t| t.join();
+}
+
+/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout.
+///
+/// Futex's timedWait() API uses a relative duration, which suffers from over-waiting
+/// when used in a loop, as is often required due to the possibility of spurious wakeups.
+///
+/// Deadline instead converts the relative timeout to an absolute one so that multiple calls
+/// to Futex timedWait() can block for and report more accurate error.Timeouts.
+pub const Deadline = struct {
+ timeout: ?u64,
+ started: std.time.Timer,
+
+ /// Create the deadline to expire after the given amount of time in nanoseconds passes.
+ /// Pass in `null` to have the deadline call `Futex.wait()` and never expire.
+ pub fn init(expires_in_ns: ?u64) Deadline {
+ var deadline: Deadline = undefined;
+ deadline.timeout = expires_in_ns;
+
+ // std.time.Timer must be supported for somewhat accurate reporting of error.Timeout.
+ if (deadline.timeout != null) {
+ deadline.started = std.time.Timer.start() catch unreachable;
+ }
+
+ return deadline;
+ }
+
+ /// Wait until either:
+ /// - the `ptr`'s value changes from `expect`.
+ /// - `Futex.wake()` is called on the `ptr`.
+ /// - A spurious wake occurs.
+ /// - The deadline expires; In which case `error.Timeout` is returned.
+ pub fn wait(self: *Deadline, ptr: *const Atomic(u32), expect: u32) error{Timeout}!void {
+ @setCold(true);
+
+ // Check if we actually have a timeout to wait until.
+ // If not, just wait "forever".
+ const timeout_ns = self.timeout orelse {
+ return Futex.wait(ptr, expect);
+ };
+
+ // Get how much time has passed since we started waiting
+ // then subtract that from the init() timeout to get how much longer to wait.
+ // Use overflow to detect when we've been waiting longer than the init() timeout.
+ const elapsed_ns = self.started.read();
+ const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0;
+ return Futex.timedWait(ptr, expect, until_timeout_ns);
}
+};
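
A sketch of the over-waiting problem Deadline avoids (function names hypothetical): retrying timedWait() with a fixed relative duration restarts the full wait after every spurious wakeup, while Deadline.wait() only blocks for whatever time remains:

    const std = @import("std");
    const Atomic = std.atomic.Atomic;
    const Futex = std.Thread.Futex;

    // Naive: every retry waits up to a fresh 100ms, so total blocking is unbounded
    // if spurious wakeups keep occurring.
    fn waitUntilNonzeroNaive(ptr: *const Atomic(u32)) error{Timeout}!void {
        while (ptr.load(.Acquire) == 0) {
            try Futex.timedWait(ptr, 0, 100 * std.time.ns_per_ms);
        }
    }

    // With Deadline: every retry waits only for what's left of the original 100ms.
    fn waitUntilNonzero(ptr: *const Atomic(u32)) error{Timeout}!void {
        var deadline = Futex.Deadline.init(100 * std.time.ns_per_ms);
        while (ptr.load(.Acquire) == 0) {
            try deadline.wait(ptr, 0);
        }
    }
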
- ctx.threads[0].signal.notify();
- ctx.completed.wait();
+test "Futex - Deadline" {
+ var deadline = Deadline.init(100 * std.time.ns_per_ms);
+ var futex_word = Atomic(u32).init(0);
- for (ctx.threads) |entry| {
- entry.thread.join();
+ while (true) {
+ deadline.wait(&futex_word, 0) catch break;
}
}