aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread/Futex.zig
diff options
context:
space:
mode:
authorprotty <45520026+kprotty@users.noreply.github.com>2022-04-23 19:35:56 -0500
committerGitHub <noreply@github.com>2022-04-23 19:35:56 -0500
commit963ac60918b39cafdda3cb99eff4cd9d20edd839 (patch)
tree8b480e6c1c01c11758f47b1126c53e75a339d1a1 /lib/std/Thread/Futex.zig
parentdaef82d06fd6b30d2cab7f5a6723cf2e3c7b48c6 (diff)
downloadzig-963ac60918b39cafdda3cb99eff4cd9d20edd839.tar.gz
zig-963ac60918b39cafdda3cb99eff4cd9d20edd839.zip
std.Thread: Mutex and Condition improvements (#11497)
* Thread: minor cleanups * Thread: rewrite Mutex * Thread: introduce Futex.Deadline * Thread: Condition rewrite + cleanup * Mutex: optimize lock fast path * Condition: more docs * Thread: more mutex + condition docs * Thread: remove broken Condition test * Thread: zig fmt * address review comments + fix Thread.DummyMutex in GPA * Atomic: disable bitRmw x86 inline asm for stage2 * GPA: typo mutex_init * Thread: remove noalias on stuff * Thread: comment typos + clarifications
Diffstat (limited to 'lib/std/Thread/Futex.zig')
-rw-r--r--lib/std/Thread/Futex.zig69
1 files changed, 62 insertions, 7 deletions
diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig
index a1c8ca71e4..33eb30ba9d 100644
--- a/lib/std/Thread/Futex.zig
+++ b/lib/std/Thread/Futex.zig
@@ -10,14 +10,12 @@ const Futex = @This();
const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
-
const Atomic = std.atomic.Atomic;
-const spinLoopHint = std.atomic.spinLoopHint;
/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
-/// - The caller is unblocked spuriously by an arbitrary internal signal.
+/// - The caller is unblocked spuriously ("at random").
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
@@ -32,7 +30,7 @@ pub fn wait(ptr: *const Atomic(u32), expect: u32) void {
/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
-/// - The caller is unblocked spuriously by an arbitrary internal signal.
+/// - The caller is unblocked spuriously ("at random").
/// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned.
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
@@ -62,7 +60,7 @@ pub fn wake(ptr: *const Atomic(u32), max_waiters: u32) void {
}
const Impl = if (builtin.single_threaded)
- SerialImpl
+ SingleThreadedImpl
else if (builtin.os.tag == .windows)
WindowsImpl
else if (builtin.os.tag.isDarwin())
@@ -97,7 +95,7 @@ const UnsupportedImpl = struct {
}
};
-const SerialImpl = struct {
+const SingleThreadedImpl = struct {
fn wait(ptr: *const Atomic(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
if (ptr.loadUnchecked() != expect) {
return;
@@ -804,7 +802,7 @@ const PosixImpl = struct {
//
// What we really want here is a Release load, but that doesn't exist under the C11 memory model.
// We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing,
- // but the RMW operation unconditionally stores which invalidates the cache-line for others causing unnecessary contention.
+ // but the RMW operation unconditionally marks the cache-line as modified for others causing unnecessary fetching/contention.
//
// Instead we opt to do a full-fence + load instead which avoids taking ownership of the cache-line.
// fence(SeqCst) effectively converts the ptr update to SeqCst and the pending load to SeqCst: creating a Store-Load barrier.
@@ -962,3 +960,60 @@ test "Futex - broadcasting" {
for (broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast});
for (broadcast.threads) |t| t.join();
}
+
+/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout.
+///
+/// Futex's timedWait() api uses a relative duration which suffers from over-waiting
+/// when used in a loop which is often required due to the possibility of spurious wakeups.
+///
+/// Deadline instead converts the relative timeout to an absolute one so that multiple calls
+/// to Futex timedWait() can block for and report more accurate error.Timeouts.
+pub const Deadline = struct {
+ timeout: ?u64,
+ started: std.time.Timer,
+
+ /// Create the deadline to expire after the given amount of time in nanoseconds passes.
+ /// Pass in `null` to have the deadline call `Futex.wait()` and never expire.
+ pub fn init(expires_in_ns: ?u64) Deadline {
+ var deadline: Deadline = undefined;
+ deadline.timeout = expires_in_ns;
+
+ // std.time.Timer is required to be supported for somewhat accurate reportings of error.Timeout.
+ if (deadline.timeout != null) {
+ deadline.started = std.time.Timer.start() catch unreachable;
+ }
+
+ return deadline;
+ }
+
+ /// Wait until either:
+ /// - the `ptr`'s value changes from `expect`.
+ /// - `Futex.wake()` is called on the `ptr`.
+ /// - A spurious wake occurs.
+ /// - The deadline expires; In which case `error.Timeout` is returned.
+ pub fn wait(self: *Deadline, ptr: *const Atomic(u32), expect: u32) error{Timeout}!void {
+ @setCold(true);
+
+ // Check if we actually have a timeout to wait until.
+ // If not just wait "forever".
+ const timeout_ns = self.timeout orelse {
+ return Futex.wait(ptr, expect);
+ };
+
+ // Get how much time has passed since we started waiting
+ // then subtract that from the init() timeout to get how much longer to wait.
+ // Use overflow to detect when we've been waiting longer than the init() timeout.
+ const elapsed_ns = self.started.read();
+ const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0;
+ return Futex.timedWait(ptr, expect, until_timeout_ns);
+ }
+};
+
+test "Futex - Deadline" {
+ var deadline = Deadline.init(100 * std.time.ns_per_ms);
+ var futex_word = Atomic(u32).init(0);
+
+ while (true) {
+ deadline.wait(&futex_word, 0) catch break;
+ }
+}