diff options
| author | protty <45520026+kprotty@users.noreply.github.com> | 2022-04-23 19:35:56 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-04-23 19:35:56 -0500 |
| commit | 963ac60918b39cafdda3cb99eff4cd9d20edd839 (patch) | |
| tree | 8b480e6c1c01c11758f47b1126c53e75a339d1a1 /lib/std/Thread/Futex.zig | |
| parent | daef82d06fd6b30d2cab7f5a6723cf2e3c7b48c6 (diff) | |
| download | zig-963ac60918b39cafdda3cb99eff4cd9d20edd839.tar.gz zig-963ac60918b39cafdda3cb99eff4cd9d20edd839.zip | |
std.Thread: Mutex and Condition improvements (#11497)
* Thread: minor cleanups
* Thread: rewrite Mutex
* Thread: introduce Futex.Deadline
* Thread: Condition rewrite + cleanup
* Mutex: optimize lock fast path
* Condition: more docs
* Thread: more mutex + condition docs
* Thread: remove broken Condition test
* Thread: zig fmt
* address review comments + fix Thread.DummyMutex in GPA
* Atomic: disable bitRmw x86 inline asm for stage2
* GPA: typo mutex_init
* Thread: remove noalias on stuff
* Thread: comment typos + clarifications
Diffstat (limited to 'lib/std/Thread/Futex.zig')
| -rw-r--r-- | lib/std/Thread/Futex.zig | 69 |
1 files changed, 62 insertions, 7 deletions
diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index a1c8ca71e4..33eb30ba9d 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -10,14 +10,12 @@ const Futex = @This(); const os = std.os; const assert = std.debug.assert; const testing = std.testing; - const Atomic = std.atomic.Atomic; -const spinLoopHint = std.atomic.spinLoopHint; /// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either: /// - The value at `ptr` is no longer equal to `expect`. /// - The caller is unblocked by a matching `wake()`. -/// - The caller is unblocked spuriously by an arbitrary internal signal. +/// - The caller is unblocked spuriously ("at random"). /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically /// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`. @@ -32,7 +30,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32) void { /// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either: /// - The value at `ptr` is no longer equal to `expect`. /// - The caller is unblocked by a matching `wake()`. -/// - The caller is unblocked spuriously by an arbitrary internal signal. +/// - The caller is unblocked spuriously ("at random"). /// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned. /// /// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically @@ -62,7 +60,7 @@ pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void { } const Impl = if (builtin.single_threaded) - SerialImpl + SingleThreadedImpl else if (builtin.os.tag == .windows) WindowsImpl else if (builtin.os.tag.isDarwin()) @@ -97,7 +95,7 @@ const UnsupportedImpl = struct { } }; -const SerialImpl = struct { +const SingleThreadedImpl = struct { fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void { if (ptr.loadUnchecked() != expect) { return; @@ -804,7 +802,7 @@ const PosixImpl = struct { // // What we really want here is a Release load, but that doesn't exist under the C11 memory model. // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing, - // but the RMW operation unconditionally stores which invalidates the cache-line for others causing unnecessary contention. + // but the RMW operation unconditionally marks the cache-line as modified for others causing unnecessary fetching/contention. // // Instead we opt to do a full-fence + load instead which avoids taking ownership of the cache-line. // fence(SeqCst) effectively converts the ptr update to SeqCst and the pending load to SeqCst: creating a Store-Load barrier. @@ -962,3 +960,60 @@ test "Futex - broadcasting" { for (broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast}); for (broadcast.threads) |t| t.join(); } + +/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout. +/// +/// Futex's timedWait() api uses a relative duration which suffers from over-waiting +/// when used in a loop which is often required due to the possibility of spurious wakeups. +/// +/// Deadline instead converts the relative timeout to an absolute one so that multiple calls +/// to Futex timedWait() can block for and report more accurate error.Timeouts. +pub const Deadline = struct { + timeout: ?u64, + started: std.time.Timer, + + /// Create the deadline to expire after the given amount of time in nanoseconds passes. + /// Pass in `null` to have the deadline call `Futex.wait()` and never expire. + pub fn init(expires_in_ns: ?u64) Deadline { + var deadline: Deadline = undefined; + deadline.timeout = expires_in_ns; + + // std.time.Timer is required to be supported for somewhat accurate reportings of error.Timeout. + if (deadline.timeout != null) { + deadline.started = std.time.Timer.start() catch unreachable; + } + + return deadline; + } + + /// Wait until either: + /// - the `ptr`'s value changes from `expect`. + /// - `Futex.wake()` is called on the `ptr`. + /// - A spurious wake occurs. + /// - The deadline expires; In which case `error.Timeout` is returned. + pub fn wait(self: *Deadline, ptr: *const Atomic(u32), expect: u32) error{Timeout}!void { + @setCold(true); + + // Check if we actually have a timeout to wait until. + // If not just wait "forever". + const timeout_ns = self.timeout orelse { + return Futex.wait(ptr, expect); + }; + + // Get how much time has passed since we started waiting + // then subtract that from the init() timeout to get how much longer to wait. + // Use overflow to detect when we've been waiting longer than the init() timeout. + const elapsed_ns = self.started.read(); + const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0; + return Futex.timedWait(ptr, expect, until_timeout_ns); + } +}; + +test "Futex - Deadline" { + var deadline = Deadline.init(100 * std.time.ns_per_ms); + var futex_word = Atomic(u32).init(0); + + while (true) { + deadline.wait(&futex_word, 0) catch break; + } +} |
