aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread/Condition.zig
diff options
context:
space:
mode:
Diffstat (limited to 'lib/std/Thread/Condition.zig')
-rw-r--r--lib/std/Thread/Condition.zig194
1 files changed, 194 insertions, 0 deletions
diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig
new file mode 100644
index 0000000000..a14b57f6b4
--- /dev/null
+++ b/lib/std/Thread/Condition.zig
@@ -0,0 +1,194 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A condition provides a way for a kernel thread to block until it is signaled
+//! to wake up. Spurious wakeups are possible.
+//! This API supports static initialization and does not require deinitialization.
+
+impl: Impl = .{},
+
+const std = @import("../std.zig");
+const Condition = @This();
+const windows = std.os.windows;
+const linux = std.os.linux;
+const Mutex = std.Thread.Mutex;
+const assert = std.debug.assert;
+
+pub fn wait(cond: *Condition, mutex: *Mutex) void {
+ cond.impl.wait(mutex);
+}
+
+pub fn signal(cond: *Condition) void {
+ cond.impl.signal();
+}
+
+pub fn broadcast(cond: *Condition) void {
+ cond.impl.broadcast();
+}
+
+const Impl = if (std.builtin.single_threaded)
+ SingleThreadedCondition
+else if (std.Target.current.os.tag == .windows)
+ WindowsCondition
+else if (std.Thread.use_pthreads)
+ PthreadCondition
+else
+ AtomicCondition;
+
+pub const SingleThreadedCondition = struct {
+ pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void {
+ unreachable; // deadlock detected
+ }
+
+ pub fn signal(cond: *SingleThreadedCondition) void {}
+
+ pub fn broadcast(cond: *SingleThreadedCondition) void {}
+};
+
+pub const WindowsCondition = struct {
+ cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT,
+
+ pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void {
+ const rc = windows.kernel32.SleepConditionVariableSRW(
+ &cond.cond,
+ &mutex.srwlock,
+ windows.INFINITE,
+ @as(windows.ULONG, 0),
+ );
+ assert(rc != windows.FALSE);
+ }
+
+ pub fn signal(cond: *WindowsCondition) void {
+ windows.kernel32.WakeConditionVariable(&cond.cond);
+ }
+
+ pub fn broadcast(cond: *WindowsCondition) void {
+ windows.kernel32.WakeAllConditionVariable(&cond.cond);
+ }
+};
+
+pub const PthreadCondition = struct {
+ cond: std.c.pthread_cond_t = .{},
+
+ pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void {
+ const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.impl.pthread_mutex);
+ assert(rc == 0);
+ }
+
+ pub fn signal(cond: *PthreadCondition) void {
+ const rc = std.c.pthread_cond_signal(&cond.cond);
+ assert(rc == 0);
+ }
+
+ pub fn broadcast(cond: *PthreadCondition) void {
+ const rc = std.c.pthread_cond_broadcast(&cond.cond);
+ assert(rc == 0);
+ }
+};
+
+pub const AtomicCondition = struct {
+ pending: bool = false,
+ queue_mutex: Mutex = .{},
+ queue_list: QueueList = .{},
+
+ pub const QueueList = std.SinglyLinkedList(QueueItem);
+
+ pub const QueueItem = struct {
+ futex: i32 = 0,
+
+ fn wait(cond: *@This()) void {
+ while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ switch (linux.getErrno(linux.futex_wait(
+ &cond.futex,
+ linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT,
+ 0,
+ null,
+ ))) {
+ 0 => {},
+ std.os.EINTR => {},
+ std.os.EAGAIN => {},
+ else => unreachable,
+ }
+ },
+ else => spinLoopHint(),
+ }
+ }
+ }
+
+ fn notify(cond: *@This()) void {
+ @atomicStore(i32, &cond.futex, 1, .Release);
+
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ switch (linux.getErrno(linux.futex_wake(
+ &cond.futex,
+ linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE,
+ 1,
+ ))) {
+ 0 => {},
+ std.os.EFAULT => {},
+ else => unreachable,
+ }
+ },
+ else => {},
+ }
+ }
+ };
+
+ pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void {
+ var waiter = QueueList.Node{ .data = .{} };
+
+ {
+ const held = cond.queue_mutex.acquire();
+ defer held.release();
+
+ cond.queue_list.prepend(&waiter);
+ @atomicStore(bool, &cond.pending, true, .SeqCst);
+ }
+
+ mutex.unlock();
+ waiter.data.wait();
+ mutex.lock();
+ }
+
+ pub fn signal(cond: *AtomicCondition) void {
+ if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
+ return;
+
+ const maybe_waiter = blk: {
+ const held = cond.queue_mutex.acquire();
+ defer held.release();
+
+ const maybe_waiter = cond.queue_list.popFirst();
+ @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst);
+ break :blk maybe_waiter;
+ };
+
+ if (maybe_waiter) |waiter|
+ waiter.data.notify();
+ }
+
+ pub fn broadcast(cond: *AtomicCondition) void {
+ if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
+ return;
+
+ @atomicStore(bool, &cond.pending, false, .SeqCst);
+
+ var waiters = blk: {
+ const held = cond.queue_mutex.acquire();
+ defer held.release();
+
+ const waiters = cond.queue_list;
+ cond.queue_list = .{};
+ break :blk waiters;
+ };
+
+ while (waiters.popFirst()) |waiter|
+ waiter.data.notify();
+ }
+};