aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread/ResetEvent.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2022-07-01 15:52:54 -0700
committerAndrew Kelley <andrew@ziglang.org>2022-07-01 15:52:54 -0700
commitc89dd15e1be4959800dc7092d7dd4375253db7bc (patch)
treeca184ae53592efa21e67128a5f891d642d7f1118 /lib/std/Thread/ResetEvent.zig
parent5466e87fce581f2ef90ac23bb80b1dbc05836fc6 (diff)
parent2360f8c490f3ec684ed64ff28e8c1fade249070b (diff)
downloadzig-c89dd15e1be4959800dc7092d7dd4375253db7bc.tar.gz
zig-c89dd15e1be4959800dc7092d7dd4375253db7bc.zip
Merge remote-tracking branch 'origin/master' into llvm14
Diffstat (limited to 'lib/std/Thread/ResetEvent.zig')
-rw-r--r--lib/std/Thread/ResetEvent.zig420
1 files changed, 205 insertions, 215 deletions
diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig
index 7fc4f3c2b4..87232c29cf 100644
--- a/lib/std/Thread/ResetEvent.zig
+++ b/lib/std/Thread/ResetEvent.zig
@@ -1,291 +1,281 @@
-//! A thread-safe resource which supports blocking until signaled.
-//! This API is for kernel threads, not evented I/O.
-//! This API requires being initialized at runtime, and initialization
-//! can fail. Once initialized, the core operations cannot fail.
-//! If you need an abstraction that cannot fail to be initialized, see
-//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure,
-//! it is preferred to use `ResetEvent`.
+//! ResetEvent is a thread-safe bool which can be set to true/false ("set"/"unset").
+//! It can also block threads until the "bool" is set with cancellation via timed waits.
+//! ResetEvent can be statically initialized and is at most `@sizeOf(u64)` large.
-const ResetEvent = @This();
const std = @import("../std.zig");
const builtin = @import("builtin");
-const testing = std.testing;
-const assert = std.debug.assert;
-const c = std.c;
-const os = std.os;
-const time = std.time;
-
-impl: Impl,
+const ResetEvent = @This();
-pub const Impl = if (builtin.single_threaded)
- std.Thread.StaticResetEvent.DebugEvent
-else if (builtin.target.isDarwin())
- DarwinEvent
-else if (std.Thread.use_pthreads)
- PosixEvent
-else
- std.Thread.StaticResetEvent.AtomicEvent;
+const os = std.os;
+const assert = std.debug.assert;
+const testing = std.testing;
+const Atomic = std.atomic.Atomic;
+const Futex = std.Thread.Futex;
-pub const InitError = error{SystemResources};
+impl: Impl = .{},
-/// After `init`, it is legal to call any other function.
-pub fn init(ev: *ResetEvent) InitError!void {
- return ev.impl.init();
+/// Returns if the ResetEvent was set().
+/// Once reset() is called, this returns false until the next set().
+/// The memory accesses before the set() can be said to happen before isSet() returns true.
+pub fn isSet(self: *const ResetEvent) bool {
+ return self.impl.isSet();
}
-/// This function is not thread-safe.
-/// After `deinit`, the only legal function to call is `init`.
-pub fn deinit(ev: *ResetEvent) void {
- return ev.impl.deinit();
+/// Block's the callers thread until the ResetEvent is set().
+/// This is effectively a more efficient version of `while (!isSet()) {}`.
+/// The memory accesses before the set() can be said to happen before wait() returns.
+pub fn wait(self: *ResetEvent) void {
+ self.impl.wait(null) catch |err| switch (err) {
+ error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
+ };
}
-/// Sets the event if not already set and wakes up all the threads waiting on
-/// the event. It is safe to call `set` multiple times before calling `wait`.
-/// However it is illegal to call `set` after `wait` is called until the event
-/// is `reset`. This function is thread-safe.
-pub fn set(ev: *ResetEvent) void {
- return ev.impl.set();
+/// Block's the callers thread until the ResetEvent is set(), or until the corresponding timeout expires.
+/// If the timeout expires before the ResetEvent is set, `error.Timeout` is returned.
+/// This is effectively a more efficient version of `while (!isSet()) {}`.
+/// The memory accesses before the set() can be said to happen before timedWait() returns without error.
+pub fn timedWait(self: *ResetEvent, timeout_ns: u64) error{Timeout}!void {
+ return self.impl.wait(timeout_ns);
}
-/// Resets the event to its original, unset state.
-/// This function is *not* thread-safe. It is equivalent to calling
-/// `deinit` followed by `init` but without the possibility of failure.
-pub fn reset(ev: *ResetEvent) void {
- return ev.impl.reset();
+/// Marks the ResetEvent as "set" and unblocks any threads in `wait()` or `timedWait()` to observe the new state.
+/// The ResetEvent says "set" until reset() is called, making future set() calls do nothing semantically.
+/// The memory accesses before set() can be said to happen before isSet() returns true or wait()/timedWait() return successfully.
+pub fn set(self: *ResetEvent) void {
+ self.impl.set();
}
-/// Wait for the event to be set by blocking the current thread.
-/// Thread-safe. No spurious wakeups.
-/// Upon return from `wait`, the only functions available to be called
-/// in `ResetEvent` are `reset` and `deinit`.
-pub fn wait(ev: *ResetEvent) void {
- return ev.impl.wait();
+/// Unmarks the ResetEvent from its "set" state if set() was called previously.
+/// It is undefined behavior is reset() is called while threads are blocked in wait() or timedWait().
+/// Concurrent calls to set(), isSet() and reset() are allowed.
+pub fn reset(self: *ResetEvent) void {
+ self.impl.reset();
}
-pub const TimedWaitResult = enum { event_set, timed_out };
-
-/// Wait for the event to be set by blocking the current thread.
-/// A timeout in nanoseconds can be provided as a hint for how
-/// long the thread should block on the unset event before returning
-/// `TimedWaitResult.timed_out`.
-/// Thread-safe. No precision of timing is guaranteed.
-/// Upon return from `wait`, the only functions available to be called
-/// in `ResetEvent` are `reset` and `deinit`.
-pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult {
- return ev.impl.timedWait(timeout_ns);
-}
+const Impl = if (builtin.single_threaded)
+ SingleThreadedImpl
+else
+ FutexImpl;
-/// Apple has decided to not support POSIX semaphores, so we go with a
-/// different approach using Grand Central Dispatch. This API is exposed
-/// by libSystem so it is guaranteed to be available on all Darwin platforms.
-pub const DarwinEvent = struct {
- sem: c.dispatch_semaphore_t = undefined,
+const SingleThreadedImpl = struct {
+ is_set: bool = false,
- pub fn init(ev: *DarwinEvent) !void {
- ev.* = .{
- .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources,
- };
+ fn isSet(self: *const Impl) bool {
+ return self.is_set;
}
- pub fn deinit(ev: *DarwinEvent) void {
- c.dispatch_release(ev.sem);
- ev.* = undefined;
- }
+ fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void {
+ if (self.isSet()) {
+ return;
+ }
- pub fn set(ev: *DarwinEvent) void {
- // Empirically this returns the numerical value of the semaphore.
- _ = c.dispatch_semaphore_signal(ev.sem);
- }
+ // There are no other threads to wake us up.
+ // So if we wait without a timeout we would never wake up.
+ const timeout_ns = timeout orelse {
+ unreachable; // deadlock detected
+ };
- pub fn wait(ev: *DarwinEvent) void {
- assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0);
+ std.time.sleep(timeout_ns);
+ return error.Timeout;
}
- pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult {
- const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns));
- if (c.dispatch_semaphore_wait(ev.sem, t) != 0) {
- return .timed_out;
- } else {
- return .event_set;
- }
+ fn set(self: *Impl) void {
+ self.is_set = true;
}
- pub fn reset(ev: *DarwinEvent) void {
- // Keep calling until the semaphore goes back down to 0.
- while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {}
+ fn reset(self: *Impl) void {
+ self.is_set = false;
}
};
-/// POSIX semaphores must be initialized at runtime because they are allowed to
-/// be implemented as file descriptors, in which case initialization would require
-/// a syscall to open the fd.
-pub const PosixEvent = struct {
- sem: c.sem_t = undefined,
+const FutexImpl = struct {
+ state: Atomic(u32) = Atomic(u32).init(unset),
- pub fn init(ev: *PosixEvent) !void {
- switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) {
- .SUCCESS => return,
- else => return error.SystemResources,
- }
- }
+ const unset = 0;
+ const waiting = 1;
+ const is_set = 2;
- pub fn deinit(ev: *PosixEvent) void {
- assert(c.sem_destroy(&ev.sem) == 0);
- ev.* = undefined;
+ fn isSet(self: *const Impl) bool {
+ // Acquire barrier ensures memory accesses before set() happen before we return true.
+ return self.state.load(.Acquire) == is_set;
}
- pub fn set(ev: *PosixEvent) void {
- assert(c.sem_post(&ev.sem) == 0);
+ fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void {
+ // Outline the slow path to allow isSet() to be inlined
+ if (!self.isSet()) {
+ return self.waitUntilSet(timeout);
+ }
}
- pub fn wait(ev: *PosixEvent) void {
- while (true) {
- switch (c.getErrno(c.sem_wait(&ev.sem))) {
- .SUCCESS => return,
- .INTR => continue,
- .INVAL => unreachable,
- else => unreachable,
- }
+ fn waitUntilSet(self: *Impl, timeout: ?u64) error{Timeout}!void {
+ @setCold(true);
+
+ // Try to set the state from `unset` to `waiting` to indicate
+ // to the set() thread that others are blocked on the ResetEvent.
+ // We avoid using any strict barriers until the end when we know the ResetEvent is set.
+ var state = self.state.load(.Monotonic);
+ if (state == unset) {
+ state = self.state.compareAndSwap(state, waiting, .Monotonic, .Monotonic) orelse waiting;
}
- }
- pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult {
- var ts: os.timespec = undefined;
- var timeout_abs = timeout_ns;
- os.clock_gettime(os.CLOCK.REALTIME, &ts) catch return .timed_out;
- timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s;
- timeout_abs += @intCast(u64, ts.tv_nsec);
- ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s));
- ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s));
- while (true) {
- switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) {
- .SUCCESS => return .event_set,
- .INTR => continue,
- .INVAL => unreachable,
- .TIMEDOUT => return .timed_out,
- else => unreachable,
+ // Wait until the ResetEvent is set since the state is waiting.
+ if (state == waiting) {
+ var futex_deadline = Futex.Deadline.init(timeout);
+ while (true) {
+ const wait_result = futex_deadline.wait(&self.state, waiting);
+
+ // Check if the ResetEvent was set before possibly reporting error.Timeout below.
+ state = self.state.load(.Monotonic);
+ if (state != waiting) {
+ break;
+ }
+
+ try wait_result;
}
}
+
+ // Acquire barrier ensures memory accesses before set() happen before we return.
+ assert(state == is_set);
+ self.state.fence(.Acquire);
}
- pub fn reset(ev: *PosixEvent) void {
- while (true) {
- switch (c.getErrno(c.sem_trywait(&ev.sem))) {
- .SUCCESS => continue, // Need to make it go to zero.
- .INTR => continue,
- .INVAL => unreachable,
- .AGAIN => return, // The semaphore currently has the value zero.
- else => unreachable,
- }
+ fn set(self: *Impl) void {
+ // Quick check if the ResetEvent is already set before doing the atomic swap below.
+ // set() could be getting called quite often and multiple threads calling swap() increases contention unnecessarily.
+ if (self.state.load(.Monotonic) == is_set) {
+ return;
+ }
+
+ // Mark the ResetEvent as set and unblock all waiters waiting on it if any.
+ // Release barrier ensures memory accesses before set() happen before the ResetEvent is observed to be "set".
+ if (self.state.swap(is_set, .Release) == waiting) {
+ Futex.wake(&self.state, std.math.maxInt(u32));
}
}
+
+ fn reset(self: *Impl) void {
+ self.state.store(unset, .Monotonic);
+ }
};
-test "basic usage" {
- var event: ResetEvent = undefined;
- try event.init();
- defer event.deinit();
+test "ResetEvent - smoke test" {
+ // make sure the event is unset
+ var event = ResetEvent{};
+ try testing.expectEqual(false, event.isSet());
- // test event setting
+ // make sure the event gets set
event.set();
+ try testing.expectEqual(true, event.isSet());
- // test event resetting
+ // make sure the event gets unset again
event.reset();
+ try testing.expectEqual(false, event.isSet());
- // test event waiting (non-blocking)
- event.set();
- event.wait();
- event.reset();
+ // waits should timeout as there's no other thread to set the event
+ try testing.expectError(error.Timeout, event.timedWait(0));
+ try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms));
+ // set the event again and make sure waits complete
event.set();
- try testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1));
+ event.wait();
+ try event.timedWait(std.time.ns_per_ms);
+ try testing.expectEqual(true, event.isSet());
+}
- // test cross-thread signaling
- if (builtin.single_threaded)
- return;
+test "ResetEvent - signaling" {
+ // This test requires spawning threads
+ if (builtin.single_threaded) {
+ return error.SkipZigTest;
+ }
const Context = struct {
- const Self = @This();
-
- value: u128,
- in: ResetEvent,
- out: ResetEvent,
-
- fn init(self: *Self) !void {
- self.* = .{
- .value = 0,
- .in = undefined,
- .out = undefined,
- };
- try self.in.init();
- try self.out.init();
- }
+ in: ResetEvent = .{},
+ out: ResetEvent = .{},
+ value: usize = 0,
+
+ fn input(self: *@This()) !void {
+ // wait for the value to become 1
+ self.in.wait();
+ self.in.reset();
+ try testing.expectEqual(self.value, 1);
+
+ // bump the value and wake up output()
+ self.value = 2;
+ self.out.set();
- fn deinit(self: *Self) void {
- self.in.deinit();
- self.out.deinit();
- self.* = undefined;
+ // wait for output to receive 2, bump the value and wake us up with 3
+ self.in.wait();
+ self.in.reset();
+ try testing.expectEqual(self.value, 3);
+
+ // bump the value and wake up output() for it to see 4
+ self.value = 4;
+ self.out.set();
}
- fn sender(self: *Self) !void {
- // update value and signal input
- try testing.expect(self.value == 0);
+ fn output(self: *@This()) !void {
+ // start with 0 and bump the value for input to see 1
+ try testing.expectEqual(self.value, 0);
self.value = 1;
self.in.set();
- // wait for receiver to update value and signal output
+ // wait for input to receive 1, bump the value to 2 and wake us up
self.out.wait();
- try testing.expect(self.value == 2);
+ self.out.reset();
+ try testing.expectEqual(self.value, 2);
- // update value and signal final input
+ // bump the value to 3 for input to see (rhymes)
self.value = 3;
self.in.set();
+
+ // wait for input to bump the value to 4 and receive no more (rhymes)
+ self.out.wait();
+ self.out.reset();
+ try testing.expectEqual(self.value, 4);
}
+ };
- fn receiver(self: *Self) !void {
- // wait for sender to update value and signal input
- self.in.wait();
- try testing.expect(self.value == 1);
+ var ctx = Context{};
- // update value and signal output
- self.in.reset();
- self.value = 2;
- self.out.set();
+ const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx});
+ defer thread.join();
- // wait for sender to update value and signal final input
- self.in.wait();
- try testing.expect(self.value == 3);
- }
+ try ctx.input();
+}
- fn sleeper(self: *Self) void {
- self.in.set();
- time.sleep(time.ns_per_ms * 2);
- self.value = 5;
- self.out.set();
+test "ResetEvent - broadcast" {
+ // This test requires spawning threads
+ if (builtin.single_threaded) {
+ return error.SkipZigTest;
+ }
+
+ const num_threads = 10;
+ const Barrier = struct {
+ event: ResetEvent = .{},
+ counter: Atomic(usize) = Atomic(usize).init(num_threads),
+
+ fn wait(self: *@This()) void {
+ if (self.counter.fetchSub(1, .AcqRel) == 1) {
+ self.event.set();
+ }
}
+ };
- fn timedWaiter(self: *Self) !void {
- self.in.wait();
- try testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us));
- try self.out.timedWait(time.ns_per_ms * 100);
- try testing.expect(self.value == 5);
+ const Context = struct {
+ start_barrier: Barrier = .{},
+ finish_barrier: Barrier = .{},
+
+ fn run(self: *@This()) void {
+ self.start_barrier.wait();
+ self.finish_barrier.wait();
}
};
- var context: Context = undefined;
- try context.init();
- defer context.deinit();
- const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context});
- defer receiver.join();
- try context.sender();
-
- if (false) {
- // I have now observed this fail on macOS, Windows, and Linux.
- // https://github.com/ziglang/zig/issues/7009
- var timed = Context.init();
- defer timed.deinit();
- const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed});
- defer sleeper.join();
- try timed.timedWaiter();
- }
+ var ctx = Context{};
+ var threads: [num_threads - 1]std.Thread = undefined;
+
+ for (threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx});
+ defer for (threads) |t| t.join();
+
+ ctx.run();
}