author     protty <45520026+kprotty@users.noreply.github.com>  2022-04-23 19:35:56 -0500
committer  GitHub <noreply@github.com>  2022-04-23 19:35:56 -0500
commit     963ac60918b39cafdda3cb99eff4cd9d20edd839 (patch)
tree       8b480e6c1c01c11758f47b1126c53e75a339d1a1 /lib/std/Thread/Condition.zig
parent     daef82d06fd6b30d2cab7f5a6723cf2e3c7b48c6 (diff)
std.Thread: Mutex and Condition improvements (#11497)
* Thread: minor cleanups
* Thread: rewrite Mutex
* Thread: introduce Futex.Deadline
* Thread: Condition rewrite + cleanup
* Mutex: optimize lock fast path
* Condition: more docs
* Thread: more mutex + condition docs
* Thread: remove broken Condition test
* Thread: zig fmt
* address review comments + fix Thread.DummyMutex in GPA
* Atomic: disable bitRmw x86 inline asm for stage2
* GPA: typo mutex_init
* Thread: remove noalias on stuff
* Thread: comment typos + clarifications
Diffstat (limited to 'lib/std/Thread/Condition.zig')
-rw-r--r--  lib/std/Thread/Condition.zig | 743
1 file changed, 435 insertions, 308 deletions
diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig
index fb48db8e53..d5855ec066 100644
--- a/lib/std/Thread/Condition.zig
+++ b/lib/std/Thread/Condition.zig
@@ -1,411 +1,538 @@
-//! A condition provides a way for a kernel thread to block until it is signaled
-//! to wake up. Spurious wakeups are possible.
-//! This API supports static initialization and does not require deinitialization.
-
-impl: Impl = .{},
+//! Condition variables are used with a Mutex to efficiently wait for an arbitrary condition to occur.
+//! It does this by atomically unlocking the mutex, blocking the thread until notified, and finally re-locking the mutex.
+//! Condition can be statically initialized and is at most `@sizeOf(u64)` large.
+//!
+//! Example:
+//! ```
+//! var m = Mutex{};
+//! var c = Condition{};
+//! var predicate = false;
+//!
+//! fn consumer() void {
+//! m.lock();
+//! defer m.unlock();
+//!
+//! while (!predicate) {
+//! c.wait(&m);
+//! }
+//! }
+//!
+//! fn producer() void {
+//! m.lock();
+//! defer m.unlock();
+//!
+//! predicate = true;
+//! c.signal();
+//! }
+//!
+//! const thread = try std.Thread.spawn(.{}, producer, .{});
+//! consumer();
+//! thread.join();
+//! ```
+//!
+//! Note that condition variables can only reliably unblock threads that are sequenced before them using the same Mutex.
+//! This means that the following is allowed to deadlock:
+//! ```
+//! thread-1: mutex.lock()
+//! thread-1: condition.wait(&mutex)
+//!
+//! thread-2: // mutex.lock() (without this, the following signal may not see the waiting thread-1)
+//! thread-2: // mutex.unlock() (this is optional for correctness once locked above, as signal can be called without holding the mutex)
+//! thread-2: condition.signal()
+//! ```
const std = @import("../std.zig");
const builtin = @import("builtin");
const Condition = @This();
-const windows = std.os.windows;
-const linux = std.os.linux;
const Mutex = std.Thread.Mutex;
+
+const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
+const Atomic = std.atomic.Atomic;
+const Futex = std.Thread.Futex;
+
+impl: Impl = .{},
-pub fn wait(cond: *Condition, mutex: *Mutex) void {
- cond.impl.wait(mutex);
+/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
+/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
+///
+/// The Mutex must be locked by the caller's thread when this function is called.
+/// A Mutex can have multiple Conditions waiting with it concurrently, but not the other way around.
+/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
+/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
+///
+/// A blocking call to wait() is unblocked under one of the following conditions:
+/// - a spurious ("at random") wake up occurs
+/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `wait()`.
+///
+/// Given wait() can be interrupted spuriously, the blocking condition should be checked continuously
+/// irrespective of any notifications from `signal()` or `broadcast()`.
+pub fn wait(self: *Condition, mutex: *Mutex) void {
+ self.impl.wait(mutex, null) catch |err| switch (err) {
+ error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
+ };
}
-pub fn timedWait(cond: *Condition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
- try cond.impl.timedWait(mutex, timeout_ns);
+/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return.
+/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex.
+///
+/// The Mutex must be locked by the caller's thread when this function is called.
+/// A Mutex can have multiple Conditions waiting with it concurrently, but not the other way around.
+/// It is undefined behavior for multiple threads to wait with different mutexes using the same Condition concurrently.
+/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex.
+///
+/// A blocking call to `timedWait()` is unblocked under one of the following conditions:
+/// - a spurious ("at random") wake occurs
+/// - the caller was blocked for around `timeout_ns` nanoseconds, in which case `error.Timeout` is returned.
+/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `timedWait()`.
+///
+/// Given `timedWait()` can be interrupted spuriously, the blocking condition should be checked continuously
+/// irrespective of any notifications from `signal()` or `broadcast()`.
+pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void {
+ return self.impl.wait(mutex, timeout_ns);
}
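Since both spurious wakeups and timeouts are possible, callers of `timedWait()` typically re-check their predicate in a loop and track how much of the timeout remains. A minimal sketch of that pattern (the names `m`, `c`, `queue_nonempty`, and `timeout_ns` are illustrative, in the spirit of the example at the top of the file):
```
var timer = try std.time.Timer.start();

m.lock();
defer m.unlock();

while (!queue_nonempty) {
    const elapsed = timer.read();
    if (elapsed >= timeout_ns) return error.Timeout;
    // A timeout here may race with a signal(), so ignore it and re-check the predicate.
    c.timedWait(&m, timeout_ns - elapsed) catch {};
}
```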
-pub fn signal(cond: *Condition) void {
- cond.impl.signal();
+/// Unblocks at least one thread blocked in a call to `wait()` or `timedWait()` with a given Mutex.
+/// The blocked thread must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
+/// `signal()` can be called with or without the relevant Mutex being acquired, and has no "effect" if there are no observable blocked threads.
+pub fn signal(self: *Condition) void {
+ self.impl.wake(.one);
}
-pub fn broadcast(cond: *Condition) void {
- cond.impl.broadcast();
+/// Unblocks all threads currently blocked in a call to `wait()` or `timedWait()` with a given Mutex.
+/// The blocked threads must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking.
+/// `broadcast()` can be called with or without the relevant Mutex being acquired, and has no "effect" if there are no observable blocked threads.
+pub fn broadcast(self: *Condition) void {
+ self.impl.wake(.all);
}
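When every blocked thread needs to observe a state change (for example a shutdown flag), `broadcast()` is the appropriate wake-up. A small illustrative sketch (the names below are placeholders, not part of this patch):
```
fn requestShutdown(m: *Mutex, c: *Condition, shutdown: *bool) void {
    m.lock();
    defer m.unlock();

    shutdown.* = true;
    // Every thread blocked in wait()/timedWait() wakes up and re-checks `shutdown`.
    c.broadcast();
}
```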
const Impl = if (builtin.single_threaded)
- SingleThreadedCondition
+ SingleThreadedImpl
else if (builtin.os.tag == .windows)
- WindowsCondition
-else if (std.Thread.use_pthreads)
- PthreadCondition
+ WindowsImpl
else
- AtomicCondition;
+ FutexImpl;
-pub const SingleThreadedCondition = struct {
- pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void {
- _ = cond;
- _ = mutex;
- unreachable; // deadlock detected
- }
+const Notify = enum {
+ one, // wake up only one thread
+ all, // wake up all threads
+};
- pub fn timedWait(cond: *SingleThreadedCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
- _ = cond;
+const SingleThreadedImpl = struct {
+ fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
+ _ = self;
_ = mutex;
- _ = timeout_ns;
- std.time.sleep(timeout_ns);
- return error.TimedOut;
- }
- pub fn signal(cond: *SingleThreadedCondition) void {
- _ = cond;
+ // There are no other threads to wake us up.
+ // So if we wait without a timeout we would never wake up.
+ const timeout_ns = timeout orelse {
+ unreachable; // deadlock detected
+ };
+
+ std.time.sleep(timeout_ns);
+ return error.Timeout;
}
- pub fn broadcast(cond: *SingleThreadedCondition) void {
- _ = cond;
+ fn wake(self: *Impl, comptime notify: Notify) void {
+ // There are no other threads to wake up.
+ _ = self;
+ _ = notify;
}
};
-pub const WindowsCondition = struct {
- cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT,
+const WindowsImpl = struct {
+ condition: os.windows.CONDITION_VARIABLE = .{},
- pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void {
- const rc = windows.kernel32.SleepConditionVariableSRW(
- &cond.cond,
- &mutex.impl.srwlock,
- windows.INFINITE,
- @as(windows.ULONG, 0),
- );
- assert(rc != windows.FALSE);
- }
+ fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
+ var timeout_overflowed = false;
+ var timeout_ms: os.windows.DWORD = os.windows.INFINITE;
- pub fn timedWait(cond: *WindowsCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
- var timeout_checked = std.math.cast(windows.DWORD, timeout_ns / std.time.ns_per_ms) catch overflow: {
- break :overflow std.math.maxInt(windows.DWORD);
- };
+ if (timeout) |timeout_ns| {
+ // Round the nanoseconds to the nearest millisecond,
+            // then saturating-cast it to a Windows DWORD for use in the kernel32 call.
+ const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms;
+ timeout_ms = std.math.cast(os.windows.DWORD, ms) catch std.math.maxInt(os.windows.DWORD);
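+            // For illustration: a timeout_ns of 1_500_000 (1.5ms) rounds to 2ms here,
+            // while values too large for a DWORD saturate to maxInt(DWORD).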
- // Handle the case where timeout is INFINITE, otherwise SleepConditionVariableSRW's time-out never elapses
- const timeout_overflowed = timeout_checked == windows.INFINITE;
- timeout_checked -= @boolToInt(timeout_overflowed);
+ // Track if the timeout overflowed into INFINITE and make sure not to wait forever.
+ if (timeout_ms == os.windows.INFINITE) {
+ timeout_overflowed = true;
+ timeout_ms -= 1;
+ }
+ }
- const rc = windows.kernel32.SleepConditionVariableSRW(
- &cond.cond,
+ const rc = os.windows.kernel32.SleepConditionVariableSRW(
+ &self.condition,
&mutex.impl.srwlock,
- timeout_checked,
- @as(windows.ULONG, 0),
+ timeout_ms,
+            0, // the srwlock is assumed to be acquired in exclusive mode, not shared
);
- if (rc == windows.FALSE and windows.kernel32.GetLastError() == windows.Win32Error.TIMEOUT) return error.TimedOut;
- assert(rc != windows.FALSE);
- }
- pub fn signal(cond: *WindowsCondition) void {
- windows.kernel32.WakeConditionVariable(&cond.cond);
+ // Return error.Timeout if we know the timeout elapsed correctly.
+ if (rc == os.windows.FALSE) {
+ assert(os.windows.kernel32.GetLastError() == .TIMEOUT);
+ if (!timeout_overflowed) return error.Timeout;
+ }
}
- pub fn broadcast(cond: *WindowsCondition) void {
- windows.kernel32.WakeAllConditionVariable(&cond.cond);
+ fn wake(self: *Impl, comptime notify: Notify) void {
+ switch (notify) {
+ .one => os.windows.kernel32.WakeConditionVariable(&self.condition),
+ .all => os.windows.kernel32.WakeAllConditionVariable(&self.condition),
+ }
}
};
-pub const PthreadCondition = struct {
- cond: std.c.pthread_cond_t = .{},
+const FutexImpl = struct {
+ state: Atomic(u32) = Atomic(u32).init(0),
+ epoch: Atomic(u32) = Atomic(u32).init(0),
- pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void {
- const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.impl.pthread_mutex);
- assert(rc == .SUCCESS);
- }
+ const one_waiter = 1;
+ const waiter_mask = 0xffff;
- pub fn timedWait(cond: *PthreadCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
- var ts: std.os.timespec = undefined;
- std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable;
- ts.tv_sec += @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
- ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
- if (ts.tv_nsec >= std.time.ns_per_s) {
- ts.tv_sec += 1;
- ts.tv_nsec -= std.time.ns_per_s;
- }
+ const one_signal = 1 << 16;
+ const signal_mask = 0xffff << 16;
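+
+    // Layout sketch of `state` (illustrative note): the low 16 bits count waiting threads
+    // and the high 16 bits count pending signals, so e.g. a state of 0x0002_0003 encodes
+    // 3 waiters with 2 unconsumed signals.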
- const rc = std.c.pthread_cond_timedwait(&cond.cond, &mutex.impl.pthread_mutex, &ts);
- return switch (rc) {
- .SUCCESS => {},
- .TIMEDOUT => error.TimedOut,
- else => unreachable,
- };
- }
+ fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void {
+ // Register that we're waiting on the state by incrementing the wait count.
+        // This assumes that there can be at most ((1<<16)-1) or 65,535 threads concurrently waiting on the same Condvar.
+ // If this is hit in practice, then this condvar not working is the least of your concerns.
+ var state = self.state.fetchAdd(one_waiter, .Monotonic);
+ assert(state & waiter_mask != waiter_mask);
+ state += one_waiter;
- pub fn signal(cond: *PthreadCondition) void {
- const rc = std.c.pthread_cond_signal(&cond.cond);
- assert(rc == .SUCCESS);
- }
+ // Temporarily release the mutex in order to block on the condition variable.
+ mutex.unlock();
+ defer mutex.lock();
- pub fn broadcast(cond: *PthreadCondition) void {
- const rc = std.c.pthread_cond_broadcast(&cond.cond);
- assert(rc == .SUCCESS);
- }
-};
+ var futex_deadline = Futex.Deadline.init(timeout);
+ while (true) {
+            // Try to wake up by consuming a signal and decrementing the waiter count we added previously.
+ // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
+ while (state & signal_mask != 0) {
+ const new_state = state - one_waiter - one_signal;
+ state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
+ }
-pub const AtomicCondition = struct {
- pending: bool = false,
- queue_mutex: Mutex = .{},
- queue_list: QueueList = .{},
-
- pub const QueueList = std.SinglyLinkedList(QueueItem);
-
- pub const QueueItem = struct {
- futex: i32 = 0,
- dequeued: bool = false,
-
- fn wait(cond: *@This()) void {
- while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
- switch (builtin.os.tag) {
- .linux => {
- switch (linux.getErrno(linux.futex_wait(
- &cond.futex,
- linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
- 0,
- null,
- ))) {
- .SUCCESS => {},
- .INTR => {},
- .AGAIN => {},
- else => unreachable,
- }
- },
- else => std.atomic.spinLoopHint(),
- }
+ // Observe the epoch, then check the state again to see if we should wake up.
+ // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
+ //
+ // - T1: s = LOAD(&state)
+ // - T2: UPDATE(&s, signal)
+ // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
+ // - T1: e = LOAD(&epoch) (was reordered after the state load)
+ // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
+ //
+ // Acquire barrier to ensure the epoch load happens before the state load.
+ const epoch = self.epoch.load(.Acquire);
+ state = self.state.load(.Monotonic);
+ if (state & signal_mask != 0) {
+ continue;
}
- }
- pub fn timedWait(cond: *@This(), timeout_ns: u64) error{TimedOut}!void {
- const start_time = std.time.nanoTimestamp();
- while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) {
- switch (builtin.os.tag) {
- .linux => {
- var ts: std.os.timespec = undefined;
- ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
- ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
- switch (linux.getErrno(linux.futex_wait(
- &cond.futex,
- linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
- 0,
- &ts,
- ))) {
- .SUCCESS => {},
- .INTR => {},
- .AGAIN => {},
- .TIMEDOUT => return error.TimedOut,
- .INVAL => {}, // possibly timeout overflow
- .FAULT => unreachable,
- else => unreachable,
- }
- },
- else => {
- if (std.time.nanoTimestamp() - start_time >= timeout_ns) {
- return error.TimedOut;
+ futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) {
+ // On timeout, we must decrement the waiter we added above.
+ error.Timeout => {
+ while (true) {
+ // If there's a signal when we're timing out, consume it and report being woken up instead.
+ // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
+ while (state & signal_mask != 0) {
+ const new_state = state - one_waiter - one_signal;
+ state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return;
}
- std.atomic.spinLoopHint();
- },
- }
- }
- }
- fn notify(cond: *@This()) void {
- @atomicStore(i32, &cond.futex, 1, .Release);
-
- switch (builtin.os.tag) {
- .linux => {
- switch (linux.getErrno(linux.futex_wake(
- &cond.futex,
- linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
- 1,
- ))) {
- .SUCCESS => {},
- .FAULT => {},
- else => unreachable,
+ // Remove the waiter we added and officially return timed out.
+ const new_state = state - one_waiter;
+ state = self.state.tryCompareAndSwap(state, new_state, .Monotonic, .Monotonic) orelse return err;
}
},
- else => {},
+ };
+ }
+ }
+
+ fn wake(self: *Impl, comptime notify: Notify) void {
+ var state = self.state.load(.Monotonic);
+ while (true) {
+ const waiters = (state & waiter_mask) / one_waiter;
+ const signals = (state & signal_mask) / one_signal;
+
+ // Reserves which waiters to wake up by incrementing the signals count.
+            // Therefore, the signals count is always less than or equal to the waiters count.
+            // We don't need to call Futex.wake() if there's nothing to wake up, or if other wake() threads have already reserved wake-ups for the current waiters.
+ const wakeable = waiters - signals;
+ if (wakeable == 0) {
+ return;
}
+
+ const to_wake = switch (notify) {
+ .one => 1,
+ .all => wakeable,
+ };
+
+            // Reserve the number of waiters to wake by incrementing the signals count.
+            // Release barrier ensures the code before the wake() happens before the signal is posted and consumed by the wait() threads.
+ const new_state = state + (one_signal * to_wake);
+ state = self.state.tryCompareAndSwap(state, new_state, .Release, .Monotonic) orelse {
+ // Wake up the waiting threads we reserved above by changing the epoch value.
+ // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
+            // This is very unlikely given the huge number of wake() calls that would have to occur during the waiting thread's potential preemption.
+ //
+ // Release barrier ensures the signal being added to the state happens before the epoch is changed.
+ // If not, the waiting thread could potentially deadlock from missing both the state and epoch change:
+ //
+ // - T2: UPDATE(&epoch, 1) (reordered before the state change)
+ // - T1: e = LOAD(&epoch)
+ // - T1: s = LOAD(&state)
+ // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
+ // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
+ _ = self.epoch.fetchAdd(1, .Release);
+ Futex.wake(&self.epoch, to_wake);
+ return;
+ };
}
- };
+ }
+};
- pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void {
- var waiter = QueueList.Node{ .data = .{} };
+test "Condition - smoke test" {
+ var mutex = Mutex{};
+ var cond = Condition{};
- {
- cond.queue_mutex.lock();
- defer cond.queue_mutex.unlock();
+ // Try to wake outside the mutex
+ defer cond.signal();
+ defer cond.broadcast();
- cond.queue_list.prepend(&waiter);
- @atomicStore(bool, &cond.pending, true, .SeqCst);
- }
+ mutex.lock();
+ defer mutex.unlock();
- mutex.unlock();
- waiter.data.wait();
- mutex.lock();
- }
+ // Try to wait with a timeout (should not deadlock)
+ try testing.expectError(error.Timeout, cond.timedWait(&mutex, 0));
+ try testing.expectError(error.Timeout, cond.timedWait(&mutex, std.time.ns_per_ms));
- pub fn timedWait(cond: *AtomicCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void {
- var waiter = QueueList.Node{ .data = .{} };
+ // Try to wake inside the mutex.
+ cond.signal();
+ cond.broadcast();
+}
- {
- cond.queue_mutex.lock();
- defer cond.queue_mutex.unlock();
+// Inspired by: https://github.com/Amanieu/parking_lot/pull/129
+test "Condition - wait and signal" {
+ // This test requires spawning threads
+ if (builtin.single_threaded) {
+ return error.SkipZigTest;
+ }
- cond.queue_list.prepend(&waiter);
- @atomicStore(bool, &cond.pending, true, .SeqCst);
- }
+ const num_threads = 4;
- var timed_out = false;
- mutex.unlock();
- defer mutex.lock();
- waiter.data.timedWait(timeout_ns) catch |err| switch (err) {
- error.TimedOut => {
- defer if (!timed_out) {
- waiter.data.wait();
- };
- cond.queue_mutex.lock();
- defer cond.queue_mutex.unlock();
-
- if (!waiter.data.dequeued) {
- timed_out = true;
- cond.queue_list.remove(&waiter);
- }
- },
- else => unreachable,
- };
+ const MultiWait = struct {
+ mutex: Mutex = .{},
+ cond: Condition = .{},
+ threads: [num_threads]std.Thread = undefined,
- if (timed_out) {
- return error.TimedOut;
+ fn run(self: *@This()) void {
+ self.mutex.lock();
+ defer self.mutex.unlock();
+
+ self.cond.wait(&self.mutex);
+ self.cond.timedWait(&self.mutex, std.time.ns_per_ms) catch {};
+ self.cond.signal();
}
+ };
+
+ var multi_wait = MultiWait{};
+ for (multi_wait.threads) |*t| {
+ t.* = try std.Thread.spawn(.{}, MultiWait.run, .{&multi_wait});
}
- pub fn signal(cond: *AtomicCondition) void {
- if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
- return;
+ std.time.sleep(100 * std.time.ns_per_ms);
- const maybe_waiter = blk: {
- cond.queue_mutex.lock();
- defer cond.queue_mutex.unlock();
+ multi_wait.cond.signal();
+ for (multi_wait.threads) |t| {
+ t.join();
+ }
+}
- const maybe_waiter = cond.queue_list.popFirst();
- if (maybe_waiter) |waiter| {
- waiter.data.dequeued = true;
+test "Condition - signal" {
+ // This test requires spawning threads
+ if (builtin.single_threaded) {
+ return error.SkipZigTest;
+ }
+
+ const num_threads = 4;
+
+ const SignalTest = struct {
+ mutex: Mutex = .{},
+ cond: Condition = .{},
+ notified: bool = false,
+ threads: [num_threads]std.Thread = undefined,
+
+ fn run(self: *@This()) void {
+ self.mutex.lock();
+ defer self.mutex.unlock();
+
+ // Use timedWait() a few times before using wait()
+ // to test multiple threads timing out frequently.
+ var i: usize = 0;
+ while (!self.notified) : (i +%= 1) {
+ if (i < 5) {
+ self.cond.timedWait(&self.mutex, 1) catch {};
+ } else {
+ self.cond.wait(&self.mutex);
+ }
}
- @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst);
- break :blk maybe_waiter;
- };
- if (maybe_waiter) |waiter| {
- waiter.data.notify();
+ // Once we received the signal, notify another thread (inside the lock).
+ assert(self.notified);
+ self.cond.signal();
}
- }
-
- pub fn broadcast(cond: *AtomicCondition) void {
- if (@atomicLoad(bool, &cond.pending, .SeqCst) == false)
- return;
+ };
- @atomicStore(bool, &cond.pending, false, .SeqCst);
+ var signal_test = SignalTest{};
+ for (signal_test.threads) |*t| {
+ t.* = try std.Thread.spawn(.{}, SignalTest.run, .{&signal_test});
+ }
- var waiters = blk: {
- cond.queue_mutex.lock();
- defer cond.queue_mutex.unlock();
+ {
+ // Wait for a bit in hopes that the spawned threads start queuing up on the condvar
+ std.time.sleep(10 * std.time.ns_per_ms);
- const waiters = cond.queue_list;
+ // Wake up one of them (outside the lock) after setting notified=true.
+ defer signal_test.cond.signal();
- var it = waiters.first;
- while (it) |node| : (it = node.next) {
- node.data.dequeued = true;
- }
+ signal_test.mutex.lock();
+ defer signal_test.mutex.unlock();
- cond.queue_list = .{};
- break :blk waiters;
- };
+ try testing.expect(!signal_test.notified);
+ signal_test.notified = true;
+ }
- while (waiters.popFirst()) |waiter| {
- waiter.data.notify();
- }
+ for (signal_test.threads) |t| {
+ t.join();
}
-};
+}
-test "Thread.Condition" {
+test "Condition - multi signal" {
+ // This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
- const TestContext = struct {
- cond: *Condition,
- cond_main: *Condition,
- mutex: *Mutex,
- n: *i32,
- fn worker(ctx: *@This()) void {
- ctx.mutex.lock();
- ctx.n.* += 1;
- ctx.cond_main.signal();
- ctx.cond.wait(ctx.mutex);
- ctx.n.* -= 1;
- ctx.cond_main.signal();
- ctx.mutex.unlock();
+ const num_threads = 4;
+ const num_iterations = 4;
+
+ const Paddle = struct {
+ mutex: Mutex = .{},
+ cond: Condition = .{},
+ value: u32 = 0,
+
+ fn hit(self: *@This()) void {
+ defer self.cond.signal();
+
+ self.mutex.lock();
+ defer self.mutex.unlock();
+
+ self.value += 1;
}
- };
- const num_threads = 3;
- var threads: [num_threads]std.Thread = undefined;
- var cond = Condition{};
- var cond_main = Condition{};
- var mut = Mutex{};
- var n: i32 = 0;
- var ctx = TestContext{ .cond = &cond, .cond_main = &cond_main, .mutex = &mut, .n = &n };
- mut.lock();
- for (threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
- cond_main.wait(&mut);
- while (n < num_threads) cond_main.wait(&mut);
+ fn run(self: *@This(), hit_to: *@This()) !void {
+ self.mutex.lock();
+ defer self.mutex.unlock();
- cond.signal();
- cond_main.wait(&mut);
- try testing.expect(n == (num_threads - 1));
+ var current: u32 = 0;
+ while (current < num_iterations) : (current += 1) {
+ // Wait for the value to change from hit()
+ while (self.value == current) {
+ self.cond.wait(&self.mutex);
+ }
- cond.broadcast();
- while (n > 0) cond_main.wait(&mut);
- try testing.expect(n == 0);
+ // hit the next paddle
+ try testing.expectEqual(self.value, current + 1);
+ hit_to.hit();
+ }
+ }
+ };
+
+ var paddles = [_]Paddle{.{}} ** num_threads;
+ var threads = [_]std.Thread{undefined} ** num_threads;
+
+ // Create a circle of paddles which hit each other
+ for (threads) |*t, i| {
+ const paddle = &paddles[i];
+ const hit_to = &paddles[(i + 1) % paddles.len];
+ t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
+ }
+ // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
+ paddles[0].hit();
for (threads) |t| t.join();
+
+ // The first paddle will be hit one last time by the last paddle.
+ for (paddles) |p, i| {
+ const expected = @as(u32, num_iterations) + @boolToInt(i == 0);
+ try testing.expectEqual(p.value, expected);
+ }
}
-test "Thread.Condition.timedWait" {
+test "Condition - broadcasting" {
+ // This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
- var cond = Condition{};
- var mut = Mutex{};
+ const num_threads = 10;
- // Expect a timeout, as the condition variable is never signaled
- {
- mut.lock();
- defer mut.unlock();
- try testing.expectError(error.TimedOut, cond.timedWait(&mut, 10 * std.time.ns_per_ms));
+ const BroadcastTest = struct {
+ mutex: Mutex = .{},
+ cond: Condition = .{},
+ completed: Condition = .{},
+ count: usize = 0,
+ threads: [num_threads]std.Thread = undefined,
+
+ fn run(self: *@This()) void {
+ self.mutex.lock();
+ defer self.mutex.unlock();
+
+ // The last broadcast thread to start tells the main test thread it's completed.
+ self.count += 1;
+ if (self.count == num_threads) {
+ self.completed.signal();
+ }
+
+ // Waits for the count to reach zero after the main test thread observes it at num_threads.
+ // Tries to use timedWait() a bit before falling back to wait() to test multiple threads timing out.
+ var i: usize = 0;
+ while (self.count != 0) : (i +%= 1) {
+ if (i < 10) {
+ self.cond.timedWait(&self.mutex, 1) catch {};
+ } else {
+ self.cond.wait(&self.mutex);
+ }
+ }
+ }
+ };
+
+ var broadcast_test = BroadcastTest{};
+ for (broadcast_test.threads) |*t| {
+ t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{&broadcast_test});
}
- // Expect a signal before timeout
{
- const TestContext = struct {
- cond: *Condition,
- mutex: *Mutex,
- n: *u32,
- fn worker(ctx: *@This()) void {
- ctx.mutex.lock();
- defer ctx.mutex.unlock();
- ctx.n.* = 1;
- ctx.cond.signal();
- }
- };
+ broadcast_test.mutex.lock();
+ defer broadcast_test.mutex.unlock();
+
+ // Wait for all the broadcast threads to spawn.
+ // timedWait() to detect any potential deadlocks.
+ while (broadcast_test.count != num_threads) {
+ try broadcast_test.completed.timedWait(
+ &broadcast_test.mutex,
+ 1 * std.time.ns_per_s,
+ );
+ }
- var n: u32 = 0;
+ // Reset the counter and wake all the threads to exit.
+ broadcast_test.count = 0;
+ broadcast_test.cond.broadcast();
+ }
- var ctx = TestContext{ .cond = &cond, .mutex = &mut, .n = &n };
- mut.lock();
- var thread = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
- // Looped check to handle spurious wakeups
- while (n != 1) try cond.timedWait(&mut, 500 * std.time.ns_per_ms);
- mut.unlock();
- try testing.expect(n == 1);
- thread.join();
+ for (broadcast_test.threads) |t| {
+ t.join();
}
}