Diffstat (limited to 'lib/std/Thread')
 lib/std/Thread/AutoResetEvent.zig   | 228
 lib/std/Thread/Condition.zig        | 194
 lib/std/Thread/Mutex.zig            | 319
 lib/std/Thread/ResetEvent.zig       | 297
 lib/std/Thread/RwLock.zig           | 308
 lib/std/Thread/Semaphore.zig        |  39
 lib/std/Thread/StaticResetEvent.zig | 395
 7 files changed, 1780 insertions(+), 0 deletions(-)
diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig
new file mode 100644
index 0000000000..8b8b5658bf
--- /dev/null
+++ b/lib/std/Thread/AutoResetEvent.zig
@@ -0,0 +1,228 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`.
+//! Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like).
+//!
+//! AutoResetEvent has 3 possible states:
+//! - UNSET: the AutoResetEvent is currently unset
+//! - SET: the AutoResetEvent was notified before a wait() was called
+//! - <StaticResetEvent pointer>: there is an active waiter waiting for a notification.
+//!
+//! When attempting to wait:
+//! if the event is unset, it registers a ResetEvent pointer to be notified when the event is set
+//! if the event is already set, then it consumes the notification and resets the event.
+//!
+//! When attempting to notify:
+//! if the event is unset, then we set the event
+//! if there's a waiting ResetEvent, then we unset the event and notify the ResetEvent
+//!
+//! This ensures that the event is automatically reset after a wait() has been issued
+//! and avoids the race condition when using StaticResetEvent in the following scenario:
+//! thread 1 | thread 2
+//! StaticResetEvent.wait() |
+//! | StaticResetEvent.set()
+//! | StaticResetEvent.set()
+//! StaticResetEvent.reset() |
+//! StaticResetEvent.wait() | (missed the second .set() notification above)
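+//!
+//! Example usage (a minimal sketch; assumes exactly one waiter thread):
+//!     var event = AutoResetEvent{};
+//!     // waiter thread:
+//!     event.wait(); // blocks until some thread calls set()
+//!     // any other thread:
+//!     event.set(); // wakes the waiter; the event resets automatically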
+
+state: usize = UNSET,
+
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const testing = std.testing;
+const assert = std.debug.assert;
+const StaticResetEvent = std.Thread.StaticResetEvent;
+const AutoResetEvent = @This();
+
+const UNSET = 0;
+const SET = 1;
+
+/// the minimum alignment for the `*StaticResetEvent` created by wait*()
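+/// (an alignment of at least 2 keeps the pointer's lowest bit clear, so a
+/// registered pointer can never be mistaken for the UNSET/SET states above)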
+const event_align = std.math.max(@alignOf(StaticResetEvent), 2);
+
+pub fn wait(self: *AutoResetEvent) void {
+ self.waitFor(null) catch unreachable;
+}
+
+pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void {
+ return self.waitFor(timeout);
+}
+
+fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
+ // lazily initialized StaticResetEvent
+ var reset_event: StaticResetEvent align(event_align) = undefined;
+ var has_reset_event = false;
+
+ var state = @atomicLoad(usize, &self.state, .SeqCst);
+ while (true) {
+ // consume a notification if there is any
+ if (state == SET) {
+ @atomicStore(usize, &self.state, UNSET, .SeqCst);
+ return;
+ }
+
+        // check if there's already a ResetEvent pointer registered
+ if (state != UNSET) {
+ unreachable; // multiple waiting threads on the same AutoResetEvent
+ }
+
+ // lazily initialize the ResetEvent if it hasn't been already
+ if (!has_reset_event) {
+ has_reset_event = true;
+ reset_event = .{};
+ }
+
+        // Since the AutoResetEvent currently isn't set,
+ // try to register our ResetEvent on it to wait
+ // for a set() call from another thread.
+ if (@cmpxchgWeak(
+ usize,
+ &self.state,
+ UNSET,
+ @ptrToInt(&reset_event),
+ .SeqCst,
+ .SeqCst,
+ )) |new_state| {
+ state = new_state;
+ continue;
+ }
+
+ // if no timeout was specified, then just wait forever
+ const timeout_ns = timeout orelse {
+ reset_event.wait();
+ return;
+ };
+
+ // wait with a timeout and return if signalled via set()
+ switch (reset_event.timedWait(timeout_ns)) {
+ .event_set => return,
+ .timed_out => {},
+ }
+
+ // If we timed out, we need to transition the AutoResetEvent back to UNSET.
+ // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent.
+ state = @cmpxchgStrong(
+ usize,
+ &self.state,
+ @ptrToInt(&reset_event),
+ UNSET,
+ .SeqCst,
+ .SeqCst,
+ ) orelse return error.TimedOut;
+
+ // We didn't manage to unregister ourselves from the state.
+ if (state == SET) {
+ unreachable; // AutoResetEvent notified without waking up the waiting thread
+ } else if (state != UNSET) {
+ unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out
+ }
+
+        // This means a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up.
+ // We need to wait for it to wake up our ResetEvent before we can return and invalidate it.
+ // We don't return error.TimedOut here as it technically notified us while we were "timing out".
+ reset_event.wait();
+ return;
+ }
+}
+
+pub fn set(self: *AutoResetEvent) void {
+ var state = @atomicLoad(usize, &self.state, .SeqCst);
+ while (true) {
+ // If the AutoResetEvent is already set, there is nothing else left to do
+ if (state == SET) {
+ return;
+ }
+
+ // If the AutoResetEvent isn't set,
+ // then try to leave a notification for the wait() thread that we set() it.
+ if (state == UNSET) {
+ state = @cmpxchgWeak(
+ usize,
+ &self.state,
+ UNSET,
+ SET,
+ .SeqCst,
+ .SeqCst,
+ ) orelse return;
+ continue;
+ }
+
+        // There is a waiting ResetEvent pointer registered on the AutoResetEvent.
+ // Try to acquire ownership of it so that we can wake it up.
+ // This also resets the AutoResetEvent so that there is no race condition as defined above.
+ if (@cmpxchgWeak(
+ usize,
+ &self.state,
+ state,
+ UNSET,
+ .SeqCst,
+ .SeqCst,
+ )) |new_state| {
+ state = new_state;
+ continue;
+ }
+
+ const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state);
+ reset_event.set();
+ return;
+ }
+}
+
+test "basic usage" {
+ // test local code paths
+ {
+ var event = AutoResetEvent{};
+ testing.expectError(error.TimedOut, event.timedWait(1));
+ event.set();
+ event.wait();
+ }
+
+ // test cross-thread signaling
+ if (builtin.single_threaded)
+ return;
+
+ const Context = struct {
+ value: u128 = 0,
+ in: AutoResetEvent = AutoResetEvent{},
+ out: AutoResetEvent = AutoResetEvent{},
+
+ const Self = @This();
+
+ fn sender(self: *Self) void {
+ testing.expect(self.value == 0);
+ self.value = 1;
+ self.out.set();
+
+ self.in.wait();
+ testing.expect(self.value == 2);
+ self.value = 3;
+ self.out.set();
+
+ self.in.wait();
+ testing.expect(self.value == 4);
+ }
+
+ fn receiver(self: *Self) void {
+ self.out.wait();
+ testing.expect(self.value == 1);
+ self.value = 2;
+ self.in.set();
+
+ self.out.wait();
+ testing.expect(self.value == 3);
+ self.value = 4;
+ self.in.set();
+ }
+ };
+
+ var context = Context{};
+ const send_thread = try std.Thread.spawn(&context, Context.sender);
+ const recv_thread = try std.Thread.spawn(&context, Context.receiver);
+
+ send_thread.wait();
+ recv_thread.wait();
+}
diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig
new file mode 100644
index 0000000000..a14b57f6b4
--- /dev/null
+++ b/lib/std/Thread/Condition.zig
@@ -0,0 +1,194 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A condition provides a way for a kernel thread to block until it is signaled
+//! to wake up. Spurious wakeups are possible.
+//! This API supports static initialization and does not require deinitialization.
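+//!
+//! Example usage (a minimal sketch; `cond`, `mutex`, and `predicate` are
+//! placeholders, and the caller is assumed to already hold `mutex`):
+//!     while (!predicate)
+//!         cond.wait(&mutex);
+//!     // spurious wakeups are possible, so the predicate must be
+//!     // re-checked in a loop as above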
+
+impl: Impl = .{},
+
+const std = @import("../std.zig");
+const Condition = @This();
+const windows = std.os.windows;
+const linux = std.os.linux;
+const Mutex = std.Thread.Mutex;
+const assert = std.debug.assert;
+
+pub fn wait(cond: *Condition, mutex: *Mutex) void {
+ cond.impl.wait(mutex);
+}
+
+pub fn signal(cond: *Condition) void {
+ cond.impl.signal();
+}
+
+pub fn broadcast(cond: *Condition) void {
+ cond.impl.broadcast();
+}
+
+const Impl = if (std.builtin.single_threaded)
+ SingleThreadedCondition
+else if (std.Target.current.os.tag == .windows)
+ WindowsCondition
+else if (std.Thread.use_pthreads)
+ PthreadCondition
+else
+ AtomicCondition;
+
+pub const SingleThreadedCondition = struct {
+ pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void {
+ unreachable; // deadlock detected
+ }
+
+ pub fn signal(cond: *SingleThreadedCondition) void {}
+
+ pub fn broadcast(cond: *SingleThreadedCondition) void {}
+};
+
+pub const WindowsCondition = struct {
+ cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT,
+
+ pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void {
+ const rc = windows.kernel32.SleepConditionVariableSRW(
+ &cond.cond,
+            &mutex.impl.srwlock,
+ windows.INFINITE,
+ @as(windows.ULONG, 0),
+ );
+ assert(rc != windows.FALSE);
+ }
+
+ pub fn signal(cond: *WindowsCondition) void {
+ windows.kernel32.WakeConditionVariable(&cond.cond);
+ }
+
+ pub fn broadcast(cond: *WindowsCondition) void {
+ windows.kernel32.WakeAllConditionVariable(&cond.cond);
+ }
+};
+
+pub const PthreadCondition = struct {
+ cond: std.c.pthread_cond_t = .{},
+
+ pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void {
+ const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.impl.pthread_mutex);
+ assert(rc == 0);
+ }
+
+ pub fn signal(cond: *PthreadCondition) void {
+ const rc = std.c.pthread_cond_signal(&cond.cond);
+ assert(rc == 0);
+ }
+
+ pub fn broadcast(cond: *PthreadCondition) void {
+ const rc = std.c.pthread_cond_broadcast(&cond.cond);
+ assert(rc == 0);
+ }
+};
+
+pub const AtomicCondition = struct {
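+    // `pending` is a fast-path flag: signal() and broadcast() consult it
+    // first so they can return without acquiring `queue_mutex` when there
+    // are no waiters.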
+ pending: bool = false,
+ queue_mutex: Mutex = .{},
+ queue_list: QueueList = .{},
+
+ pub const QueueList = std.SinglyLinkedList(QueueItem);
+
+ pub const QueueItem = struct {
+ futex: i32 = 0,
+
+ fn wait(cond: *@This()) void {
+ while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ switch (linux.getErrno(linux.futex_wait(
+ &cond.futex,
+ linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT,
+ 0,
+ null,
+ ))) {
+ 0 => {},
+ std.os.EINTR => {},
+ std.os.EAGAIN => {},
+ else => unreachable,
+ }
+ },
+                    else => std.Thread.spinLoopHint(),
+ }
+ }
+ }
+
+ fn notify(cond: *@This()) void {
+ @atomicStore(i32, &cond.futex, 1, .Release);
+
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ switch (linux.getErrno(linux.futex_wake(
+ &cond.futex,
+ linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE,
+ 1,
+ ))) {
+ 0 => {},
+ std.os.EFAULT => {},
+ else => unreachable,
+ }
+ },
+ else => {},
+ }
+ }
+ };
+
+ pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void {
+ var waiter = QueueList.Node{ .data = .{} };
+
+ {
+ const held = cond.queue_mutex.acquire();
+ defer held.release();
+
+ cond.queue_list.prepend(&waiter);
+ @atomicStore(bool, &cond.pending, true, .SeqCst);
+ }
+
+ mutex.unlock();
+ waiter.data.wait();
+ mutex.lock();
+ }
+
+ pub fn signal(cond: *AtomicCondition) void {
+ if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
+ return;
+
+ const maybe_waiter = blk: {
+ const held = cond.queue_mutex.acquire();
+ defer held.release();
+
+ const maybe_waiter = cond.queue_list.popFirst();
+ @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst);
+ break :blk maybe_waiter;
+ };
+
+ if (maybe_waiter) |waiter|
+ waiter.data.notify();
+ }
+
+ pub fn broadcast(cond: *AtomicCondition) void {
+ if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
+ return;
+
+ @atomicStore(bool, &cond.pending, false, .SeqCst);
+
+ var waiters = blk: {
+ const held = cond.queue_mutex.acquire();
+ defer held.release();
+
+ const waiters = cond.queue_list;
+ cond.queue_list = .{};
+ break :blk waiters;
+ };
+
+ while (waiters.popFirst()) |waiter|
+ waiter.data.notify();
+ }
+};
diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig
new file mode 100644
index 0000000000..94711bcda0
--- /dev/null
+++ b/lib/std/Thread/Mutex.zig
@@ -0,0 +1,319 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! Lock may be held only once. If the same thread tries to acquire
+//! the same mutex twice, it deadlocks. This type supports static
+//! initialization and is at most `@sizeOf(usize)` in size. When an
+//! application is built in single threaded release mode, all the
+//! functions are no-ops. In single threaded debug mode, there is
+//! deadlock detection.
+//!
+//! Example usage:
+//! var m = Mutex{};
+//!
+//! const lock = m.acquire();
+//! defer lock.release();
+//! ... critical code
+//!
+//! Non-blocking:
+//!     if (m.tryAcquire()) |lock| {
+//! defer lock.release();
+//! // ... critical section
+//! } else {
+//! // ... lock not acquired
+//! }
+
+impl: Impl = .{},
+
+const Mutex = @This();
+const std = @import("../std.zig");
+const builtin = std.builtin;
+const os = std.os;
+const assert = std.debug.assert;
+const windows = os.windows;
+const linux = os.linux;
+const testing = std.testing;
+const StaticResetEvent = std.Thread.StaticResetEvent;
+
+/// Try to acquire the mutex without blocking. Returns `null` if the mutex is
+/// unavailable. Otherwise returns `Held`. Call `release` on `Held`.
+pub fn tryAcquire(m: *Mutex) ?Impl.Held {
+ return m.impl.tryAcquire();
+}
+
+/// Acquire the mutex. Deadlocks if the mutex is already
+/// held by the calling thread.
+pub fn acquire(m: *Mutex) Impl.Held {
+ return m.impl.acquire();
+}
+
+const Impl = if (builtin.single_threaded)
+ Dummy
+else if (builtin.os.tag == .windows)
+ WindowsMutex
+else if (std.Thread.use_pthreads)
+ PthreadMutex
+else
+ AtomicMutex;
+
+pub const AtomicMutex = struct {
+ state: State = .unlocked,
+
+ const State = enum(i32) {
+ unlocked,
+ locked,
+ waiting,
+ };
+
+ pub const Held = struct {
+ mutex: *AtomicMutex,
+
+ pub fn release(held: Held) void {
+ switch (@atomicRmw(State, &held.mutex.state, .Xchg, .unlocked, .Release)) {
+ .unlocked => unreachable,
+ .locked => {},
+ .waiting => held.mutex.unlockSlow(),
+ }
+ }
+ };
+
+ pub fn tryAcquire(m: *AtomicMutex) ?Held {
+ if (@cmpxchgStrong(
+ State,
+ &m.state,
+ .unlocked,
+ .locked,
+ .Acquire,
+ .Monotonic,
+ ) == null) {
+ return Held{ .mutex = m };
+ } else {
+ return null;
+ }
+ }
+
+ pub fn acquire(m: *AtomicMutex) Held {
+ switch (@atomicRmw(State, &m.state, .Xchg, .locked, .Acquire)) {
+ .unlocked => {},
+ else => |s| m.lockSlow(s),
+ }
+ return Held{ .mutex = m };
+ }
+
+ fn lockSlow(m: *AtomicMutex, current_state: State) void {
+ @setCold(true);
+ var new_state = current_state;
+
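+        // Phase 1: spin for a bounded number of iterations, attempting to
+        // take the lock while it appears uncontended; if another thread is
+        // already waiting, fall through to sleeping right away.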
+ var spin: u8 = 0;
+ while (spin < 100) : (spin += 1) {
+ const state = @cmpxchgWeak(
+ State,
+ &m.state,
+ .unlocked,
+ new_state,
+ .Acquire,
+ .Monotonic,
+ ) orelse return;
+
+ switch (state) {
+ .unlocked => {},
+ .locked => {},
+ .waiting => break,
+ }
+
+ var iter = std.math.min(32, spin + 1);
+ while (iter > 0) : (iter -= 1)
+ std.Thread.spinLoopHint();
+ }
+
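+        // Phase 2: mark the lock as contended and sleep on the futex until
+        // an unlock wakes us, retrying the exchange each time.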
+ new_state = .waiting;
+ while (true) {
+ switch (@atomicRmw(State, &m.state, .Xchg, new_state, .Acquire)) {
+ .unlocked => return,
+ else => {},
+ }
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ switch (linux.getErrno(linux.futex_wait(
+ @ptrCast(*const i32, &m.state),
+ linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT,
+ @enumToInt(new_state),
+ null,
+ ))) {
+ 0 => {},
+ std.os.EINTR => {},
+ std.os.EAGAIN => {},
+ else => unreachable,
+ }
+ },
+ else => std.Thread.spinLoopHint(),
+ }
+ }
+ }
+
+ fn unlockSlow(m: *AtomicMutex) void {
+ @setCold(true);
+
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ switch (linux.getErrno(linux.futex_wake(
+ @ptrCast(*const i32, &m.state),
+ linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE,
+ 1,
+ ))) {
+ 0 => {},
+ std.os.EFAULT => {},
+ else => unreachable,
+ }
+ },
+ else => {},
+ }
+ }
+};
+
+pub const PthreadMutex = struct {
+ pthread_mutex: std.c.pthread_mutex_t = .{},
+
+ pub const Held = struct {
+ mutex: *PthreadMutex,
+
+ pub fn release(held: Held) void {
+ switch (std.c.pthread_mutex_unlock(&held.mutex.pthread_mutex)) {
+ 0 => return,
+ std.c.EINVAL => unreachable,
+ std.c.EAGAIN => unreachable,
+ std.c.EPERM => unreachable,
+ else => unreachable,
+ }
+ }
+ };
+
+ /// Try to acquire the mutex without blocking. Returns null if
+ /// the mutex is unavailable. Otherwise returns Held. Call
+ /// release on Held.
+ pub fn tryAcquire(m: *PthreadMutex) ?Held {
+ if (std.c.pthread_mutex_trylock(&m.pthread_mutex) == 0) {
+ return Held{ .mutex = m };
+ } else {
+ return null;
+ }
+ }
+
+ /// Acquire the mutex. Will deadlock if the mutex is already
+ /// held by the calling thread.
+ pub fn acquire(m: *PthreadMutex) Held {
+ switch (std.c.pthread_mutex_lock(&m.pthread_mutex)) {
+ 0 => return Held{ .mutex = m },
+ std.c.EINVAL => unreachable,
+ std.c.EBUSY => unreachable,
+ std.c.EAGAIN => unreachable,
+ std.c.EDEADLK => unreachable,
+ std.c.EPERM => unreachable,
+ else => unreachable,
+ }
+ }
+};
+
+/// This has the same semantics as `Mutex`, but it does not actually do any
+/// synchronization. Operations are safety-checked no-ops.
+pub const Dummy = struct {
+ lock: @TypeOf(lock_init) = lock_init,
+
+ const lock_init = if (std.debug.runtime_safety) false else {};
+
+ pub const Held = struct {
+ mutex: *Dummy,
+
+ pub fn release(held: Held) void {
+ if (std.debug.runtime_safety) {
+ held.mutex.lock = false;
+ }
+ }
+ };
+
+ /// Try to acquire the mutex without blocking. Returns null if
+ /// the mutex is unavailable. Otherwise returns Held. Call
+ /// release on Held.
+ pub fn tryAcquire(m: *Dummy) ?Held {
+ if (std.debug.runtime_safety) {
+ if (m.lock) return null;
+ m.lock = true;
+ }
+ return Held{ .mutex = m };
+ }
+
+ /// Acquire the mutex. Will deadlock if the mutex is already
+ /// held by the calling thread.
+ pub fn acquire(m: *Dummy) Held {
+ return m.tryAcquire() orelse @panic("deadlock detected");
+ }
+};
+
+const WindowsMutex = struct {
+ srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT,
+
+ pub const Held = struct {
+ mutex: *WindowsMutex,
+
+ pub fn release(held: Held) void {
+ windows.kernel32.ReleaseSRWLockExclusive(&held.mutex.srwlock);
+ }
+ };
+
+ pub fn tryAcquire(m: *WindowsMutex) ?Held {
+ if (windows.kernel32.TryAcquireSRWLockExclusive(&m.srwlock) != windows.FALSE) {
+ return Held{ .mutex = m };
+ } else {
+ return null;
+ }
+ }
+
+ pub fn acquire(m: *WindowsMutex) Held {
+ windows.kernel32.AcquireSRWLockExclusive(&m.srwlock);
+ return Held{ .mutex = m };
+ }
+};
+
+const TestContext = struct {
+ mutex: *Mutex,
+ data: i128,
+
+ const incr_count = 10000;
+};
+
+test "basic usage" {
+ var mutex = Mutex{};
+
+ var context = TestContext{
+ .mutex = &mutex,
+ .data = 0,
+ };
+
+ if (builtin.single_threaded) {
+ worker(&context);
+ testing.expect(context.data == TestContext.incr_count);
+ } else {
+ const thread_count = 10;
+ var threads: [thread_count]*std.Thread = undefined;
+ for (threads) |*t| {
+ t.* = try std.Thread.spawn(&context, worker);
+ }
+ for (threads) |t|
+ t.wait();
+
+ testing.expect(context.data == thread_count * TestContext.incr_count);
+ }
+}
+
+fn worker(ctx: *TestContext) void {
+ var i: usize = 0;
+ while (i != TestContext.incr_count) : (i += 1) {
+ const held = ctx.mutex.acquire();
+ defer held.release();
+
+ ctx.data += 1;
+ }
+}
diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig
new file mode 100644
index 0000000000..622f9be98e
--- /dev/null
+++ b/lib/std/Thread/ResetEvent.zig
@@ -0,0 +1,297 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A thread-safe resource which supports blocking until signaled.
+//! This API is for kernel threads, not evented I/O.
+//! This API requires being initialized at runtime, and initialization
+//! can fail. Once initialized, the core operations cannot fail.
+//! If you need an abstraction that cannot fail to be initialized, see
+//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure,
+//! it is preferred to use `ResetEvent`.
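+//!
+//! Example usage (a minimal sketch):
+//!     var ev: ResetEvent = undefined;
+//!     try ev.init();
+//!     defer ev.deinit();
+//!     // one thread blocks:
+//!     ev.wait();
+//!     // another thread wakes it:
+//!     ev.set();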
+
+const ResetEvent = @This();
+const std = @import("../std.zig");
+const builtin = std.builtin;
+const testing = std.testing;
+const assert = std.debug.assert;
+const c = std.c;
+const os = std.os;
+const time = std.time;
+
+impl: Impl,
+
+pub const Impl = if (builtin.single_threaded)
+ std.Thread.StaticResetEvent.DebugEvent
+else if (std.Target.current.isDarwin())
+ DarwinEvent
+else if (std.Thread.use_pthreads)
+ PosixEvent
+else
+ std.Thread.StaticResetEvent.AtomicEvent;
+
+pub const InitError = error{SystemResources};
+
+/// After `init`, it is legal to call any other function.
+pub fn init(ev: *ResetEvent) InitError!void {
+ return ev.impl.init();
+}
+
+/// This function is not thread-safe.
+/// After `deinit`, the only legal function to call is `init`.
+pub fn deinit(ev: *ResetEvent) void {
+ return ev.impl.deinit();
+}
+
+/// Sets the event if not already set and wakes up all the threads waiting on
+/// the event. It is safe to call `set` multiple times before calling `wait`.
+/// However it is illegal to call `set` after `wait` is called until the event
+/// is `reset`. This function is thread-safe.
+pub fn set(ev: *ResetEvent) void {
+ return ev.impl.set();
+}
+
+/// Resets the event to its original, unset state.
+/// This function is *not* thread-safe. It is equivalent to calling
+/// `deinit` followed by `init` but without the possibility of failure.
+pub fn reset(ev: *ResetEvent) void {
+ return ev.impl.reset();
+}
+
+/// Wait for the event to be set by blocking the current thread.
+/// Thread-safe. No spurious wakeups.
+/// Upon return from `wait`, the only functions available to be called
+/// in `ResetEvent` are `reset` and `deinit`.
+pub fn wait(ev: *ResetEvent) void {
+ return ev.impl.wait();
+}
+
+pub const TimedWaitResult = enum { event_set, timed_out };
+
+/// Wait for the event to be set by blocking the current thread.
+/// A timeout in nanoseconds can be provided as a hint for how
+/// long the thread should block on the unset event before returning
+/// `TimedWaitResult.timed_out`.
+/// Thread-safe. No precision of timing is guaranteed.
+/// Upon return from `wait`, the only functions available to be called
+/// in `ResetEvent` are `reset` and `deinit`.
+pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult {
+ return ev.impl.timedWait(timeout_ns);
+}
+
+/// Apple has decided to not support POSIX semaphores, so we go with a
+/// different approach using Grand Central Dispatch. This API is exposed
+/// by libSystem so it is guaranteed to be available on all Darwin platforms.
+pub const DarwinEvent = struct {
+ sem: c.dispatch_semaphore_t = undefined,
+
+ pub fn init(ev: *DarwinEvent) !void {
+ ev.* = .{
+ .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources,
+ };
+ }
+
+ pub fn deinit(ev: *DarwinEvent) void {
+ c.dispatch_release(ev.sem);
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *DarwinEvent) void {
+ // Empirically this returns the numerical value of the semaphore.
+ _ = c.dispatch_semaphore_signal(ev.sem);
+ }
+
+ pub fn wait(ev: *DarwinEvent) void {
+ assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0);
+ }
+
+ pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult {
+ const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns));
+ if (c.dispatch_semaphore_wait(ev.sem, t) != 0) {
+ return .timed_out;
+ } else {
+ return .event_set;
+ }
+ }
+
+ pub fn reset(ev: *DarwinEvent) void {
+ // Keep calling until the semaphore goes back down to 0.
+ while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {}
+ }
+};
+
+/// POSIX semaphores must be initialized at runtime because they are allowed to
+/// be implemented as file descriptors, in which case initialization would require
+/// a syscall to open the fd.
+pub const PosixEvent = struct {
+ sem: c.sem_t = undefined,
+
+ pub fn init(ev: *PosixEvent) !void {
+ switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) {
+ 0 => return,
+ else => return error.SystemResources,
+ }
+ }
+
+ pub fn deinit(ev: *PosixEvent) void {
+ assert(c.sem_destroy(&ev.sem) == 0);
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *PosixEvent) void {
+ assert(c.sem_post(&ev.sem) == 0);
+ }
+
+ pub fn wait(ev: *PosixEvent) void {
+ while (true) {
+ switch (c.getErrno(c.sem_wait(&ev.sem))) {
+ 0 => return,
+ c.EINTR => continue,
+ c.EINVAL => unreachable,
+ else => unreachable,
+ }
+ }
+ }
+
+ pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult {
+ var ts: os.timespec = undefined;
+ var timeout_abs = timeout_ns;
+ os.clock_gettime(os.CLOCK_REALTIME, &ts) catch return .timed_out;
+ timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s;
+ timeout_abs += @intCast(u64, ts.tv_nsec);
+ ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s));
+ ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s));
+ while (true) {
+ switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) {
+ 0 => return .event_set,
+ c.EINTR => continue,
+ c.EINVAL => unreachable,
+ c.ETIMEDOUT => return .timed_out,
+ else => unreachable,
+ }
+ }
+ }
+
+ pub fn reset(ev: *PosixEvent) void {
+ while (true) {
+ switch (c.getErrno(c.sem_trywait(&ev.sem))) {
+ 0 => continue, // Need to make it go to zero.
+ c.EINTR => continue,
+ c.EINVAL => unreachable,
+ c.EAGAIN => return, // The semaphore currently has the value zero.
+ else => unreachable,
+ }
+ }
+ }
+};
+
+test "basic usage" {
+ var event: ResetEvent = undefined;
+ try event.init();
+ defer event.deinit();
+
+ // test event setting
+ event.set();
+
+ // test event resetting
+ event.reset();
+
+ // test event waiting (non-blocking)
+ event.set();
+ event.wait();
+ event.reset();
+
+ event.set();
+ testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
+
+ // test cross-thread signaling
+ if (builtin.single_threaded)
+ return;
+
+ const Context = struct {
+ const Self = @This();
+
+ value: u128,
+ in: ResetEvent,
+ out: ResetEvent,
+
+ fn init(self: *Self) !void {
+ self.* = .{
+ .value = 0,
+ .in = undefined,
+ .out = undefined,
+ };
+ try self.in.init();
+ try self.out.init();
+ }
+
+ fn deinit(self: *Self) void {
+ self.in.deinit();
+ self.out.deinit();
+ self.* = undefined;
+ }
+
+ fn sender(self: *Self) void {
+ // update value and signal input
+ testing.expect(self.value == 0);
+ self.value = 1;
+ self.in.set();
+
+ // wait for receiver to update value and signal output
+ self.out.wait();
+ testing.expect(self.value == 2);
+
+ // update value and signal final input
+ self.value = 3;
+ self.in.set();
+ }
+
+ fn receiver(self: *Self) void {
+ // wait for sender to update value and signal input
+ self.in.wait();
+ assert(self.value == 1);
+
+ // update value and signal output
+ self.in.reset();
+ self.value = 2;
+ self.out.set();
+
+ // wait for sender to update value and signal final input
+ self.in.wait();
+ assert(self.value == 3);
+ }
+
+ fn sleeper(self: *Self) void {
+ self.in.set();
+ time.sleep(time.ns_per_ms * 2);
+ self.value = 5;
+ self.out.set();
+ }
+
+ fn timedWaiter(self: *Self) !void {
+ self.in.wait();
+ testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
+            testing.expectEqual(TimedWaitResult.event_set, self.out.timedWait(time.ns_per_ms * 100));
+ testing.expect(self.value == 5);
+ }
+ };
+
+ var context: Context = undefined;
+ try context.init();
+ defer context.deinit();
+ const receiver = try std.Thread.spawn(&context, Context.receiver);
+ defer receiver.wait();
+ context.sender();
+
+ if (false) {
+ // I have now observed this fail on macOS, Windows, and Linux.
+ // https://github.com/ziglang/zig/issues/7009
+        var timed: Context = undefined;
+        try timed.init();
+ defer timed.deinit();
+ const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
+ defer sleeper.wait();
+ try timed.timedWaiter();
+ }
+}
diff --git a/lib/std/Thread/RwLock.zig b/lib/std/Thread/RwLock.zig
new file mode 100644
index 0000000000..1d606a9cf1
--- /dev/null
+++ b/lib/std/Thread/RwLock.zig
@@ -0,0 +1,308 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A lock that supports one writer or many readers.
+//! This API is for kernel threads, not evented I/O.
+//! This API requires being initialized at runtime, and initialization
+//! can fail. Once initialized, the core operations cannot fail.
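+//!
+//! Example usage (a minimal sketch):
+//!     var rwl: RwLock = undefined;
+//!     rwl.init();
+//!     defer rwl.deinit();
+//!     rwl.lockShared(); // many readers may hold the lock at once
+//!     defer rwl.unlockShared();
+//!     // a writer would use lock()/unlock() instead for exclusive access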
+
+impl: Impl,
+
+const RwLock = @This();
+const std = @import("../std.zig");
+const builtin = std.builtin;
+const assert = std.debug.assert;
+const Mutex = std.Thread.Mutex;
+const Semaphore = std.Thread.Semaphore;
+const Condition = std.Thread.Condition;
+
+pub const Impl = if (builtin.single_threaded)
+ SingleThreadedRwLock
+else if (std.Thread.use_pthreads)
+ PthreadRwLock
+else
+ DefaultRwLock;
+
+pub fn init(rwl: *RwLock) void {
+ return rwl.impl.init();
+}
+
+pub fn deinit(rwl: *RwLock) void {
+ return rwl.impl.deinit();
+}
+
+/// Attempts to obtain exclusive lock ownership.
+/// Returns `true` if the lock is obtained, `false` otherwise.
+pub fn tryLock(rwl: *RwLock) bool {
+ return rwl.impl.tryLock();
+}
+
+/// Blocks until exclusive lock ownership is acquired.
+pub fn lock(rwl: *RwLock) void {
+ return rwl.impl.lock();
+}
+
+/// Releases a held exclusive lock.
+/// Asserts the lock is held exclusively.
+pub fn unlock(rwl: *RwLock) void {
+ return rwl.impl.unlock();
+}
+
+/// Attempts to obtain shared lock ownership.
+/// Returns `true` if the lock is obtained, `false` otherwise.
+pub fn tryLockShared(rwl: *RwLock) bool {
+ return rwl.impl.tryLockShared();
+}
+
+/// Blocks until shared lock ownership is acquired.
+pub fn lockShared(rwl: *RwLock) void {
+ return rwl.impl.lockShared();
+}
+
+/// Releases a held shared lock.
+pub fn unlockShared(rwl: *RwLock) void {
+ return rwl.impl.unlockShared();
+}
+
+/// Single-threaded applications use this for deadlock checks in
+/// debug mode, and no-ops in release modes.
+pub const SingleThreadedRwLock = struct {
+ state: enum { unlocked, locked_exclusive, locked_shared },
+ shared_count: usize,
+
+ pub fn init(rwl: *SingleThreadedRwLock) void {
+ rwl.* = .{
+ .state = .unlocked,
+ .shared_count = 0,
+ };
+ }
+
+ pub fn deinit(rwl: *SingleThreadedRwLock) void {
+ assert(rwl.state == .unlocked);
+ assert(rwl.shared_count == 0);
+ }
+
+ /// Attempts to obtain exclusive lock ownership.
+ /// Returns `true` if the lock is obtained, `false` otherwise.
+ pub fn tryLock(rwl: *SingleThreadedRwLock) bool {
+ switch (rwl.state) {
+ .unlocked => {
+ assert(rwl.shared_count == 0);
+ rwl.state = .locked_exclusive;
+ return true;
+ },
+ .locked_exclusive, .locked_shared => return false,
+ }
+ }
+
+ /// Blocks until exclusive lock ownership is acquired.
+ pub fn lock(rwl: *SingleThreadedRwLock) void {
+ assert(rwl.state == .unlocked); // deadlock detected
+ assert(rwl.shared_count == 0); // corrupted state detected
+ rwl.state = .locked_exclusive;
+ }
+
+ /// Releases a held exclusive lock.
+ /// Asserts the lock is held exclusively.
+ pub fn unlock(rwl: *SingleThreadedRwLock) void {
+ assert(rwl.state == .locked_exclusive);
+ assert(rwl.shared_count == 0); // corrupted state detected
+ rwl.state = .unlocked;
+ }
+
+ /// Attempts to obtain shared lock ownership.
+ /// Returns `true` if the lock is obtained, `false` otherwise.
+ pub fn tryLockShared(rwl: *SingleThreadedRwLock) bool {
+ switch (rwl.state) {
+ .unlocked => {
+ rwl.state = .locked_shared;
+ assert(rwl.shared_count == 0);
+ rwl.shared_count = 1;
+ return true;
+ },
+ .locked_exclusive, .locked_shared => return false,
+ }
+ }
+
+ /// Blocks until shared lock ownership is acquired.
+ pub fn lockShared(rwl: *SingleThreadedRwLock) void {
+ switch (rwl.state) {
+ .unlocked => {
+ rwl.state = .locked_shared;
+ assert(rwl.shared_count == 0);
+ rwl.shared_count = 1;
+ },
+ .locked_shared => {
+ rwl.shared_count += 1;
+ },
+ .locked_exclusive => unreachable, // deadlock detected
+ }
+ }
+
+ /// Releases a held shared lock.
+ pub fn unlockShared(rwl: *SingleThreadedRwLock) void {
+ switch (rwl.state) {
+ .unlocked => unreachable, // too many calls to `unlockShared`
+ .locked_exclusive => unreachable, // exclusively held lock
+ .locked_shared => {
+ rwl.shared_count -= 1;
+ if (rwl.shared_count == 0) {
+ rwl.state = .unlocked;
+ }
+ },
+ }
+ }
+};
+
+pub const PthreadRwLock = struct {
+    rwlock: std.c.pthread_rwlock_t,
+
+ pub fn init(rwl: *PthreadRwLock) void {
+ rwl.* = .{ .rwlock = .{} };
+ }
+
+ pub fn deinit(rwl: *PthreadRwLock) void {
+ const safe_rc = switch (std.builtin.os.tag) {
+ .dragonfly, .netbsd => std.os.EAGAIN,
+ else => 0,
+ };
+
+ const rc = std.c.pthread_rwlock_destroy(&rwl.rwlock);
+ assert(rc == 0 or rc == safe_rc);
+
+ rwl.* = undefined;
+ }
+
+ pub fn tryLock(rwl: *PthreadRwLock) bool {
+        return std.c.pthread_rwlock_trywrlock(&rwl.rwlock) == 0;
+ }
+
+ pub fn lock(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_wrlock(&rwl.rwlock);
+ assert(rc == 0);
+ }
+
+ pub fn unlock(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
+ assert(rc == 0);
+ }
+
+ pub fn tryLockShared(rwl: *PthreadRwLock) bool {
+        return std.c.pthread_rwlock_tryrdlock(&rwl.rwlock) == 0;
+ }
+
+ pub fn lockShared(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_rdlock(&rwl.rwlock);
+ assert(rc == 0);
+ }
+
+ pub fn unlockShared(rwl: *PthreadRwLock) void {
+        const rc = std.c.pthread_rwlock_unlock(&rwl.rwlock);
+ assert(rc == 0);
+ }
+};
+
+pub const DefaultRwLock = struct {
+ state: usize,
+ mutex: Mutex,
+ semaphore: Semaphore,
+
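+    // The state word packs three fields, from least to most significant:
+    //   bit 0:             IS_WRITING - set while a writer holds the lock
+    //   next `Count` bits: number of writers queued on `mutex`
+    //   next `Count` bits: number of readers currently holding the lock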
+ const IS_WRITING: usize = 1;
+ const WRITER: usize = 1 << 1;
+ const READER: usize = 1 << (1 + std.meta.bitCount(Count));
+ const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, WRITER);
+ const READER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, READER);
+ const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2));
+
+ pub fn init(rwl: *DefaultRwLock) void {
+ rwl.* = .{
+ .state = 0,
+            .mutex = .{},
+            .semaphore = .{},
+ };
+ }
+
+ pub fn deinit(rwl: *DefaultRwLock) void {
+ rwl.* = undefined;
+ }
+
+ pub fn tryLock(rwl: *DefaultRwLock) bool {
+ if (rwl.mutex.tryLock()) {
+ const state = @atomicLoad(usize, &rwl.state, .SeqCst);
+ if (state & READER_MASK == 0) {
+ _ = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst);
+ return true;
+ }
+
+ rwl.mutex.unlock();
+ }
+
+ return false;
+ }
+
+ pub fn lock(rwl: *DefaultRwLock) void {
+ _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst);
+ rwl.mutex.lock();
+
+        // Drop our queued-writer tick and set IS_WRITING in a single step.
+        const state = @atomicRmw(usize, &rwl.state, .Add, IS_WRITING -% WRITER, .SeqCst);
+ if (state & READER_MASK != 0)
+ rwl.semaphore.wait();
+ }
+
+ pub fn unlock(rwl: *DefaultRwLock) void {
+ _ = @atomicRmw(usize, &rwl.state, .And, ~IS_WRITING, .SeqCst);
+ rwl.mutex.unlock();
+ }
+
+ pub fn tryLockShared(rwl: *DefaultRwLock) bool {
+ const state = @atomicLoad(usize, &rwl.state, .SeqCst);
+ if (state & (IS_WRITING | WRITER_MASK) == 0) {
+ _ = @cmpxchgStrong(
+ usize,
+ &rwl.state,
+ state,
+ state + READER,
+ .SeqCst,
+ .SeqCst,
+ ) orelse return true;
+ }
+
+ if (rwl.mutex.tryLock()) {
+ _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst);
+ rwl.mutex.unlock();
+ return true;
+ }
+
+ return false;
+ }
+
+ pub fn lockShared(rwl: *DefaultRwLock) void {
+ var state = @atomicLoad(usize, &rwl.state, .SeqCst);
+ while (state & (IS_WRITING | WRITER_MASK) == 0) {
+ state = @cmpxchgWeak(
+ usize,
+ &rwl.state,
+ state,
+ state + READER,
+ .SeqCst,
+ .SeqCst,
+ ) orelse return;
+ }
+
+ rwl.mutex.lock();
+ _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst);
+ rwl.mutex.unlock();
+ }
+
+ pub fn unlockShared(rwl: *DefaultRwLock) void {
+ const state = @atomicRmw(usize, &rwl.state, .Sub, READER, .SeqCst);
+
+ if ((state & READER_MASK == READER) and (state & IS_WRITING != 0))
+ rwl.semaphore.post();
+ }
+};
diff --git a/lib/std/Thread/Semaphore.zig b/lib/std/Thread/Semaphore.zig
new file mode 100644
index 0000000000..169975b362
--- /dev/null
+++ b/lib/std/Thread/Semaphore.zig
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A semaphore is an unsigned integer that blocks the kernel thread if
+//! the number would become negative.
+//! This API supports static initialization and does not require deinitialization.
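+//!
+//! Example usage (a minimal sketch):
+//!     var sem = Semaphore{}; // starts with zero permits
+//!     // consumer thread:
+//!     sem.wait(); // blocks until a permit is available
+//!     // producer thread:
+//!     sem.post(); // adds a permit and wakes one waiter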
+
+mutex: Mutex = .{},
+cond: Condition = .{},
+/// It is OK to initialize this field to any value.
+permits: usize = 0,
+
+const Semaphore = @This();
+const std = @import("../std.zig");
+const Mutex = std.Thread.Mutex;
+const Condition = std.Thread.Condition;
+
+pub fn wait(sem: *Semaphore) void {
+ const held = sem.mutex.acquire();
+ defer held.release();
+
+ while (sem.permits == 0)
+ sem.cond.wait(&sem.mutex);
+
+ sem.permits -= 1;
+ if (sem.permits > 0)
+ sem.cond.signal();
+}
+
+pub fn post(sem: *Semaphore) void {
+ const held = sem.mutex.acquire();
+ defer held.release();
+
+ sem.permits += 1;
+ sem.cond.signal();
+}
diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig
new file mode 100644
index 0000000000..6d90d7cf9a
--- /dev/null
+++ b/lib/std/Thread/StaticResetEvent.zig
@@ -0,0 +1,395 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A thread-safe resource which supports blocking until signaled.
+//! This API is for kernel threads, not evented I/O.
+//! This API is statically initializable. It cannot fail to be initialized
+//! and it requires no deinitialization. The downside is that it may not
+//! integrate as cleanly into other synchronization APIs, or, in a worst case,
+//! may be forced to fall back on spin locking. As a rule of thumb, prefer
+//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when
+//! the logic needs stronger API guarantees.
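+//!
+//! Example usage (a minimal sketch):
+//!     var event = StaticResetEvent{}; // statically initialized; no deinit
+//!     // one thread blocks:
+//!     event.wait();
+//!     // another thread wakes it:
+//!     event.set();
+//!     // once a wait() has returned, only reset() may be called next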
+
+const std = @import("../std.zig");
+const StaticResetEvent = @This();
+const assert = std.debug.assert;
+const os = std.os;
+const time = std.time;
+const linux = std.os.linux;
+const windows = std.os.windows;
+const testing = std.testing;
+
+impl: Impl = .{},
+
+pub const Impl = if (std.builtin.single_threaded)
+ DebugEvent
+else
+ AtomicEvent;
+
+/// Sets the event if not already set and wakes up all the threads waiting on
+/// the event. It is safe to call `set` multiple times before calling `wait`.
+/// However it is illegal to call `set` after `wait` is called until the event
+/// is `reset`. This function is thread-safe.
+pub fn set(ev: *StaticResetEvent) void {
+ return ev.impl.set();
+}
+
+/// Wait for the event to be set by blocking the current thread.
+/// Thread-safe. No spurious wakeups.
+/// Upon return from `wait`, the only function available to be called
+/// in `StaticResetEvent` is `reset`.
+pub fn wait(ev: *StaticResetEvent) void {
+ return ev.impl.wait();
+}
+
+/// Resets the event to its original, unset state.
+/// This function is *not* thread-safe. It is equivalent to calling
+/// `deinit` followed by `init` but without the possibility of failure.
+pub fn reset(ev: *StaticResetEvent) void {
+ return ev.impl.reset();
+}
+
+pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult;
+
+/// Wait for the event to be set by blocking the current thread.
+/// A timeout in nanoseconds can be provided as a hint for how
+/// long the thread should block on the unset event before returning
+/// `TimedWaitResult.timed_out`.
+/// Thread-safe. No precision of timing is guaranteed.
+/// Upon return from `timedWait`, the only function available to be called
+/// in `StaticResetEvent` is `reset`.
+pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult {
+ return ev.impl.timedWait(timeout_ns);
+}
+
+/// For single-threaded builds, we use this to detect deadlocks.
+/// In unsafe modes the operations are no-ops.
+pub const DebugEvent = struct {
+ state: State = State.unset,
+
+ const State = enum {
+ unset,
+ set,
+ waited,
+ };
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn init(ev: *DebugEvent) void {
+ ev.* = .{};
+ }
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn deinit(ev: *DebugEvent) void {
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *DebugEvent) void {
+ switch (ev.state) {
+ .unset => ev.state = .set,
+ .set => {},
+ .waited => unreachable, // Not allowed to call `set` until `reset`.
+ }
+ }
+
+ pub fn wait(ev: *DebugEvent) void {
+ switch (ev.state) {
+ .unset => unreachable, // Deadlock detected.
+ .set => return,
+ .waited => unreachable, // Not allowed to call `wait` until `reset`.
+ }
+ }
+
+ pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult {
+ switch (ev.state) {
+ .unset => return .timed_out,
+ .set => return .event_set,
+ .waited => unreachable, // Not allowed to call `wait` until `reset`.
+ }
+ }
+
+ pub fn reset(ev: *DebugEvent) void {
+ ev.state = .unset;
+ }
+};
+
+pub const AtomicEvent = struct {
+ waiters: u32 = 0,
+
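+    // `waiters` packs two fields: bit 0 is the WAKE flag (the event is set),
+    // and the higher bits count blocked waiters in increments of WAIT.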
+ const WAKE = 1 << 0;
+ const WAIT = 1 << 1;
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn init(ev: *AtomicEvent) void {
+ ev.* = .{};
+ }
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn deinit(ev: *AtomicEvent) void {
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *AtomicEvent) void {
+ const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release);
+ if (waiters >= WAIT) {
+ return Futex.wake(&ev.waiters, waiters >> 1);
+ }
+ }
+
+ pub fn wait(ev: *AtomicEvent) void {
+ switch (ev.timedWait(null)) {
+ .timed_out => unreachable,
+ .event_set => return,
+ }
+ }
+
+ pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult {
+ var waiters = @atomicLoad(u32, &ev.waiters, .Acquire);
+ while (waiters != WAKE) {
+ waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse {
+ if (Futex.wait(&ev.waiters, timeout)) |_| {
+ return .event_set;
+ } else |_| {
+ return .timed_out;
+ }
+ };
+ }
+ return .event_set;
+ }
+
+ pub fn reset(ev: *AtomicEvent) void {
+ @atomicStore(u32, &ev.waiters, 0, .Monotonic);
+ }
+
+ pub const Futex = switch (std.Target.current.os.tag) {
+ .windows => WindowsFutex,
+ .linux => LinuxFutex,
+ else => SpinFutex,
+ };
+
+ pub const SpinFutex = struct {
+ fn wake(waiters: *u32, wake_count: u32) void {}
+
+ fn wait(waiters: *u32, timeout: ?u64) !void {
+ var timer: time.Timer = undefined;
+ if (timeout != null)
+ timer = time.Timer.start() catch return error.TimedOut;
+
+ while (@atomicLoad(u32, waiters, .Acquire) != WAKE) {
+ std.os.sched_yield() catch std.Thread.spinLoopHint();
+ if (timeout) |timeout_ns| {
+ if (timer.read() >= timeout_ns)
+ return error.TimedOut;
+ }
+ }
+ }
+ };
+
+ pub const LinuxFutex = struct {
+ fn wake(waiters: *u32, wake_count: u32) void {
+            const waiting = std.math.maxInt(i32); // wake all waiters rather than just wake_count
+ const ptr = @ptrCast(*const i32, waiters);
+ const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting);
+ assert(linux.getErrno(rc) == 0);
+ }
+
+ fn wait(waiters: *u32, timeout: ?u64) !void {
+ var ts: linux.timespec = undefined;
+ var ts_ptr: ?*linux.timespec = null;
+ if (timeout) |timeout_ns| {
+ ts_ptr = &ts;
+ ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
+ ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
+ }
+
+ while (true) {
+ const waiting = @atomicLoad(u32, waiters, .Acquire);
+ if (waiting == WAKE)
+ return;
+ const expected = @intCast(i32, waiting);
+ const ptr = @ptrCast(*const i32, waiters);
+ const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
+ switch (linux.getErrno(rc)) {
+ 0 => continue,
+ os.ETIMEDOUT => return error.TimedOut,
+ os.EINTR => continue,
+ os.EAGAIN => return,
+ else => unreachable,
+ }
+ }
+ }
+ };
+
+ pub const WindowsFutex = struct {
+ pub fn wake(waiters: *u32, wake_count: u32) void {
+ const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count);
+ const key = @ptrCast(*const c_void, waiters);
+
+ var waiting = wake_count;
+ while (waiting != 0) : (waiting -= 1) {
+ const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
+ assert(rc == .SUCCESS);
+ }
+ }
+
+ pub fn wait(waiters: *u32, timeout: ?u64) !void {
+ const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout);
+ const key = @ptrCast(*const c_void, waiters);
+
+            // NT uses timeouts in units of 100ns, where a negative value means a relative timeout
+ var timeout_ptr: ?*windows.LARGE_INTEGER = null;
+ var timeout_value: windows.LARGE_INTEGER = undefined;
+ if (timeout) |timeout_ns| {
+ timeout_ptr = &timeout_value;
+ timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
+ }
+
+            // NtWaitForKeyedEvent doesn't have spurious wake-ups
+ var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
+ switch (rc) {
+ .TIMEOUT => {
+ // update the wait count to signal that we're not waiting anymore.
+ // if the .set() thread already observed that we are, perform a
+ // matching NtWaitForKeyedEvent so that the .set() thread doesn't
+ // deadlock trying to run NtReleaseKeyedEvent above.
+ var waiting = @atomicLoad(u32, waiters, .Monotonic);
+ while (true) {
+ if (waiting == WAKE) {
+ rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
+ assert(rc == .WAIT_0);
+ break;
+ } else {
+ waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break;
+ continue;
+ }
+ }
+ return error.TimedOut;
+ },
+ .WAIT_0 => {},
+ else => unreachable,
+ }
+ }
+
+ var event_handle: usize = EMPTY;
+ const EMPTY = ~@as(usize, 0);
+ const LOADING = EMPTY - 1;
+
+ pub fn getEventHandle() ?windows.HANDLE {
+ var handle = @atomicLoad(usize, &event_handle, .Monotonic);
+ while (true) {
+ switch (handle) {
+ EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse {
+ const handle_ptr = @ptrCast(*windows.HANDLE, &handle);
+ const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
+ if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS)
+ handle = 0;
+ @atomicStore(usize, &event_handle, handle, .Monotonic);
+ return @intToPtr(?windows.HANDLE, handle);
+ },
+ LOADING => {
+ std.os.sched_yield() catch std.Thread.spinLoopHint();
+ handle = @atomicLoad(usize, &event_handle, .Monotonic);
+ },
+ else => {
+ return @intToPtr(?windows.HANDLE, handle);
+ },
+ }
+ }
+ }
+ };
+};
+
+test "basic usage" {
+ var event = StaticResetEvent{};
+
+ // test event setting
+ event.set();
+
+ // test event resetting
+ event.reset();
+
+ // test event waiting (non-blocking)
+ event.set();
+ event.wait();
+ event.reset();
+
+ event.set();
+ testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
+
+ // test cross-thread signaling
+ if (std.builtin.single_threaded)
+ return;
+
+ const Context = struct {
+ const Self = @This();
+
+ value: u128 = 0,
+ in: StaticResetEvent = .{},
+ out: StaticResetEvent = .{},
+
+ fn sender(self: *Self) void {
+ // update value and signal input
+ testing.expect(self.value == 0);
+ self.value = 1;
+ self.in.set();
+
+ // wait for receiver to update value and signal output
+ self.out.wait();
+ testing.expect(self.value == 2);
+
+ // update value and signal final input
+ self.value = 3;
+ self.in.set();
+ }
+
+ fn receiver(self: *Self) void {
+ // wait for sender to update value and signal input
+ self.in.wait();
+ assert(self.value == 1);
+
+ // update value and signal output
+ self.in.reset();
+ self.value = 2;
+ self.out.set();
+
+ // wait for sender to update value and signal final input
+ self.in.wait();
+ assert(self.value == 3);
+ }
+
+ fn sleeper(self: *Self) void {
+ self.in.set();
+ time.sleep(time.ns_per_ms * 2);
+ self.value = 5;
+ self.out.set();
+ }
+
+ fn timedWaiter(self: *Self) !void {
+ self.in.wait();
+ testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
+            testing.expectEqual(TimedWaitResult.event_set, self.out.timedWait(time.ns_per_ms * 100));
+ testing.expect(self.value == 5);
+ }
+ };
+
+ var context = Context{};
+ const receiver = try std.Thread.spawn(&context, Context.receiver);
+ defer receiver.wait();
+ context.sender();
+
+ if (false) {
+ // I have now observed this fail on macOS, Windows, and Linux.
+ // https://github.com/ziglang/zig/issues/7009
+        var timed = Context{};
+ const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
+ defer sleeper.wait();
+ try timed.timedWaiter();
+ }
+}