diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2022-07-01 15:52:54 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2022-07-01 15:52:54 -0700 |
| commit | c89dd15e1be4959800dc7092d7dd4375253db7bc (patch) | |
| tree | ca184ae53592efa21e67128a5f891d642d7f1118 /lib/std/Thread/ResetEvent.zig | |
| parent | 5466e87fce581f2ef90ac23bb80b1dbc05836fc6 (diff) | |
| parent | 2360f8c490f3ec684ed64ff28e8c1fade249070b (diff) | |
| download | zig-c89dd15e1be4959800dc7092d7dd4375253db7bc.tar.gz zig-c89dd15e1be4959800dc7092d7dd4375253db7bc.zip | |
Merge remote-tracking branch 'origin/master' into llvm14
Diffstat (limited to 'lib/std/Thread/ResetEvent.zig')
| -rw-r--r-- | lib/std/Thread/ResetEvent.zig | 420 |
1 files changed, 205 insertions, 215 deletions
diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig index 7fc4f3c2b4..87232c29cf 100644 --- a/lib/std/Thread/ResetEvent.zig +++ b/lib/std/Thread/ResetEvent.zig @@ -1,291 +1,281 @@ -//! A thread-safe resource which supports blocking until signaled. -//! This API is for kernel threads, not evented I/O. -//! This API requires being initialized at runtime, and initialization -//! can fail. Once initialized, the core operations cannot fail. -//! If you need an abstraction that cannot fail to be initialized, see -//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure, -//! it is preferred to use `ResetEvent`. +//! ResetEvent is a thread-safe bool which can be set to true/false ("set"/"unset"). +//! It can also block threads until the "bool" is set with cancellation via timed waits. +//! ResetEvent can be statically initialized and is at most `@sizeOf(u64)` large. -const ResetEvent = @This(); const std = @import("../std.zig"); const builtin = @import("builtin"); -const testing = std.testing; -const assert = std.debug.assert; -const c = std.c; -const os = std.os; -const time = std.time; - -impl: Impl, +const ResetEvent = @This(); -pub const Impl = if (builtin.single_threaded) - std.Thread.StaticResetEvent.DebugEvent -else if (builtin.target.isDarwin()) - DarwinEvent -else if (std.Thread.use_pthreads) - PosixEvent -else - std.Thread.StaticResetEvent.AtomicEvent; +const os = std.os; +const assert = std.debug.assert; +const testing = std.testing; +const Atomic = std.atomic.Atomic; +const Futex = std.Thread.Futex; -pub const InitError = error{SystemResources}; +impl: Impl = .{}, -/// After `init`, it is legal to call any other function. -pub fn init(ev: *ResetEvent) InitError!void { - return ev.impl.init(); +/// Returns if the ResetEvent was set(). +/// Once reset() is called, this returns false until the next set(). +/// The memory accesses before the set() can be said to happen before isSet() returns true. +pub fn isSet(self: *const ResetEvent) bool { + return self.impl.isSet(); } -/// This function is not thread-safe. -/// After `deinit`, the only legal function to call is `init`. -pub fn deinit(ev: *ResetEvent) void { - return ev.impl.deinit(); +/// Block's the callers thread until the ResetEvent is set(). +/// This is effectively a more efficient version of `while (!isSet()) {}`. +/// The memory accesses before the set() can be said to happen before wait() returns. +pub fn wait(self: *ResetEvent) void { + self.impl.wait(null) catch |err| switch (err) { + error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out + }; } -/// Sets the event if not already set and wakes up all the threads waiting on -/// the event. It is safe to call `set` multiple times before calling `wait`. -/// However it is illegal to call `set` after `wait` is called until the event -/// is `reset`. This function is thread-safe. -pub fn set(ev: *ResetEvent) void { - return ev.impl.set(); +/// Block's the callers thread until the ResetEvent is set(), or until the corresponding timeout expires. +/// If the timeout expires before the ResetEvent is set, `error.Timeout` is returned. +/// This is effectively a more efficient version of `while (!isSet()) {}`. +/// The memory accesses before the set() can be said to happen before timedWait() returns without error. +pub fn timedWait(self: *ResetEvent, timeout_ns: u64) error{Timeout}!void { + return self.impl.wait(timeout_ns); } -/// Resets the event to its original, unset state. -/// This function is *not* thread-safe. It is equivalent to calling -/// `deinit` followed by `init` but without the possibility of failure. -pub fn reset(ev: *ResetEvent) void { - return ev.impl.reset(); +/// Marks the ResetEvent as "set" and unblocks any threads in `wait()` or `timedWait()` to observe the new state. +/// The ResetEvent says "set" until reset() is called, making future set() calls do nothing semantically. +/// The memory accesses before set() can be said to happen before isSet() returns true or wait()/timedWait() return successfully. +pub fn set(self: *ResetEvent) void { + self.impl.set(); } -/// Wait for the event to be set by blocking the current thread. -/// Thread-safe. No spurious wakeups. -/// Upon return from `wait`, the only functions available to be called -/// in `ResetEvent` are `reset` and `deinit`. -pub fn wait(ev: *ResetEvent) void { - return ev.impl.wait(); +/// Unmarks the ResetEvent from its "set" state if set() was called previously. +/// It is undefined behavior is reset() is called while threads are blocked in wait() or timedWait(). +/// Concurrent calls to set(), isSet() and reset() are allowed. +pub fn reset(self: *ResetEvent) void { + self.impl.reset(); } -pub const TimedWaitResult = enum { event_set, timed_out }; - -/// Wait for the event to be set by blocking the current thread. -/// A timeout in nanoseconds can be provided as a hint for how -/// long the thread should block on the unset event before returning -/// `TimedWaitResult.timed_out`. -/// Thread-safe. No precision of timing is guaranteed. -/// Upon return from `wait`, the only functions available to be called -/// in `ResetEvent` are `reset` and `deinit`. -pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { - return ev.impl.timedWait(timeout_ns); -} +const Impl = if (builtin.single_threaded) + SingleThreadedImpl +else + FutexImpl; -/// Apple has decided to not support POSIX semaphores, so we go with a -/// different approach using Grand Central Dispatch. This API is exposed -/// by libSystem so it is guaranteed to be available on all Darwin platforms. -pub const DarwinEvent = struct { - sem: c.dispatch_semaphore_t = undefined, +const SingleThreadedImpl = struct { + is_set: bool = false, - pub fn init(ev: *DarwinEvent) !void { - ev.* = .{ - .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, - }; + fn isSet(self: *const Impl) bool { + return self.is_set; } - pub fn deinit(ev: *DarwinEvent) void { - c.dispatch_release(ev.sem); - ev.* = undefined; - } + fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void { + if (self.isSet()) { + return; + } - pub fn set(ev: *DarwinEvent) void { - // Empirically this returns the numerical value of the semaphore. - _ = c.dispatch_semaphore_signal(ev.sem); - } + // There are no other threads to wake us up. + // So if we wait without a timeout we would never wake up. + const timeout_ns = timeout orelse { + unreachable; // deadlock detected + }; - pub fn wait(ev: *DarwinEvent) void { - assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); + std.time.sleep(timeout_ns); + return error.Timeout; } - pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { - const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); - if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { - return .timed_out; - } else { - return .event_set; - } + fn set(self: *Impl) void { + self.is_set = true; } - pub fn reset(ev: *DarwinEvent) void { - // Keep calling until the semaphore goes back down to 0. - while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} + fn reset(self: *Impl) void { + self.is_set = false; } }; -/// POSIX semaphores must be initialized at runtime because they are allowed to -/// be implemented as file descriptors, in which case initialization would require -/// a syscall to open the fd. -pub const PosixEvent = struct { - sem: c.sem_t = undefined, +const FutexImpl = struct { + state: Atomic(u32) = Atomic(u32).init(unset), - pub fn init(ev: *PosixEvent) !void { - switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { - .SUCCESS => return, - else => return error.SystemResources, - } - } + const unset = 0; + const waiting = 1; + const is_set = 2; - pub fn deinit(ev: *PosixEvent) void { - assert(c.sem_destroy(&ev.sem) == 0); - ev.* = undefined; + fn isSet(self: *const Impl) bool { + // Acquire barrier ensures memory accesses before set() happen before we return true. + return self.state.load(.Acquire) == is_set; } - pub fn set(ev: *PosixEvent) void { - assert(c.sem_post(&ev.sem) == 0); + fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void { + // Outline the slow path to allow isSet() to be inlined + if (!self.isSet()) { + return self.waitUntilSet(timeout); + } } - pub fn wait(ev: *PosixEvent) void { - while (true) { - switch (c.getErrno(c.sem_wait(&ev.sem))) { - .SUCCESS => return, - .INTR => continue, - .INVAL => unreachable, - else => unreachable, - } + fn waitUntilSet(self: *Impl, timeout: ?u64) error{Timeout}!void { + @setCold(true); + + // Try to set the state from `unset` to `waiting` to indicate + // to the set() thread that others are blocked on the ResetEvent. + // We avoid using any strict barriers until the end when we know the ResetEvent is set. + var state = self.state.load(.Monotonic); + if (state == unset) { + state = self.state.compareAndSwap(state, waiting, .Monotonic, .Monotonic) orelse waiting; } - } - pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { - var ts: os.timespec = undefined; - var timeout_abs = timeout_ns; - os.clock_gettime(os.CLOCK.REALTIME, &ts) catch return .timed_out; - timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, ts.tv_nsec); - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); - while (true) { - switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { - .SUCCESS => return .event_set, - .INTR => continue, - .INVAL => unreachable, - .TIMEDOUT => return .timed_out, - else => unreachable, + // Wait until the ResetEvent is set since the state is waiting. + if (state == waiting) { + var futex_deadline = Futex.Deadline.init(timeout); + while (true) { + const wait_result = futex_deadline.wait(&self.state, waiting); + + // Check if the ResetEvent was set before possibly reporting error.Timeout below. + state = self.state.load(.Monotonic); + if (state != waiting) { + break; + } + + try wait_result; } } + + // Acquire barrier ensures memory accesses before set() happen before we return. + assert(state == is_set); + self.state.fence(.Acquire); } - pub fn reset(ev: *PosixEvent) void { - while (true) { - switch (c.getErrno(c.sem_trywait(&ev.sem))) { - .SUCCESS => continue, // Need to make it go to zero. - .INTR => continue, - .INVAL => unreachable, - .AGAIN => return, // The semaphore currently has the value zero. - else => unreachable, - } + fn set(self: *Impl) void { + // Quick check if the ResetEvent is already set before doing the atomic swap below. + // set() could be getting called quite often and multiple threads calling swap() increases contention unnecessarily. + if (self.state.load(.Monotonic) == is_set) { + return; + } + + // Mark the ResetEvent as set and unblock all waiters waiting on it if any. + // Release barrier ensures memory accesses before set() happen before the ResetEvent is observed to be "set". + if (self.state.swap(is_set, .Release) == waiting) { + Futex.wake(&self.state, std.math.maxInt(u32)); } } + + fn reset(self: *Impl) void { + self.state.store(unset, .Monotonic); + } }; -test "basic usage" { - var event: ResetEvent = undefined; - try event.init(); - defer event.deinit(); +test "ResetEvent - smoke test" { + // make sure the event is unset + var event = ResetEvent{}; + try testing.expectEqual(false, event.isSet()); - // test event setting + // make sure the event gets set event.set(); + try testing.expectEqual(true, event.isSet()); - // test event resetting + // make sure the event gets unset again event.reset(); + try testing.expectEqual(false, event.isSet()); - // test event waiting (non-blocking) - event.set(); - event.wait(); - event.reset(); + // waits should timeout as there's no other thread to set the event + try testing.expectError(error.Timeout, event.timedWait(0)); + try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms)); + // set the event again and make sure waits complete event.set(); - try testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + event.wait(); + try event.timedWait(std.time.ns_per_ms); + try testing.expectEqual(true, event.isSet()); +} - // test cross-thread signaling - if (builtin.single_threaded) - return; +test "ResetEvent - signaling" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } const Context = struct { - const Self = @This(); - - value: u128, - in: ResetEvent, - out: ResetEvent, - - fn init(self: *Self) !void { - self.* = .{ - .value = 0, - .in = undefined, - .out = undefined, - }; - try self.in.init(); - try self.out.init(); - } + in: ResetEvent = .{}, + out: ResetEvent = .{}, + value: usize = 0, + + fn input(self: *@This()) !void { + // wait for the value to become 1 + self.in.wait(); + self.in.reset(); + try testing.expectEqual(self.value, 1); + + // bump the value and wake up output() + self.value = 2; + self.out.set(); - fn deinit(self: *Self) void { - self.in.deinit(); - self.out.deinit(); - self.* = undefined; + // wait for output to receive 2, bump the value and wake us up with 3 + self.in.wait(); + self.in.reset(); + try testing.expectEqual(self.value, 3); + + // bump the value and wake up output() for it to see 4 + self.value = 4; + self.out.set(); } - fn sender(self: *Self) !void { - // update value and signal input - try testing.expect(self.value == 0); + fn output(self: *@This()) !void { + // start with 0 and bump the value for input to see 1 + try testing.expectEqual(self.value, 0); self.value = 1; self.in.set(); - // wait for receiver to update value and signal output + // wait for input to receive 1, bump the value to 2 and wake us up self.out.wait(); - try testing.expect(self.value == 2); + self.out.reset(); + try testing.expectEqual(self.value, 2); - // update value and signal final input + // bump the value to 3 for input to see (rhymes) self.value = 3; self.in.set(); + + // wait for input to bump the value to 4 and receive no more (rhymes) + self.out.wait(); + self.out.reset(); + try testing.expectEqual(self.value, 4); } + }; - fn receiver(self: *Self) !void { - // wait for sender to update value and signal input - self.in.wait(); - try testing.expect(self.value == 1); + var ctx = Context{}; - // update value and signal output - self.in.reset(); - self.value = 2; - self.out.set(); + const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx}); + defer thread.join(); - // wait for sender to update value and signal final input - self.in.wait(); - try testing.expect(self.value == 3); - } + try ctx.input(); +} - fn sleeper(self: *Self) void { - self.in.set(); - time.sleep(time.ns_per_ms * 2); - self.value = 5; - self.out.set(); +test "ResetEvent - broadcast" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 10; + const Barrier = struct { + event: ResetEvent = .{}, + counter: Atomic(usize) = Atomic(usize).init(num_threads), + + fn wait(self: *@This()) void { + if (self.counter.fetchSub(1, .AcqRel) == 1) { + self.event.set(); + } } + }; - fn timedWaiter(self: *Self) !void { - self.in.wait(); - try testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); - try self.out.timedWait(time.ns_per_ms * 100); - try testing.expect(self.value == 5); + const Context = struct { + start_barrier: Barrier = .{}, + finish_barrier: Barrier = .{}, + + fn run(self: *@This()) void { + self.start_barrier.wait(); + self.finish_barrier.wait(); } }; - var context: Context = undefined; - try context.init(); - defer context.deinit(); - const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); - defer receiver.join(); - try context.sender(); - - if (false) { - // I have now observed this fail on macOS, Windows, and Linux. - // https://github.com/ziglang/zig/issues/7009 - var timed = Context.init(); - defer timed.deinit(); - const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); - defer sleeper.join(); - try timed.timedWaiter(); - } + var ctx = Context{}; + var threads: [num_threads - 1]std.Thread = undefined; + + for (threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx}); + defer for (threads) |t| t.join(); + + ctx.run(); } |
