aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread/StaticResetEvent.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2021-01-15 12:31:17 -0800
committerGitHub <noreply@github.com>2021-01-15 12:31:17 -0800
commit35686262f503372bb9f38a71b891fd19b30adecd (patch)
tree5e9d3af75fc0c3d89ee001e2c922946fabb7d6ac /lib/std/Thread/StaticResetEvent.zig
parent2faf8c53d226cec1445156cafdec59af5e909fb2 (diff)
parent19f893c6bbb3f5ab4e8d943b0ac2fefa94e5d666 (diff)
downloadzig-35686262f503372bb9f38a71b891fd19b30adecd.tar.gz
zig-35686262f503372bb9f38a71b891fd19b30adecd.zip
Merge pull request #7785 from ziglang/std-thread-primitives
organize std lib concurrency primitives and add RwLock
Diffstat (limited to 'lib/std/Thread/StaticResetEvent.zig')
-rw-r--r--lib/std/Thread/StaticResetEvent.zig395
1 files changed, 395 insertions, 0 deletions
diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig
new file mode 100644
index 0000000000..6d90d7cf9a
--- /dev/null
+++ b/lib/std/Thread/StaticResetEvent.zig
@@ -0,0 +1,395 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! A thread-safe resource which supports blocking until signaled.
+//! This API is for kernel threads, not evented I/O.
+//! This API is statically initializable. It cannot fail to be initialized
+//! and it requires no deinitialization. The downside is that it may not
+//! integrate as cleanly into other synchronization APIs, or, in a worst case,
+//! may be forced to fall back on spin locking. As a rule of thumb, prefer
+//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when
+//! the logic needs stronger API guarantees.
+
+const std = @import("../std.zig");
+const StaticResetEvent = @This();
+const assert = std.debug.assert;
+const os = std.os;
+const time = std.time;
+const linux = std.os.linux;
+const windows = std.os.windows;
+const testing = std.testing;
+
+impl: Impl = .{},
+
+pub const Impl = if (std.builtin.single_threaded)
+ DebugEvent
+else
+ AtomicEvent;
+
+/// Sets the event if not already set and wakes up all the threads waiting on
+/// the event. It is safe to call `set` multiple times before calling `wait`.
+/// However it is illegal to call `set` after `wait` is called until the event
+/// is `reset`. This function is thread-safe.
+pub fn set(ev: *StaticResetEvent) void {
+ return ev.impl.set();
+}
+
+/// Wait for the event to be set by blocking the current thread.
+/// Thread-safe. No spurious wakeups.
+/// Upon return from `wait`, the only function available to be called
+/// in `StaticResetEvent` is `reset`.
+pub fn wait(ev: *StaticResetEvent) void {
+ return ev.impl.wait();
+}
+
+/// Resets the event to its original, unset state.
+/// This function is *not* thread-safe. It is equivalent to calling
+/// `deinit` followed by `init` but without the possibility of failure.
+pub fn reset(ev: *StaticResetEvent) void {
+ return ev.impl.reset();
+}
+
+pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult;
+
+/// Wait for the event to be set by blocking the current thread.
+/// A timeout in nanoseconds can be provided as a hint for how
+/// long the thread should block on the unset event before returning
+/// `TimedWaitResult.timed_out`.
+/// Thread-safe. No precision of timing is guaranteed.
+/// Upon return from `timedWait`, the only function available to be called
+/// in `StaticResetEvent` is `reset`.
+pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult {
+ return ev.impl.timedWait(timeout_ns);
+}
+
+/// For single-threaded builds, we use this to detect deadlocks.
+/// In unsafe modes this ends up being no-ops.
+pub const DebugEvent = struct {
+ state: State = State.unset,
+
+ const State = enum {
+ unset,
+ set,
+ waited,
+ };
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn init(ev: *DebugEvent) void {
+ ev.* = .{};
+ }
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn deinit(ev: *DebugEvent) void {
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *DebugEvent) void {
+ switch (ev.state) {
+ .unset => ev.state = .set,
+ .set => {},
+ .waited => unreachable, // Not allowed to call `set` until `reset`.
+ }
+ }
+
+ pub fn wait(ev: *DebugEvent) void {
+ switch (ev.state) {
+ .unset => unreachable, // Deadlock detected.
+ .set => return,
+ .waited => unreachable, // Not allowed to call `wait` until `reset`.
+ }
+ }
+
+ pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult {
+ switch (ev.state) {
+ .unset => return .timed_out,
+ .set => return .event_set,
+ .waited => unreachable, // Not allowed to call `wait` until `reset`.
+ }
+ }
+
+ pub fn reset(ev: *DebugEvent) void {
+ ev.state = .unset;
+ }
+};
+
+pub const AtomicEvent = struct {
+ waiters: u32 = 0,
+
+ const WAKE = 1 << 0;
+ const WAIT = 1 << 1;
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn init(ev: *AtomicEvent) void {
+ ev.* = .{};
+ }
+
+ /// This function is provided so that this type can be re-used inside
+ /// `std.Thread.ResetEvent`.
+ pub fn deinit(ev: *AtomicEvent) void {
+ ev.* = undefined;
+ }
+
+ pub fn set(ev: *AtomicEvent) void {
+ const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release);
+ if (waiters >= WAIT) {
+ return Futex.wake(&ev.waiters, waiters >> 1);
+ }
+ }
+
+ pub fn wait(ev: *AtomicEvent) void {
+ switch (ev.timedWait(null)) {
+ .timed_out => unreachable,
+ .event_set => return,
+ }
+ }
+
+ pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult {
+ var waiters = @atomicLoad(u32, &ev.waiters, .Acquire);
+ while (waiters != WAKE) {
+ waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse {
+ if (Futex.wait(&ev.waiters, timeout)) |_| {
+ return .event_set;
+ } else |_| {
+ return .timed_out;
+ }
+ };
+ }
+ return .event_set;
+ }
+
+ pub fn reset(ev: *AtomicEvent) void {
+ @atomicStore(u32, &ev.waiters, 0, .Monotonic);
+ }
+
+ pub const Futex = switch (std.Target.current.os.tag) {
+ .windows => WindowsFutex,
+ .linux => LinuxFutex,
+ else => SpinFutex,
+ };
+
+ pub const SpinFutex = struct {
+ fn wake(waiters: *u32, wake_count: u32) void {}
+
+ fn wait(waiters: *u32, timeout: ?u64) !void {
+ var timer: time.Timer = undefined;
+ if (timeout != null)
+ timer = time.Timer.start() catch return error.TimedOut;
+
+ while (@atomicLoad(u32, waiters, .Acquire) != WAKE) {
+ std.os.sched_yield() catch std.Thread.spinLoopHint();
+ if (timeout) |timeout_ns| {
+ if (timer.read() >= timeout_ns)
+ return error.TimedOut;
+ }
+ }
+ }
+ };
+
+ pub const LinuxFutex = struct {
+ fn wake(waiters: *u32, wake_count: u32) void {
+ const waiting = std.math.maxInt(i32); // wake_count
+ const ptr = @ptrCast(*const i32, waiters);
+ const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting);
+ assert(linux.getErrno(rc) == 0);
+ }
+
+ fn wait(waiters: *u32, timeout: ?u64) !void {
+ var ts: linux.timespec = undefined;
+ var ts_ptr: ?*linux.timespec = null;
+ if (timeout) |timeout_ns| {
+ ts_ptr = &ts;
+ ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s);
+ ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
+ }
+
+ while (true) {
+ const waiting = @atomicLoad(u32, waiters, .Acquire);
+ if (waiting == WAKE)
+ return;
+ const expected = @intCast(i32, waiting);
+ const ptr = @ptrCast(*const i32, waiters);
+ const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
+ switch (linux.getErrno(rc)) {
+ 0 => continue,
+ os.ETIMEDOUT => return error.TimedOut,
+ os.EINTR => continue,
+ os.EAGAIN => return,
+ else => unreachable,
+ }
+ }
+ }
+ };
+
+ pub const WindowsFutex = struct {
+ pub fn wake(waiters: *u32, wake_count: u32) void {
+ const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count);
+ const key = @ptrCast(*const c_void, waiters);
+
+ var waiting = wake_count;
+ while (waiting != 0) : (waiting -= 1) {
+ const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
+ assert(rc == .SUCCESS);
+ }
+ }
+
+ pub fn wait(waiters: *u32, timeout: ?u64) !void {
+ const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout);
+ const key = @ptrCast(*const c_void, waiters);
+
+ // NT uses timeouts in units of 100ns with negative value being relative
+ var timeout_ptr: ?*windows.LARGE_INTEGER = null;
+ var timeout_value: windows.LARGE_INTEGER = undefined;
+ if (timeout) |timeout_ns| {
+ timeout_ptr = &timeout_value;
+ timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100);
+ }
+
+ // NtWaitForKeyedEvent doesnt have spurious wake-ups
+ var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
+ switch (rc) {
+ .TIMEOUT => {
+ // update the wait count to signal that we're not waiting anymore.
+ // if the .set() thread already observed that we are, perform a
+ // matching NtWaitForKeyedEvent so that the .set() thread doesn't
+ // deadlock trying to run NtReleaseKeyedEvent above.
+ var waiting = @atomicLoad(u32, waiters, .Monotonic);
+ while (true) {
+ if (waiting == WAKE) {
+ rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
+ assert(rc == .WAIT_0);
+ break;
+ } else {
+ waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break;
+ continue;
+ }
+ }
+ return error.TimedOut;
+ },
+ .WAIT_0 => {},
+ else => unreachable,
+ }
+ }
+
+ var event_handle: usize = EMPTY;
+ const EMPTY = ~@as(usize, 0);
+ const LOADING = EMPTY - 1;
+
+ pub fn getEventHandle() ?windows.HANDLE {
+ var handle = @atomicLoad(usize, &event_handle, .Monotonic);
+ while (true) {
+ switch (handle) {
+ EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse {
+ const handle_ptr = @ptrCast(*windows.HANDLE, &handle);
+ const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE;
+ if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS)
+ handle = 0;
+ @atomicStore(usize, &event_handle, handle, .Monotonic);
+ return @intToPtr(?windows.HANDLE, handle);
+ },
+ LOADING => {
+ std.os.sched_yield() catch std.Thread.spinLoopHint();
+ handle = @atomicLoad(usize, &event_handle, .Monotonic);
+ },
+ else => {
+ return @intToPtr(?windows.HANDLE, handle);
+ },
+ }
+ }
+ }
+ };
+};
+
+test "basic usage" {
+ var event = StaticResetEvent{};
+
+ // test event setting
+ event.set();
+
+ // test event resetting
+ event.reset();
+
+ // test event waiting (non-blocking)
+ event.set();
+ event.wait();
+ event.reset();
+
+ event.set();
+ testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
+
+ // test cross-thread signaling
+ if (std.builtin.single_threaded)
+ return;
+
+ const Context = struct {
+ const Self = @This();
+
+ value: u128 = 0,
+ in: StaticResetEvent = .{},
+ out: StaticResetEvent = .{},
+
+ fn sender(self: *Self) void {
+ // update value and signal input
+ testing.expect(self.value == 0);
+ self.value = 1;
+ self.in.set();
+
+ // wait for receiver to update value and signal output
+ self.out.wait();
+ testing.expect(self.value == 2);
+
+ // update value and signal final input
+ self.value = 3;
+ self.in.set();
+ }
+
+ fn receiver(self: *Self) void {
+ // wait for sender to update value and signal input
+ self.in.wait();
+ assert(self.value == 1);
+
+ // update value and signal output
+ self.in.reset();
+ self.value = 2;
+ self.out.set();
+
+ // wait for sender to update value and signal final input
+ self.in.wait();
+ assert(self.value == 3);
+ }
+
+ fn sleeper(self: *Self) void {
+ self.in.set();
+ time.sleep(time.ns_per_ms * 2);
+ self.value = 5;
+ self.out.set();
+ }
+
+ fn timedWaiter(self: *Self) !void {
+ self.in.wait();
+ testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
+ try self.out.timedWait(time.ns_per_ms * 100);
+ testing.expect(self.value == 5);
+ }
+ };
+
+ var context = Context{};
+ const receiver = try std.Thread.spawn(&context, Context.receiver);
+ defer receiver.wait();
+ context.sender();
+
+ if (false) {
+ // I have now observed this fail on macOS, Windows, and Linux.
+ // https://github.com/ziglang/zig/issues/7009
+ var timed = Context.init();
+ defer timed.deinit();
+ const sleeper = try std.Thread.spawn(&timed, Context.sleeper);
+ defer sleeper.wait();
+ try timed.timedWaiter();
+ }
+}