From 70931dbdea96d92feb60406c827e39e566317863 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 22 Nov 2023 18:49:18 -0700 Subject: rework std.atomic * move std.atomic.Atomic to std.atomic.Value * fix incorrect argument order passed to testing.expectEqual * make the functions be a thin wrapper over the atomic builtins and stick to the naming conventions. * remove pointless functions loadUnchecked and storeUnchecked. Instead, name the field `raw` instead of `value` (which is redundant with the type name). * simplify the tests by not passing every possible combination. Many cases were iterating over every possible combinations but then not even using the for loop element value! * remove the redundant compile errors which are already implemented by the language itself. * remove dead x86 inline assembly. this should be implemented in the language if at all. --- lib/std/Thread/Condition.zig | 13 ++++---- lib/std/Thread/Futex.zig | 74 +++++++++++++++++++++---------------------- lib/std/Thread/Mutex.zig | 27 ++++++---------- lib/std/Thread/ResetEvent.zig | 7 ++-- lib/std/Thread/RwLock.zig | 2 +- lib/std/Thread/WaitGroup.zig | 3 +- 6 files changed, 57 insertions(+), 69 deletions(-) (limited to 'lib/std/Thread') diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig index 898fc14520..83e5134538 100644 --- a/lib/std/Thread/Condition.zig +++ b/lib/std/Thread/Condition.zig @@ -50,7 +50,6 @@ const Mutex = std.Thread.Mutex; const os = std.os; const assert = std.debug.assert; const testing = std.testing; -const Atomic = std.atomic.Atomic; const Futex = std.Thread.Futex; impl: Impl = .{}, @@ -193,8 +192,8 @@ const WindowsImpl = struct { }; const FutexImpl = struct { - state: Atomic(u32) = Atomic(u32).init(0), - epoch: Atomic(u32) = Atomic(u32).init(0), + state: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + epoch: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), const one_waiter = 1; const waiter_mask = 0xffff; @@ -232,12 +231,12 @@ const FutexImpl = struct { // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. while (state & signal_mask != 0) { const new_state = state - one_waiter - one_signal; - state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return; + state = self.state.cmpxchgWeak(state, new_state, .Acquire, .Monotonic) orelse return; } // Remove the waiter we added and officially return timed out. const new_state = state - one_waiter; - state = self.state.tryCompareAndSwap(state, new_state, .Monotonic, .Monotonic) orelse return err; + state = self.state.cmpxchgWeak(state, new_state, .Monotonic, .Monotonic) orelse return err; } }, }; @@ -249,7 +248,7 @@ const FutexImpl = struct { // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. while (state & signal_mask != 0) { const new_state = state - one_waiter - one_signal; - state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return; + state = self.state.cmpxchgWeak(state, new_state, .Acquire, .Monotonic) orelse return; } } } @@ -276,7 +275,7 @@ const FutexImpl = struct { // Reserve the amount of waiters to wake by incrementing the signals count. // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads. const new_state = state + (one_signal * to_wake); - state = self.state.tryCompareAndSwap(state, new_state, .Release, .Monotonic) orelse { + state = self.state.cmpxchgWeak(state, new_state, .Release, .Monotonic) orelse { // Wake up the waiting threads we reserved above by changing the epoch value. // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it. // This is very unlikely due to how many precise amount of Futex.wake() calls that would be between the waiting thread's potential preemption. diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 97d55cf71f..bf2b318965 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -10,7 +10,7 @@ const Futex = @This(); const os = std.os; const assert = std.debug.assert; const testing = std.testing; -const Atomic = std.atomic.Atomic; +const atomic = std.atomic; /// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either: /// - The value at `ptr` is no longer equal to `expect`. @@ -19,7 +19,7 @@ const Atomic = std.atomic.Atomic; /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically /// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`. -pub fn wait(ptr: *const Atomic(u32), expect: u32) void { +pub fn wait(ptr: *const atomic.Value(u32), expect: u32) void { @setCold(true); Impl.wait(ptr, expect, null) catch |err| switch (err) { @@ -35,7 +35,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32) void { /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically /// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`. -pub fn timedWait(ptr: *const Atomic(u32), expect: u32, timeout_ns: u64) error{Timeout}!void { +pub fn timedWait(ptr: *const atomic.Value(u32), expect: u32, timeout_ns: u64) error{Timeout}!void { @setCold(true); // Avoid calling into the OS for no-op timeouts. @@ -48,7 +48,7 @@ pub fn timedWait(ptr: *const Atomic(u32), expect: u32, timeout_ns: u64) error{Ti } /// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`. -pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { +pub fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { @setCold(true); // Avoid calling into the OS if there's nothing to wake up. @@ -83,11 +83,11 @@ else /// We can't do @compileError() in the `Impl` switch statement above as its eagerly evaluated. /// So instead, we @compileError() on the methods themselves for platforms which don't support futex. const UnsupportedImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { return unsupported(.{ ptr, expect, timeout }); } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { return unsupported(.{ ptr, max_waiters }); } @@ -98,8 +98,8 @@ const UnsupportedImpl = struct { }; const SingleThreadedImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { - if (ptr.loadUnchecked() != expect) { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + if (ptr.raw != expect) { return; } @@ -113,7 +113,7 @@ const SingleThreadedImpl = struct { return error.Timeout; } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { // There are no other threads to possibly wake up _ = ptr; _ = max_waiters; @@ -123,7 +123,7 @@ const SingleThreadedImpl = struct { // We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll // as it's generally already a linked target and is autoloaded into all processes anyway. const WindowsImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { var timeout_value: os.windows.LARGE_INTEGER = undefined; var timeout_ptr: ?*const os.windows.LARGE_INTEGER = null; @@ -152,7 +152,7 @@ const WindowsImpl = struct { } } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { const address: ?*const anyopaque = ptr; assert(max_waiters != 0); @@ -164,7 +164,7 @@ const WindowsImpl = struct { }; const DarwinImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it: // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6 // @@ -220,7 +220,7 @@ const DarwinImpl = struct { } } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { var flags: u32 = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO; if (max_waiters > 1) { flags |= os.darwin.ULF_WAKE_ALL; @@ -244,7 +244,7 @@ const DarwinImpl = struct { // https://man7.org/linux/man-pages/man2/futex.2.html const LinuxImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { var ts: os.timespec = undefined; if (timeout) |timeout_ns| { ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); @@ -252,7 +252,7 @@ const LinuxImpl = struct { } const rc = os.linux.futex_wait( - @as(*const i32, @ptrCast(&ptr.value)), + @as(*const i32, @ptrCast(&ptr.raw)), os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAIT, @as(i32, @bitCast(expect)), if (timeout != null) &ts else null, @@ -272,9 +272,9 @@ const LinuxImpl = struct { } } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { const rc = os.linux.futex_wake( - @as(*const i32, @ptrCast(&ptr.value)), + @as(*const i32, @ptrCast(&ptr.raw)), os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAKE, std.math.cast(i32, max_waiters) orelse std.math.maxInt(i32), ); @@ -290,7 +290,7 @@ const LinuxImpl = struct { // https://www.freebsd.org/cgi/man.cgi?query=_umtx_op&sektion=2&n=1 const FreebsdImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { var tm_size: usize = 0; var tm: os.freebsd._umtx_time = undefined; var tm_ptr: ?*const os.freebsd._umtx_time = null; @@ -326,7 +326,7 @@ const FreebsdImpl = struct { } } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { const rc = os.freebsd._umtx_op( @intFromPtr(&ptr.value), @intFromEnum(os.freebsd.UMTX_OP.WAKE_PRIVATE), @@ -346,7 +346,7 @@ const FreebsdImpl = struct { // https://man.openbsd.org/futex.2 const OpenbsdImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { var ts: os.timespec = undefined; if (timeout) |timeout_ns| { ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); @@ -377,7 +377,7 @@ const OpenbsdImpl = struct { } } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { const rc = os.openbsd.futex( @as(*const volatile u32, @ptrCast(&ptr.value)), os.openbsd.FUTEX_WAKE | os.openbsd.FUTEX_PRIVATE_FLAG, @@ -393,7 +393,7 @@ const OpenbsdImpl = struct { // https://man.dragonflybsd.org/?command=umtx§ion=2 const DragonflyImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { // Dragonfly uses a scheme where 0 timeout means wait until signaled or spurious wake. // It's reporting of timeout's is also unrealiable so we use an external timing source (Timer) instead. var timeout_us: c_int = 0; @@ -435,7 +435,7 @@ const DragonflyImpl = struct { } } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { // A count of zero means wake all waiters. assert(max_waiters != 0); const to_wake = std.math.cast(c_int, max_waiters) orelse 0; @@ -449,7 +449,7 @@ const DragonflyImpl = struct { }; const WasmImpl = struct { - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) { @compileError("WASI target missing cpu feature 'atomics'"); } @@ -473,7 +473,7 @@ const WasmImpl = struct { } } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { if (!comptime std.Target.wasm.featureSetHas(builtin.target.cpu.features, .atomics)) { @compileError("WASI target missing cpu feature 'atomics'"); } @@ -732,8 +732,8 @@ const PosixImpl = struct { }; const Bucket = struct { - mutex: std.c.pthread_mutex_t align(std.atomic.cache_line) = .{}, - pending: Atomic(usize) = Atomic(usize).init(0), + mutex: std.c.pthread_mutex_t align(atomic.cache_line) = .{}, + pending: atomic.Value(usize) = atomic.Value(usize).init(0), treap: Treap = .{}, // Global array of buckets that addresses map to. @@ -757,9 +757,9 @@ const PosixImpl = struct { }; const Address = struct { - fn from(ptr: *const Atomic(u32)) usize { + fn from(ptr: *const atomic.Value(u32)) usize { // Get the alignment of the pointer. - const alignment = @alignOf(Atomic(u32)); + const alignment = @alignOf(atomic.Value(u32)); comptime assert(std.math.isPowerOfTwo(alignment)); // Make sure the pointer is aligned, @@ -770,7 +770,7 @@ const PosixImpl = struct { } }; - fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { + fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { const address = Address.from(ptr); const bucket = Bucket.from(address); @@ -831,7 +831,7 @@ const PosixImpl = struct { }; } - fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { + fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { const address = Address.from(ptr); const bucket = Bucket.from(address); @@ -882,7 +882,7 @@ const PosixImpl = struct { }; test "Futex - smoke test" { - var value = Atomic(u32).init(0); + var value = atomic.Value(u32).init(0); // Try waits with invalid values. Futex.wait(&value, 0xdeadbeef); @@ -908,7 +908,7 @@ test "Futex - signaling" { const num_iterations = 4; const Paddle = struct { - value: Atomic(u32) = Atomic(u32).init(0), + value: atomic.Value(u32) = atomic.Value(u32).init(0), current: u32 = 0, fn hit(self: *@This()) void { @@ -962,8 +962,8 @@ test "Futex - broadcasting" { const num_iterations = 4; const Barrier = struct { - count: Atomic(u32) = Atomic(u32).init(num_threads), - futex: Atomic(u32) = Atomic(u32).init(0), + count: atomic.Value(u32) = atomic.Value(u32).init(num_threads), + futex: atomic.Value(u32) = atomic.Value(u32).init(0), fn wait(self: *@This()) !void { // Decrement the counter. @@ -1036,7 +1036,7 @@ pub const Deadline = struct { /// - `Futex.wake()` is called on the `ptr`. /// - A spurious wake occurs. /// - The deadline expires; In which case `error.Timeout` is returned. - pub fn wait(self: *Deadline, ptr: *const Atomic(u32), expect: u32) error{Timeout}!void { + pub fn wait(self: *Deadline, ptr: *const atomic.Value(u32), expect: u32) error{Timeout}!void { @setCold(true); // Check if we actually have a timeout to wait until. @@ -1056,7 +1056,7 @@ pub const Deadline = struct { test "Futex - Deadline" { var deadline = Deadline.init(100 * std.time.ns_per_ms); - var futex_word = Atomic(u32).init(0); + var futex_word = atomic.Value(u32).init(0); while (true) { deadline.wait(&futex_word, 0) catch break; diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig index 0f618516b5..a9024e6c5d 100644 --- a/lib/std/Thread/Mutex.zig +++ b/lib/std/Thread/Mutex.zig @@ -26,7 +26,6 @@ const Mutex = @This(); const os = std.os; const assert = std.debug.assert; const testing = std.testing; -const Atomic = std.atomic.Atomic; const Thread = std.Thread; const Futex = Thread.Futex; @@ -67,7 +66,7 @@ else FutexImpl; const DebugImpl = struct { - locking_thread: Atomic(Thread.Id) = Atomic(Thread.Id).init(0), // 0 means it's not locked. + locking_thread: std.atomic.Value(Thread.Id) = std.atomic.Value(Thread.Id).init(0), // 0 means it's not locked. impl: ReleaseImpl = .{}, inline fn tryLock(self: *@This()) bool { @@ -151,37 +150,29 @@ const DarwinImpl = struct { }; const FutexImpl = struct { - state: Atomic(u32) = Atomic(u32).init(unlocked), + state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked), - const unlocked = 0b00; - const locked = 0b01; - const contended = 0b11; // must contain the `locked` bit for x86 optimization below - - fn tryLock(self: *@This()) bool { - // Lock with compareAndSwap instead of tryCompareAndSwap to avoid reporting spurious CAS failure. - return self.lockFast("compareAndSwap"); - } + const unlocked: u32 = 0b00; + const locked: u32 = 0b01; + const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below fn lock(self: *@This()) void { - // Lock with tryCompareAndSwap instead of compareAndSwap due to being more inline-able on LL/SC archs like ARM. - if (!self.lockFast("tryCompareAndSwap")) { + if (!self.tryLock()) self.lockSlow(); - } } - inline fn lockFast(self: *@This(), comptime cas_fn_name: []const u8) bool { + fn tryLock(self: *@This()) bool { // On x86, use `lock bts` instead of `lock cmpxchg` as: // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048 // - `lock bts` is smaller instruction-wise which makes it better for inlining if (comptime builtin.target.cpu.arch.isX86()) { - const locked_bit = @ctz(@as(u32, locked)); + const locked_bit = @ctz(locked); return self.state.bitSet(locked_bit, .Acquire) == 0; } // Acquire barrier ensures grabbing the lock happens before the critical section // and that the previous lock holder's critical section happens before we grab the lock. - const casFn = @field(@TypeOf(self.state), cas_fn_name); - return casFn(&self.state, unlocked, locked, .Acquire, .Monotonic) == null; + return self.state.cmpxchgWeak(unlocked, locked, .Acquire, .Monotonic) == null; } fn lockSlow(self: *@This()) void { diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig index 42cf74fd42..cd74f337fb 100644 --- a/lib/std/Thread/ResetEvent.zig +++ b/lib/std/Thread/ResetEvent.zig @@ -9,7 +9,6 @@ const ResetEvent = @This(); const os = std.os; const assert = std.debug.assert; const testing = std.testing; -const Atomic = std.atomic.Atomic; const Futex = std.Thread.Futex; impl: Impl = .{}, @@ -89,7 +88,7 @@ const SingleThreadedImpl = struct { }; const FutexImpl = struct { - state: Atomic(u32) = Atomic(u32).init(unset), + state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unset), const unset = 0; const waiting = 1; @@ -115,7 +114,7 @@ const FutexImpl = struct { // We avoid using any strict barriers until the end when we know the ResetEvent is set. var state = self.state.load(.Monotonic); if (state == unset) { - state = self.state.compareAndSwap(state, waiting, .Monotonic, .Monotonic) orelse waiting; + state = self.state.cmpxchgStrong(state, waiting, .Monotonic, .Monotonic) orelse waiting; } // Wait until the ResetEvent is set since the state is waiting. @@ -252,7 +251,7 @@ test "ResetEvent - broadcast" { const num_threads = 10; const Barrier = struct { event: ResetEvent = .{}, - counter: Atomic(usize) = Atomic(usize).init(num_threads), + counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads), fn wait(self: *@This()) void { if (self.counter.fetchSub(1, .AcqRel) == 1) { diff --git a/lib/std/Thread/RwLock.zig b/lib/std/Thread/RwLock.zig index e77db10abb..a05d68df88 100644 --- a/lib/std/Thread/RwLock.zig +++ b/lib/std/Thread/RwLock.zig @@ -307,7 +307,7 @@ test "RwLock - concurrent access" { rwl: RwLock = .{}, writes: usize = 0, - reads: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0), + reads: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), term1: usize = 0, term2: usize = 0, diff --git a/lib/std/Thread/WaitGroup.zig b/lib/std/Thread/WaitGroup.zig index f2274db86a..a6a82a9492 100644 --- a/lib/std/Thread/WaitGroup.zig +++ b/lib/std/Thread/WaitGroup.zig @@ -1,12 +1,11 @@ const std = @import("std"); -const Atomic = std.atomic.Atomic; const assert = std.debug.assert; const WaitGroup = @This(); const is_waiting: usize = 1 << 0; const one_pending: usize = 1 << 1; -state: Atomic(usize) = Atomic(usize).init(0), +state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), event: std.Thread.ResetEvent = .{}, pub fn start(self: *WaitGroup) void { -- cgit v1.2.3