diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2021-01-14 20:41:37 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2021-01-14 20:41:37 -0700 |
| commit | a9667b5a859a589056f23df2b74b91fede0bbbfa (patch) | |
| tree | 0efb150c8b3357b61f2dc11b0018a1038fe6d354 /lib/std/Thread | |
| parent | 2b0e3ee228e01473cf880f719db9bde5b8f34d25 (diff) | |
| download | zig-a9667b5a859a589056f23df2b74b91fede0bbbfa.tar.gz zig-a9667b5a859a589056f23df2b74b91fede0bbbfa.zip | |
organize std lib concurrency primitives and add RwLock
* move concurrency primitives that always operate on kernel threads to
the std.Thread namespace
* remove std.SpinLock. Nobody should use this in a non-freestanding
environment; the other primitives are always preferable. In
freestanding, it will be necessary to put custom spin logic in there,
so there are no use cases for a std lib version.
* move some std lib files to the top level fields convention
* add std.Thread.spinLoopHint
* add std.Thread.Condition
* add std.Thread.Semaphore
* new implementation of std.Thread.Mutex for Windows and non-pthreads Linux
* add std.Thread.RwLock
Implementations provided by @kprotty
Diffstat (limited to 'lib/std/Thread')
| -rw-r--r-- | lib/std/Thread/AutoResetEvent.zig | 228 | ||||
| -rw-r--r-- | lib/std/Thread/Condition.zig | 182 | ||||
| -rw-r--r-- | lib/std/Thread/Mutex.zig | 303 | ||||
| -rw-r--r-- | lib/std/Thread/ResetEvent.zig | 297 | ||||
| -rw-r--r-- | lib/std/Thread/RwLock.zig | 308 | ||||
| -rw-r--r-- | lib/std/Thread/Semaphore.zig | 39 | ||||
| -rw-r--r-- | lib/std/Thread/StaticResetEvent.zig | 396 |
7 files changed, 1753 insertions, 0 deletions
diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig new file mode 100644 index 0000000000..8b8b5658bf --- /dev/null +++ b/lib/std/Thread/AutoResetEvent.zig @@ -0,0 +1,228 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`. +//! Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like). +//! +//! AutoResetEvent has 3 possible states: +//! - UNSET: the AutoResetEvent is currently unset +//! - SET: the AutoResetEvent was notified before a wait() was called +//! - <StaticResetEvent pointer>: there is an active waiter waiting for a notification. +//! +//! When attempting to wait: +//! if the event is unset, it registers a ResetEvent pointer to be notified when the event is set +//! if the event is already set, then it consumes the notification and resets the event. +//! +//! When attempting to notify: +//! if the event is unset, then we set the event +//! if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent +//! +//! This ensures that the event is automatically reset after a wait() has been issued +//! and avoids the race condition when using StaticResetEvent in the following scenario: +//! thread 1 | thread 2 +//! StaticResetEvent.wait() | +//! | StaticResetEvent.set() +//! | StaticResetEvent.set() +//! StaticResetEvent.reset() | +//! StaticResetEvent.wait() | (missed the second .set() notification above) + +state: usize = UNSET, + +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const testing = std.testing; +const assert = std.debug.assert; +const StaticResetEvent = std.Thread.StaticResetEvent; +const AutoResetEvent = @This(); + +const UNSET = 0; +const SET = 1; + +/// the minimum alignment for the `*StaticResetEvent` created by wait*() +const event_align = std.math.max(@alignOf(StaticResetEvent), 2); + +pub fn wait(self: *AutoResetEvent) void { + self.waitFor(null) catch unreachable; +} + +pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { + return self.waitFor(timeout); +} + +fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { + // lazily initialized StaticResetEvent + var reset_event: StaticResetEvent align(event_align) = undefined; + var has_reset_event = false; + + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // consume a notification if there is any + if (state == SET) { + @atomicStore(usize, &self.state, UNSET, .SeqCst); + return; + } + + // check if theres currently a pending ResetEvent pointer already registered + if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent + } + + // lazily initialize the ResetEvent if it hasn't been already + if (!has_reset_event) { + has_reset_event = true; + reset_event = .{}; + } + + // Since the AutoResetEvent currently isnt set, + // try to register our ResetEvent on it to wait + // for a set() call from another thread. + if (@cmpxchgWeak( + usize, + &self.state, + UNSET, + @ptrToInt(&reset_event), + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + // if no timeout was specified, then just wait forever + const timeout_ns = timeout orelse { + reset_event.wait(); + return; + }; + + // wait with a timeout and return if signalled via set() + switch (reset_event.timedWait(timeout_ns)) { + .event_set => return, + .timed_out => {}, + } + + // If we timed out, we need to transition the AutoResetEvent back to UNSET. + // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. + state = @cmpxchgStrong( + usize, + &self.state, + @ptrToInt(&reset_event), + UNSET, + .SeqCst, + .SeqCst, + ) orelse return error.TimedOut; + + // We didn't manage to unregister ourselves from the state. + if (state == SET) { + unreachable; // AutoResetEvent notified without waking up the waiting thread + } else if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out + } + + // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. + // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. + // We don't return error.TimedOut here as it technically notified us while we were "timing out". + reset_event.wait(); + return; + } +} + +pub fn set(self: *AutoResetEvent) void { + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // If the AutoResetEvent is already set, there is nothing else left to do + if (state == SET) { + return; + } + + // If the AutoResetEvent isn't set, + // then try to leave a notification for the wait() thread that we set() it. + if (state == UNSET) { + state = @cmpxchgWeak( + usize, + &self.state, + UNSET, + SET, + .SeqCst, + .SeqCst, + ) orelse return; + continue; + } + + // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. + // Try to acquire ownership of it so that we can wake it up. + // This also resets the AutoResetEvent so that there is no race condition as defined above. + if (@cmpxchgWeak( + usize, + &self.state, + state, + UNSET, + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state); + reset_event.set(); + return; + } +} + +test "basic usage" { + // test local code paths + { + var event = AutoResetEvent{}; + testing.expectError(error.TimedOut, event.timedWait(1)); + event.set(); + event.wait(); + } + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + value: u128 = 0, + in: AutoResetEvent = AutoResetEvent{}, + out: AutoResetEvent = AutoResetEvent{}, + + const Self = @This(); + + fn sender(self: *Self) void { + testing.expect(self.value == 0); + self.value = 1; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 2); + self.value = 3; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 4); + } + + fn receiver(self: *Self) void { + self.out.wait(); + testing.expect(self.value == 1); + self.value = 2; + self.in.set(); + + self.out.wait(); + testing.expect(self.value == 3); + self.value = 4; + self.in.set(); + } + }; + + var context = Context{}; + const send_thread = try std.Thread.spawn(&context, Context.sender); + const recv_thread = try std.Thread.spawn(&context, Context.receiver); + + send_thread.wait(); + recv_thread.wait(); +} diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig new file mode 100644 index 0000000000..2379d264d1 --- /dev/null +++ b/lib/std/Thread/Condition.zig @@ -0,0 +1,182 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A condition provides a way for a kernel thread to block until it is signaled +//! to wake up. Spurious wakeups are possible. +//! This API supports static initialization and does not require deinitialization. + +impl: Impl, + +const std = @import("../std.zig"); +const Condition = @This(); +const windows = std.os.windows; +const linux = std.os.linux; +const Mutex = std.Thread.Mutex; +const assert = std.debug.assert; + +const Impl = if (std.builtin.single_threaded) + SingleThreadedCondition +else if (std.Target.current.os.tag == .windows) + WindowsCondition +else if (std.Thread.use_pthreads) + PthreadCondition +else + AtomicCondition; + +pub const SingleThreadedCondition = struct { + pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void { + unreachable; // deadlock detected + } + + pub fn signal(cond: *SingleThreadedCondition) void {} + + pub fn broadcast(cond: *SingleThreadedCondition) void {} +}; + +pub const WindowsCondition = struct { + cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT, + + pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void { + const rc = windows.SleepConditionVariableSRW( + &cond.cond, + &mutex.srwlock, + windows.INFINITE, + @as(windows.ULONG, 0), + ); + assert(rc != windows.FALSE); + } + + pub fn signal(cond: *WindowsCondition) void { + windows.WakeConditionVariable(&cond.cond); + } + + pub fn broadcast(cond: *WindowsCondition) void { + windows.WakeAllConditionVariable(&cond.cond); + } +}; + +pub const PthreadCondition = struct { + cond: std.c.pthread_cond_t = .{}, + + pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void { + const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.mutex); + assert(rc == 0); + } + + pub fn signal(cond: *PthreadCondition) void { + const rc = std.c.pthread_cond_signal(&cond.cond); + assert(rc == 0); + } + + pub fn broadcast(cond: *PthreadCondition) void { + const rc = std.c.pthread_cond_broadcast(&cond.cond); + assert(rc == 0); + } +}; + +pub const AtomicCondition = struct { + pending: bool = false, + queue_mutex: Mutex = .{}, + queue_list: QueueList = .{}, + + pub const QueueList = std.SinglyLinkedList(QueueItem); + + pub const QueueItem = struct { + futex: i32 = 0, + + fn wait(cond: *@This()) void { + while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) { + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wait( + &cond.futex, + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT, + 0, + null, + ))) { + 0 => {}, + std.os.EINTR => {}, + std.os.EAGAIN => {}, + else => unreachable, + } + }, + else => spinLoopHint(), + } + } + } + + fn notify(cond: *@This()) void { + @atomicStore(i32, &cond.futex, 1, .Release); + + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wake( + &cond.futex, + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE, + 1, + ))) { + 0 => {}, + std.os.EFAULT => {}, + else => unreachable, + } + }, + else => {}, + } + } + }; + + pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void { + var waiter = QueueList.Node{ .data = .{} }; + + { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + cond.queue_list.prepend(&waiter); + @atomicStore(bool, &cond.pending, true, .SeqCst); + } + + mutex.unlock(); + waiter.data.wait(); + mutex.lock(); + } + + pub fn signal(cond: *AtomicCondition) void { + if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) + return; + + const maybe_waiter = blk: { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + const maybe_waiter = cond.queue_list.popFirst(); + @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst); + break :blk maybe_waiter; + }; + + if (maybe_waiter) |waiter| + waiter.data.notify(); + } + + pub fn broadcast(cond: *AtomicCondition) void { + if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) + return; + + @atomicStore(bool, &cond.pending, false, .SeqCst); + + var waiters = blk: { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + const waiters = cond.queue_list; + cond.queue_list = .{}; + break :blk waiters; + }; + + while (waiters.popFirst()) |waiter| + waiter.data.notify(); + } +}; diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig new file mode 100644 index 0000000000..128eb0be80 --- /dev/null +++ b/lib/std/Thread/Mutex.zig @@ -0,0 +1,303 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! Lock may be held only once. If the same thread tries to acquire +//! the same mutex twice, it deadlocks. This type supports static +//! initialization and is at most `@sizeOf(usize)` in size. When an +//! application is built in single threaded release mode, all the +//! functions are no-ops. In single threaded debug mode, there is +//! deadlock detection. +//! +//! Example usage: +//! var m = Mutex{}; +//! +//! const lock = m.acquire(); +//! defer lock.release(); +//! ... critical code +//! +//! Non-blocking: +//! if (m.tryAcquire) |lock| { +//! defer lock.release(); +//! // ... critical section +//! } else { +//! // ... lock not acquired +//! } + +impl: Impl = .{}, + +const Mutex = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const os = std.os; +const assert = std.debug.assert; +const windows = os.windows; +const linux = os.linux; +const testing = std.testing; +const StaticResetEvent = std.thread.StaticResetEvent; + +pub const Held = struct { + impl: *Impl, + + pub fn release(held: Held) void { + held.impl.release(); + } +}; + +/// Try to acquire the mutex without blocking. Returns null if +/// the mutex is unavailable. Otherwise returns Held. Call +/// release on Held. +pub fn tryAcquire(m: *Mutex) ?Held { + if (m.impl.tryAcquire()) { + return Held{ .impl = &m.impl }; + } else { + return null; + } +} + +/// Acquire the mutex. Deadlocks if the mutex is already +/// held by the calling thread. +pub fn acquire(m: *Mutex) Held { + m.impl.acquire(); + return .{ .impl = &m.impl }; +} + +const Impl = if (builtin.single_threaded) + Dummy +else if (builtin.os.tag == .windows) + WindowsMutex +else if (std.Thread.use_pthreads) + PthreadMutex +else + AtomicMutex; + +pub const AtomicMutex = struct { + state: State = .unlocked, + + const State = enum(i32) { + unlocked, + locked, + waiting, + }; + + pub fn tryAcquire(self: *AtomicMutex) bool { + return @cmpxchgStrong( + State, + &self.state, + .unlocked, + .locked, + .Acquire, + .Monotonic, + ) == null; + } + + pub fn acquire(self: *AtomicMutex) void { + switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) { + .unlocked => {}, + else => |s| self.lockSlow(s), + } + } + + fn lockSlow(self: *AtomicMutex, current_state: State) void { + @setCold(true); + var new_state = current_state; + + var spin: u8 = 0; + while (spin < 100) : (spin += 1) { + const state = @cmpxchgWeak( + State, + &self.state, + .unlocked, + new_state, + .Acquire, + .Monotonic, + ) orelse return; + + switch (state) { + .unlocked => {}, + .locked => {}, + .waiting => break, + } + + var iter = std.math.min(32, spin + 1); + while (iter > 0) : (iter -= 1) + std.Thread.spinLoopHint(); + } + + new_state = .waiting; + while (true) { + switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) { + .unlocked => return, + else => {}, + } + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wait( + @ptrCast(*const i32, &self.state), + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT, + @enumToInt(new_state), + null, + ))) { + 0 => {}, + std.os.EINTR => {}, + std.os.EAGAIN => {}, + else => unreachable, + } + }, + else => std.Thread.spinLoopHint(), + } + } + } + + pub fn release(self: *AtomicMutex) void { + switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) { + .unlocked => unreachable, + .locked => {}, + .waiting => self.unlockSlow(), + } + } + + fn unlockSlow(self: *AtomicMutex) void { + @setCold(true); + + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wake( + @ptrCast(*const i32, &self.state), + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE, + 1, + ))) { + 0 => {}, + std.os.EFAULT => {}, + else => unreachable, + } + }, + else => {}, + } + } +}; + +pub const PthreadMutex = struct { + pthread_mutex: std.c.pthread_mutex_t = .{}, + + /// Try to acquire the mutex without blocking. Returns null if + /// the mutex is unavailable. Otherwise returns Held. Call + /// release on Held. + pub fn tryAcquire(self: *PthreadMutex) bool { + return std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0; + } + + /// Acquire the mutex. Will deadlock if the mutex is already + /// held by the calling thread. + pub fn acquire(self: *PthreadMutex) void { + switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) { + 0 => return, + std.c.EINVAL => unreachable, + std.c.EBUSY => unreachable, + std.c.EAGAIN => unreachable, + std.c.EDEADLK => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } + + pub fn release(self: *PthreadMutex) void { + switch (std.c.pthread_mutex_unlock(&self.pthread_mutex)) { + 0 => return, + std.c.EINVAL => unreachable, + std.c.EAGAIN => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } +}; + +/// This has the sematics as `Mutex`, however it does not actually do any +/// synchronization. Operations are safety-checked no-ops. +pub const Dummy = struct { + lock: @TypeOf(lock_init) = lock_init, + + const lock_init = if (std.debug.runtime_safety) false else {}; + + /// Try to acquire the mutex without blocking. Returns null if + /// the mutex is unavailable. Otherwise returns Held. Call + /// release on Held. + pub fn tryAcquire(self: *Dummy) bool { + if (std.debug.runtime_safety) { + if (self.lock) return false; + self.lock = true; + } + return true; + } + + /// Acquire the mutex. Will deadlock if the mutex is already + /// held by the calling thread. + pub fn acquire(self: *Dummy) void { + return self.tryAcquire() orelse @panic("deadlock detected"); + } + + pub fn release(self: *Dummy) void { + if (std.debug.runtime_safety) { + self.mutex.lock = false; + } + } +}; + +const WindowsMutex = struct { + srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT, + + pub fn tryAcquire(self: *WindowsMutex) bool { + return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE; + } + + pub fn acquire(self: *WindowsMutex) void { + AcquireSRWLockExclusive(&self.srwlock); + } + + pub fn release(self: *WindowsMutex) void { + ReleaseSRWLockExclusive(&self.srwlock); + } +}; + +const TestContext = struct { + mutex: *Mutex, + data: i128, + + const incr_count = 10000; +}; + +test "basic usage" { + var mutex = Mutex{}; + + var context = TestContext{ + .mutex = &mutex, + .data = 0, + }; + + if (builtin.single_threaded) { + worker(&context); + testing.expect(context.data == TestContext.incr_count); + } else { + const thread_count = 10; + var threads: [thread_count]*std.Thread = undefined; + for (threads) |*t| { + t.* = try std.Thread.spawn(&context, worker); + } + for (threads) |t| + t.wait(); + + testing.expect(context.data == thread_count * TestContext.incr_count); + } +} + +fn worker(ctx: *TestContext) void { + var i: usize = 0; + while (i != TestContext.incr_count) : (i += 1) { + const held = ctx.mutex.acquire(); + defer held.release(); + + ctx.data += 1; + } +} diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig new file mode 100644 index 0000000000..622f9be98e --- /dev/null +++ b/lib/std/Thread/ResetEvent.zig @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. +//! If you need an abstraction that cannot fail to be initialized, see +//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure, +//! it is preferred to use `ResetEvent`. + +const ResetEvent = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const testing = std.testing; +const assert = std.debug.assert; +const c = std.c; +const os = std.os; +const time = std.time; + +impl: Impl, + +pub const Impl = if (builtin.single_threaded) + std.Thread.StaticResetEvent.DebugEvent +else if (std.Target.current.isDarwin()) + DarwinEvent +else if (std.Thread.use_pthreads) + PosixEvent +else + std.Thread.StaticResetEvent.AtomicEvent; + +pub const InitError = error{SystemResources}; + +/// After `init`, it is legal to call any other function. +pub fn init(ev: *ResetEvent) InitError!void { + return ev.impl.init(); +} + +/// This function is not thread-safe. +/// After `deinit`, the only legal function to call is `init`. +pub fn deinit(ev: *ResetEvent) void { + return ev.impl.deinit(); +} + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *ResetEvent) void { + return ev.impl.set(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *ResetEvent) void { + return ev.impl.reset(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn wait(ev: *ResetEvent) void { + return ev.impl.wait(); +} + +pub const TimedWaitResult = enum { event_set, timed_out }; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `wait`, the only functions available to be called +/// in `ResetEvent` are `reset` and `deinit`. +pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// Apple has decided to not support POSIX semaphores, so we go with a +/// different approach using Grand Central Dispatch. This API is exposed +/// by libSystem so it is guaranteed to be available on all Darwin platforms. +pub const DarwinEvent = struct { + sem: c.dispatch_semaphore_t = undefined, + + pub fn init(ev: *DarwinEvent) !void { + ev.* = .{ + .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, + }; + } + + pub fn deinit(ev: *DarwinEvent) void { + c.dispatch_release(ev.sem); + ev.* = undefined; + } + + pub fn set(ev: *DarwinEvent) void { + // Empirically this returns the numerical value of the semaphore. + _ = c.dispatch_semaphore_signal(ev.sem); + } + + pub fn wait(ev: *DarwinEvent) void { + assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); + } + + pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { + const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); + if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { + return .timed_out; + } else { + return .event_set; + } + } + + pub fn reset(ev: *DarwinEvent) void { + // Keep calling until the semaphore goes back down to 0. + while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} + } +}; + +/// POSIX semaphores must be initialized at runtime because they are allowed to +/// be implemented as file descriptors, in which case initialization would require +/// a syscall to open the fd. +pub const PosixEvent = struct { + sem: c.sem_t = undefined, + + pub fn init(ev: *PosixEvent) !void { + switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { + 0 => return, + else => return error.SystemResources, + } + } + + pub fn deinit(ev: *PosixEvent) void { + assert(c.sem_destroy(&ev.sem) == 0); + ev.* = undefined; + } + + pub fn set(ev: *PosixEvent) void { + assert(c.sem_post(&ev.sem) == 0); + } + + pub fn wait(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_wait(&ev.sem))) { + 0 => return, + c.EINTR => continue, + c.EINVAL => unreachable, + else => unreachable, + } + } + } + + pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { + var ts: os.timespec = undefined; + var timeout_abs = timeout_ns; + os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out; + timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; + timeout_abs += @intCast(u64, ts.tv_nsec); + ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); + ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); + while (true) { + switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { + 0 => return .event_set, + c.EINTR => continue, + c.EINVAL => unreachable, + c.ETIMEDOUT => return .timed_out, + else => unreachable, + } + } + } + + pub fn reset(ev: *PosixEvent) void { + while (true) { + switch (c.getErrno(c.sem_trywait(&ev.sem))) { + 0 => continue, // Need to make it go to zero. + c.EINTR => continue, + c.EINVAL => unreachable, + c.EAGAIN => return, // The semaphore currently has the value zero. + else => unreachable, + } + } + } +}; + +test "basic usage" { + var event: ResetEvent = undefined; + try event.init(); + defer event.deinit(); + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128, + in: ResetEvent, + out: ResetEvent, + + fn init(self: *Self) !void { + self.* = .{ + .value = 0, + .in = undefined, + .out = undefined, + }; + try self.in.init(); + try self.out.init(); + } + + fn deinit(self: *Self) void { + self.in.deinit(); + self.out.deinit(); + self.* = undefined; + } + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context: Context = undefined; + try context.init(); + defer context.deinit(); + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} diff --git a/lib/std/Thread/RwLock.zig b/lib/std/Thread/RwLock.zig new file mode 100644 index 0000000000..1d606a9cf1 --- /dev/null +++ b/lib/std/Thread/RwLock.zig @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A lock that supports one writer or many readers. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. + +impl: Impl, + +const RwLock = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const assert = std.debug.assert; +const Mutex = std.Thread.Mutex; +const Semaphore = std.Semaphore; +const CondVar = std.CondVar; + +pub const Impl = if (builtin.single_threaded) + SingleThreadedRwLock +else if (std.Thread.use_pthreads) + PthreadRwLock +else + DefaultRwLock; + +pub fn init(rwl: *RwLock) void { + return rwl.impl.init(); +} + +pub fn deinit(rwl: *RwLock) void { + return rwl.impl.deinit(); +} + +/// Attempts to obtain exclusive lock ownership. +/// Returns `true` if the lock is obtained, `false` otherwise. +pub fn tryLock(rwl: *RwLock) bool { + return rwl.impl.tryLock(); +} + +/// Blocks until exclusive lock ownership is acquired. +pub fn lock(rwl: *RwLock) void { + return rwl.impl.lock(); +} + +/// Releases a held exclusive lock. +/// Asserts the lock is held exclusively. +pub fn unlock(rwl: *RwLock) void { + return rwl.impl.unlock(); +} + +/// Attempts to obtain shared lock ownership. +/// Returns `true` if the lock is obtained, `false` otherwise. +pub fn tryLockShared(rwl: *RwLock) bool { + return rwl.impl.tryLockShared(); +} + +/// Blocks until shared lock ownership is acquired. +pub fn lockShared(rwl: *RwLock) void { + return rwl.impl.lockShared(); +} + +/// Releases a held shared lock. +pub fn unlockShared(rwl: *RwLock) void { + return rwl.impl.unlockShared(); +} + +/// Single-threaded applications use this for deadlock checks in +/// debug mode, and no-ops in release modes. +pub const SingleThreadedRwLock = struct { + state: enum { unlocked, locked_exclusive, locked_shared }, + shared_count: usize, + + pub fn init(rwl: *SingleThreadedRwLock) void { + rwl.* = .{ + .state = .unlocked, + .shared_count = 0, + }; + } + + pub fn deinit(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .unlocked); + assert(rwl.shared_count == 0); + } + + /// Attempts to obtain exclusive lock ownership. + /// Returns `true` if the lock is obtained, `false` otherwise. + pub fn tryLock(rwl: *SingleThreadedRwLock) bool { + switch (rwl.state) { + .unlocked => { + assert(rwl.shared_count == 0); + rwl.state = .locked_exclusive; + return true; + }, + .locked_exclusive, .locked_shared => return false, + } + } + + /// Blocks until exclusive lock ownership is acquired. + pub fn lock(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .unlocked); // deadlock detected + assert(rwl.shared_count == 0); // corrupted state detected + rwl.state = .locked_exclusive; + } + + /// Releases a held exclusive lock. + /// Asserts the lock is held exclusively. + pub fn unlock(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .locked_exclusive); + assert(rwl.shared_count == 0); // corrupted state detected + rwl.state = .unlocked; + } + + /// Attempts to obtain shared lock ownership. + /// Returns `true` if the lock is obtained, `false` otherwise. + pub fn tryLockShared(rwl: *SingleThreadedRwLock) bool { + switch (rwl.state) { + .unlocked => { + rwl.state = .locked_shared; + assert(rwl.shared_count == 0); + rwl.shared_count = 1; + return true; + }, + .locked_exclusive, .locked_shared => return false, + } + } + + /// Blocks until shared lock ownership is acquired. + pub fn lockShared(rwl: *SingleThreadedRwLock) void { + switch (rwl.state) { + .unlocked => { + rwl.state = .locked_shared; + assert(rwl.shared_count == 0); + rwl.shared_count = 1; + }, + .locked_shared => { + rwl.shared_count += 1; + }, + .locked_exclusive => unreachable, // deadlock detected + } + } + + /// Releases a held shared lock. + pub fn unlockShared(rwl: *SingleThreadedRwLock) void { + switch (rwl.state) { + .unlocked => unreachable, // too many calls to `unlockShared` + .locked_exclusive => unreachable, // exclusively held lock + .locked_shared => { + rwl.shared_count -= 1; + if (rwl.shared_count == 0) { + rwl.state = .unlocked; + } + }, + } + } +}; + +pub const PthreadRwLock = struct { + rwlock: pthread_rwlock_t, + + pub fn init(rwl: *PthreadRwLock) void { + rwl.* = .{ .rwlock = .{} }; + } + + pub fn deinit(rwl: *PthreadRwLock) void { + const safe_rc = switch (std.builtin.os.tag) { + .dragonfly, .netbsd => std.os.EAGAIN, + else => 0, + }; + + const rc = std.c.pthread_rwlock_destroy(&rwl.rwlock); + assert(rc == 0 or rc == safe_rc); + + rwl.* = undefined; + } + + pub fn tryLock(rwl: *PthreadRwLock) bool { + return pthread_rwlock_trywrlock(&rwl.rwlock) == 0; + } + + pub fn lock(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_wrlock(&rwl.rwlock); + assert(rc == 0); + } + + pub fn unlock(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_unlock(&rwl.rwlock); + assert(rc == 0); + } + + pub fn tryLockShared(rwl: *PthreadRwLock) bool { + return pthread_rwlock_tryrdlock(&rwl.rwlock) == 0; + } + + pub fn lockShared(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_rdlock(&rwl.rwlock); + assert(rc == 0); + } + + pub fn unlockShared(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_unlock(&rwl.rwlock); + assert(rc == 0); + } +}; + +pub const DefaultRwLock = struct { + state: usize, + mutex: Mutex, + semaphore: Semaphore, + + const IS_WRITING: usize = 1; + const WRITER: usize = 1 << 1; + const READER: usize = 1 << (1 + std.meta.bitCount(Count)); + const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, WRITER); + const READER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, READER); + const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2)); + + pub fn init(rwl: *DefaultRwLock) void { + rwl.* = .{ + .state = 0, + .mutex = Mutex.init(), + .semaphore = Semaphore.init(0), + }; + } + + pub fn deinit(rwl: *DefaultRwLock) void { + rwl.semaphore.deinit(); + rwl.mutex.deinit(); + rwl.* = undefined; + } + + pub fn tryLock(rwl: *DefaultRwLock) bool { + if (rwl.mutex.tryLock()) { + const state = @atomicLoad(usize, &rwl.state, .SeqCst); + if (state & READER_MASK == 0) { + _ = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst); + return true; + } + + rwl.mutex.unlock(); + } + + return false; + } + + pub fn lock(rwl: *DefaultRwLock) void { + _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst); + rwl.mutex.lock(); + + const state = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst); + if (state & READER_MASK != 0) + rwl.semaphore.wait(); + } + + pub fn unlock(rwl: *DefaultRwLock) void { + _ = @atomicRmw(usize, &rwl.state, .And, ~IS_WRITING, .SeqCst); + rwl.mutex.unlock(); + } + + pub fn tryLockShared(rwl: *DefaultRwLock) bool { + const state = @atomicLoad(usize, &rwl.state, .SeqCst); + if (state & (IS_WRITING | WRITER_MASK) == 0) { + _ = @cmpxchgStrong( + usize, + &rwl.state, + state, + state + READER, + .SeqCst, + .SeqCst, + ) orelse return true; + } + + if (rwl.mutex.tryLock()) { + _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst); + rwl.mutex.unlock(); + return true; + } + + return false; + } + + pub fn lockShared(rwl: *DefaultRwLock) void { + var state = @atomicLoad(usize, &rwl.state, .SeqCst); + while (state & (IS_WRITING | WRITER_MASK) == 0) { + state = @cmpxchgWeak( + usize, + &rwl.state, + state, + state + READER, + .SeqCst, + .SeqCst, + ) orelse return; + } + + rwl.mutex.lock(); + _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst); + rwl.mutex.unlock(); + } + + pub fn unlockShared(rwl: *DefaultRwLock) void { + const state = @atomicRmw(usize, &rwl.state, .Sub, READER, .SeqCst); + + if ((state & READER_MASK == READER) and (state & IS_WRITING != 0)) + rwl.semaphore.post(); + } +}; diff --git a/lib/std/Thread/Semaphore.zig b/lib/std/Thread/Semaphore.zig new file mode 100644 index 0000000000..77a278b355 --- /dev/null +++ b/lib/std/Thread/Semaphore.zig @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A semaphore is an unsigned integer that blocks the kernel thread if +//! the number would become negative. +//! This API supports static initialization and does not require deinitialization. + +mutex: Mutex = .{}, +cond: Condition = .{}, +//! It is OK to initialize this field to any value. +permits: usize = 0, + +const RwLock = @This(); +const std = @import("../std.zig"); +const Mutex = std.Thread.Mutex; +const Condition = std.Thread.Condition; + +pub fn wait(sem: *Semaphore) void { + const held = sem.mutex.acquire(); + defer held.release(); + + while (sem.permits == 0) + sem.cond.wait(&sem.mutex); + + sem.permits -= 1; + if (sem.permits > 0) + sem.cond.signal(); +} + +pub fn post(sem: *Semaphore) void { + const held = sem.mutex.acquire(); + defer held.release(); + + sem.permits += 1; + sem.cond.signal(); +} diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig new file mode 100644 index 0000000000..414583e477 --- /dev/null +++ b/lib/std/Thread/StaticResetEvent.zig @@ -0,0 +1,396 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A thread-safe resource which supports blocking until signaled. +//! This API is for kernel threads, not evented I/O. +//! This API is statically initializable. It cannot fail to be initialized +//! and it requires no deinitialization. The downside is that it may not +//! integrate as cleanly into other synchronization APIs, or, in a worst case, +//! may be forced to fall back on spin locking. As a rule of thumb, prefer +//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when +//! the logic needs stronger API guarantees. + +const std = @import("../std.zig"); +const StaticResetEvent = @This(); +const SpinLock = std.SpinLock; +const assert = std.debug.assert; +const os = std.os; +const time = std.time; +const linux = std.os.linux; +const windows = std.os.windows; +const testing = std.testing; + +impl: Impl = .{}, + +pub const Impl = if (std.builtin.single_threaded) + DebugEvent +else + AtomicEvent; + +/// Sets the event if not already set and wakes up all the threads waiting on +/// the event. It is safe to call `set` multiple times before calling `wait`. +/// However it is illegal to call `set` after `wait` is called until the event +/// is `reset`. This function is thread-safe. +pub fn set(ev: *StaticResetEvent) void { + return ev.impl.set(); +} + +/// Wait for the event to be set by blocking the current thread. +/// Thread-safe. No spurious wakeups. +/// Upon return from `wait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn wait(ev: *StaticResetEvent) void { + return ev.impl.wait(); +} + +/// Resets the event to its original, unset state. +/// This function is *not* thread-safe. It is equivalent to calling +/// `deinit` followed by `init` but without the possibility of failure. +pub fn reset(ev: *StaticResetEvent) void { + return ev.impl.reset(); +} + +pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult; + +/// Wait for the event to be set by blocking the current thread. +/// A timeout in nanoseconds can be provided as a hint for how +/// long the thread should block on the unset event before returning +/// `TimedWaitResult.timed_out`. +/// Thread-safe. No precision of timing is guaranteed. +/// Upon return from `timedWait`, the only function available to be called +/// in `StaticResetEvent` is `reset`. +pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult { + return ev.impl.timedWait(timeout_ns); +} + +/// For single-threaded builds, we use this to detect deadlocks. +/// In unsafe modes this ends up being no-ops. +pub const DebugEvent = struct { + state: State = State.unset, + + const State = enum { + unset, + set, + waited, + }; + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. + pub fn init(ev: *DebugEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. + pub fn deinit(ev: *DebugEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *DebugEvent) void { + switch (ev.state) { + .unset => ev.state = .set, + .set => {}, + .waited => unreachable, // Not allowed to call `set` until `reset`. + } + } + + pub fn wait(ev: *DebugEvent) void { + switch (ev.state) { + .unset => unreachable, // Deadlock detected. + .set => return, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { + switch (ev.state) { + .unset => return .timed_out, + .set => return .event_set, + .waited => unreachable, // Not allowed to call `wait` until `reset`. + } + } + + pub fn reset(ev: *DebugEvent) void { + ev.state = .unset; + } +}; + +pub const AtomicEvent = struct { + waiters: u32 = 0, + + const WAKE = 1 << 0; + const WAIT = 1 << 1; + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. + pub fn init(ev: *AtomicEvent) void { + ev.* = .{}; + } + + /// This function is provided so that this type can be re-used inside + /// `std.Thread.ResetEvent`. + pub fn deinit(ev: *AtomicEvent) void { + ev.* = undefined; + } + + pub fn set(ev: *AtomicEvent) void { + const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release); + if (waiters >= WAIT) { + return Futex.wake(&ev.waiters, waiters >> 1); + } + } + + pub fn wait(ev: *AtomicEvent) void { + switch (ev.timedWait(null)) { + .timed_out => unreachable, + .event_set => return, + } + } + + pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult { + var waiters = @atomicLoad(u32, &ev.waiters, .Acquire); + while (waiters != WAKE) { + waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse { + if (Futex.wait(&ev.waiters, timeout)) |_| { + return .event_set; + } else |_| { + return .timed_out; + } + }; + } + return .event_set; + } + + pub fn reset(ev: *AtomicEvent) void { + @atomicStore(u32, &ev.waiters, 0, .Monotonic); + } + + pub const Futex = switch (std.Target.current.os.tag) { + .windows => WindowsFutex, + .linux => LinuxFutex, + else => SpinFutex, + }; + + pub const SpinFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void {} + + fn wait(waiters: *u32, timeout: ?u64) !void { + var timer: time.Timer = undefined; + if (timeout != null) + timer = time.Timer.start() catch return error.TimedOut; + + while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { + SpinLock.yield(); + if (timeout) |timeout_ns| { + if (timer.read() >= timeout_ns) + return error.TimedOut; + } + } + } + }; + + pub const LinuxFutex = struct { + fn wake(waiters: *u32, wake_count: u32) void { + const waiting = std.math.maxInt(i32); // wake_count + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting); + assert(linux.getErrno(rc) == 0); + } + + fn wait(waiters: *u32, timeout: ?u64) !void { + var ts: linux.timespec = undefined; + var ts_ptr: ?*linux.timespec = null; + if (timeout) |timeout_ns| { + ts_ptr = &ts; + ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); + ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); + } + + while (true) { + const waiting = @atomicLoad(u32, waiters, .Acquire); + if (waiting == WAKE) + return; + const expected = @intCast(i32, waiting); + const ptr = @ptrCast(*const i32, waiters); + const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr); + switch (linux.getErrno(rc)) { + 0 => continue, + os.ETIMEDOUT => return error.TimedOut, + os.EINTR => continue, + os.EAGAIN => return, + else => unreachable, + } + } + } + }; + + pub const WindowsFutex = struct { + pub fn wake(waiters: *u32, wake_count: u32) void { + const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); + const key = @ptrCast(*const c_void, waiters); + + var waiting = wake_count; + while (waiting != 0) : (waiting -= 1) { + const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .SUCCESS); + } + } + + pub fn wait(waiters: *u32, timeout: ?u64) !void { + const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); + const key = @ptrCast(*const c_void, waiters); + + // NT uses timeouts in units of 100ns with negative value being relative + var timeout_ptr: ?*windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + if (timeout) |timeout_ns| { + timeout_ptr = &timeout_value; + timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); + } + + // NtWaitForKeyedEvent doesnt have spurious wake-ups + var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); + switch (rc) { + .TIMEOUT => { + // update the wait count to signal that we're not waiting anymore. + // if the .set() thread already observed that we are, perform a + // matching NtWaitForKeyedEvent so that the .set() thread doesn't + // deadlock trying to run NtReleaseKeyedEvent above. + var waiting = @atomicLoad(u32, waiters, .Monotonic); + while (true) { + if (waiting == WAKE) { + rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); + assert(rc == .WAIT_0); + break; + } else { + waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; + continue; + } + } + return error.TimedOut; + }, + .WAIT_0 => {}, + else => unreachable, + } + } + + var event_handle: usize = EMPTY; + const EMPTY = ~@as(usize, 0); + const LOADING = EMPTY - 1; + + pub fn getEventHandle() ?windows.HANDLE { + var handle = @atomicLoad(usize, &event_handle, .Monotonic); + while (true) { + switch (handle) { + EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { + const handle_ptr = @ptrCast(*windows.HANDLE, &handle); + const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; + if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) + handle = 0; + @atomicStore(usize, &event_handle, handle, .Monotonic); + return @intToPtr(?windows.HANDLE, handle); + }, + LOADING => { + SpinLock.yield(); + handle = @atomicLoad(usize, &event_handle, .Monotonic); + }, + else => { + return @intToPtr(?windows.HANDLE, handle); + }, + } + } + } + }; +}; + +test "basic usage" { + var event = StaticResetEvent{}; + + // test event setting + event.set(); + + // test event resetting + event.reset(); + + // test event waiting (non-blocking) + event.set(); + event.wait(); + event.reset(); + + event.set(); + testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + + // test cross-thread signaling + if (std.builtin.single_threaded) + return; + + const Context = struct { + const Self = @This(); + + value: u128 = 0, + in: StaticResetEvent = .{}, + out: StaticResetEvent = .{}, + + fn sender(self: *Self) void { + // update value and signal input + testing.expect(self.value == 0); + self.value = 1; + self.in.set(); + + // wait for receiver to update value and signal output + self.out.wait(); + testing.expect(self.value == 2); + + // update value and signal final input + self.value = 3; + self.in.set(); + } + + fn receiver(self: *Self) void { + // wait for sender to update value and signal input + self.in.wait(); + assert(self.value == 1); + + // update value and signal output + self.in.reset(); + self.value = 2; + self.out.set(); + + // wait for sender to update value and signal final input + self.in.wait(); + assert(self.value == 3); + } + + fn sleeper(self: *Self) void { + self.in.set(); + time.sleep(time.ns_per_ms * 2); + self.value = 5; + self.out.set(); + } + + fn timedWaiter(self: *Self) !void { + self.in.wait(); + testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); + try self.out.timedWait(time.ns_per_ms * 100); + testing.expect(self.value == 5); + } + }; + + var context = Context{}; + const receiver = try std.Thread.spawn(&context, Context.receiver); + defer receiver.wait(); + context.sender(); + + if (false) { + // I have now observed this fail on macOS, Windows, and Linux. + // https://github.com/ziglang/zig/issues/7009 + var timed = Context.init(); + defer timed.deinit(); + const sleeper = try std.Thread.spawn(&timed, Context.sleeper); + defer sleeper.wait(); + try timed.timedWaiter(); + } +} |
