diff options
| author | protty <45520026+kprotty@users.noreply.github.com> | 2022-04-23 19:35:56 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-04-23 19:35:56 -0500 |
| commit | 963ac60918b39cafdda3cb99eff4cd9d20edd839 (patch) | |
| tree | 8b480e6c1c01c11758f47b1126c53e75a339d1a1 /lib/std/Thread/Condition.zig | |
| parent | daef82d06fd6b30d2cab7f5a6723cf2e3c7b48c6 (diff) | |
| download | zig-963ac60918b39cafdda3cb99eff4cd9d20edd839.tar.gz zig-963ac60918b39cafdda3cb99eff4cd9d20edd839.zip | |
std.Thread: Mutex and Condition improvements (#11497)
* Thread: minor cleanups
* Thread: rewrite Mutex
* Thread: introduce Futex.Deadline
* Thread: Condition rewrite + cleanup
* Mutex: optimize lock fast path
* Condition: more docs
* Thread: more mutex + condition docs
* Thread: remove broken Condition test
* Thread: zig fmt
* address review comments + fix Thread.DummyMutex in GPA
* Atomic: disable bitRmw x86 inline asm for stage2
* GPA: typo mutex_init
* Thread: remove noalias on stuff
* Thread: comment typos + clarifications
Diffstat (limited to 'lib/std/Thread/Condition.zig')
| -rw-r--r-- | lib/std/Thread/Condition.zig | 743 |
1 files changed, 435 insertions, 308 deletions
diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig index fb48db8e53..d5855ec066 100644 --- a/lib/std/Thread/Condition.zig +++ b/lib/std/Thread/Condition.zig @@ -1,411 +1,538 @@ -//! A condition provides a way for a kernel thread to block until it is signaled -//! to wake up. Spurious wakeups are possible. -//! This API supports static initialization and does not require deinitialization. - -impl: Impl = .{}, +//! Condition variables are used with a Mutex to efficiently wait for an arbitrary condition to occur. +//! It does this by atomically unlocking the mutex, blocking the thread until notified, and finally re-locking the mutex. +//! Condition can be statically initialized and is at most `@sizeOf(u64)` large. +//! +//! Example: +//! ``` +//! var m = Mutex{}; +//! var c = Condition{}; +//! var predicate = false; +//! +//! fn consumer() void { +//! m.lock(); +//! defer m.unlock(); +//! +//! while (!predicate) { +//! c.wait(&mutex); +//! } +//! } +//! +//! fn producer() void { +//! m.lock(); +//! defer m.unlock(); +//! +//! predicate = true; +//! c.signal(); +//! } +//! +//! const thread = try std.Thread.spawn(.{}, producer, .{}); +//! consumer(); +//! thread.join(); +//! ``` +//! +//! Note that condition variables can only reliably unblock threads that are sequenced before them using the same Mutex. +//! This means that the following is allowed to deadlock: +//! ``` +//! thread-1: mutex.lock() +//! thread-1: condition.wait(&mutex) +//! +//! thread-2: // mutex.lock() (without this, the following signal may not see the waiting thread-1) +//! thread-2: // mutex.unlock() (this is optional for correctness once locked above, as signal can be called without holding the mutex) +//! thread-2: condition.signal() +//! ``` const std = @import("../std.zig"); const builtin = @import("builtin"); const Condition = @This(); -const windows = std.os.windows; -const linux = std.os.linux; const Mutex = std.Thread.Mutex; + +const os = std.os; const assert = std.debug.assert; const testing = std.testing; +const Atomic = std.atomic.Atomic; +const Futex = std.Thread.Futex; + +impl: Impl = .{}, -pub fn wait(cond: *Condition, mutex: *Mutex) void { - cond.impl.wait(mutex); +/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return. +/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex. +/// +/// The Mutex must be locked by the caller's thread when this function is called. +/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite. +/// It is undefined behavior for multiple threads to wait ith different mutexes using the same Condition concurrently. +/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex. +/// +/// A blocking call to wait() is unblocked from one of the following conditions: +/// - a spurious ("at random") wake up occurs +/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `wait()`. +/// +/// Given wait() can be interrupted spuriously, the blocking condition should be checked continuously +/// irrespective of any notifications from `signal()` or `broadcast()`. +pub fn wait(self: *Condition, mutex: *Mutex) void { + self.impl.wait(mutex, null) catch |err| switch (err) { + error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out + }; } -pub fn timedWait(cond: *Condition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void { - try cond.impl.timedWait(mutex, timeout_ns); +/// Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return. +/// "Atomically" here refers to accesses done on the Condition after acquiring the Mutex. +/// +/// The Mutex must be locked by the caller's thread when this function is called. +/// A Mutex can have multiple Conditions waiting with it concurrently, but not the opposite. +/// It is undefined behavior for multiple threads to wait ith different mutexes using the same Condition concurrently. +/// Once threads have finished waiting with one Mutex, the Condition can be used to wait with another Mutex. +/// +/// A blocking call to `timedWait()` is unblocked from one of the following conditions: +/// - a spurious ("at random") wake occurs +/// - the caller was blocked for around `timeout_ns` nanoseconds, in which `error.Timeout` is returned. +/// - a future call to `signal()` or `broadcast()` which has acquired the Mutex and is sequenced after this `timedWait()`. +/// +/// Given `timedWait()` can be interrupted spuriously, the blocking condition should be checked continuously +/// irrespective of any notifications from `signal()` or `broadcast()`. +pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void { + return self.impl.wait(mutex, timeout_ns); } -pub fn signal(cond: *Condition) void { - cond.impl.signal(); +/// Unblocks at least one thread blocked in a call to `wait()` or `timedWait()` with a given Mutex. +/// The blocked thread must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking. +/// `signal()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads. +pub fn signal(self: *Condition) void { + self.impl.wake(.one); } -pub fn broadcast(cond: *Condition) void { - cond.impl.broadcast(); +/// Unblocks all threads currently blocked in a call to `wait()` or `timedWait()` with a given Mutex. +/// The blocked threads must be sequenced before this call with respect to acquiring the same Mutex in order to be observable for unblocking. +/// `broadcast()` can be called with or without the relevant Mutex being acquired and have no "effect" if there's no observable blocked threads. +pub fn broadcast(self: *Condition) void { + self.impl.wake(.all); } const Impl = if (builtin.single_threaded) - SingleThreadedCondition + SingleThreadedImpl else if (builtin.os.tag == .windows) - WindowsCondition -else if (std.Thread.use_pthreads) - PthreadCondition + WindowsImpl else - AtomicCondition; + FutexImpl; -pub const SingleThreadedCondition = struct { - pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void { - _ = cond; - _ = mutex; - unreachable; // deadlock detected - } +const Notify = enum { + one, // wake up only one thread + all, // wake up all threads +}; - pub fn timedWait(cond: *SingleThreadedCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void { - _ = cond; +const SingleThreadedImpl = struct { + fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { + _ = self; _ = mutex; - _ = timeout_ns; - std.time.sleep(timeout_ns); - return error.TimedOut; - } - pub fn signal(cond: *SingleThreadedCondition) void { - _ = cond; + // There are no other threads to wake us up. + // So if we wait without a timeout we would never wake up. + const timeout_ns = timeout orelse { + unreachable; // deadlock detected + }; + + std.time.sleep(timeout_ns); + return error.Timeout; } - pub fn broadcast(cond: *SingleThreadedCondition) void { - _ = cond; + fn wake(self: *Impl, comptime notify: Notify) void { + // There are no other threads to wake up. + _ = self; + _ = notify; } }; -pub const WindowsCondition = struct { - cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT, +const WindowsImpl = struct { + condition: os.windows.CONDITION_VARIABLE = .{}, - pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void { - const rc = windows.kernel32.SleepConditionVariableSRW( - &cond.cond, - &mutex.impl.srwlock, - windows.INFINITE, - @as(windows.ULONG, 0), - ); - assert(rc != windows.FALSE); - } + fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { + var timeout_overflowed = false; + var timeout_ms: os.windows.DWORD = os.windows.INFINITE; - pub fn timedWait(cond: *WindowsCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void { - var timeout_checked = std.math.cast(windows.DWORD, timeout_ns / std.time.ns_per_ms) catch overflow: { - break :overflow std.math.maxInt(windows.DWORD); - }; + if (timeout) |timeout_ns| { + // Round the nanoseconds to the nearest millisecond, + // then saturating cast it to windows DWORD for use in kernel32 call. + const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms; + timeout_ms = std.math.cast(os.windows.DWORD, ms) catch std.math.maxInt(os.windows.DWORD); - // Handle the case where timeout is INFINITE, otherwise SleepConditionVariableSRW's time-out never elapses - const timeout_overflowed = timeout_checked == windows.INFINITE; - timeout_checked -= @boolToInt(timeout_overflowed); + // Track if the timeout overflowed into INFINITE and make sure not to wait forever. + if (timeout_ms == os.windows.INFINITE) { + timeout_overflowed = true; + timeout_ms -= 1; + } + } - const rc = windows.kernel32.SleepConditionVariableSRW( - &cond.cond, + const rc = os.windows.kernel32.SleepConditionVariableSRW( + &self.condition, &mutex.impl.srwlock, - timeout_checked, - @as(windows.ULONG, 0), + timeout_ms, + 0, // the srwlock was assumed to acquired in exclusive mode not shared ); - if (rc == windows.FALSE and windows.kernel32.GetLastError() == windows.Win32Error.TIMEOUT) return error.TimedOut; - assert(rc != windows.FALSE); - } - pub fn signal(cond: *WindowsCondition) void { - windows.kernel32.WakeConditionVariable(&cond.cond); + // Return error.Timeout if we know the timeout elapsed correctly. + if (rc == os.windows.FALSE) { + assert(os.windows.kernel32.GetLastError() == .TIMEOUT); + if (!timeout_overflowed) return error.Timeout; + } } - pub fn broadcast(cond: *WindowsCondition) void { - windows.kernel32.WakeAllConditionVariable(&cond.cond); + fn wake(self: *Impl, comptime notify: Notify) void { + switch (notify) { + .one => os.windows.kernel32.WakeConditionVariable(&self.condition), + .all => os.windows.kernel32.WakeAllConditionVariable(&self.condition), + } } }; -pub const PthreadCondition = struct { - cond: std.c.pthread_cond_t = .{}, +const FutexImpl = struct { + state: Atomic(u32) = Atomic(u32).init(0), + epoch: Atomic(u32) = Atomic(u32).init(0), - pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void { - const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.impl.pthread_mutex); - assert(rc == .SUCCESS); - } + const one_waiter = 1; + const waiter_mask = 0xffff; - pub fn timedWait(cond: *PthreadCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void { - var ts: std.os.timespec = undefined; - std.os.clock_gettime(std.os.CLOCK.REALTIME, &ts) catch unreachable; - ts.tv_sec += @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); - ts.tv_nsec += @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); - if (ts.tv_nsec >= std.time.ns_per_s) { - ts.tv_sec += 1; - ts.tv_nsec -= std.time.ns_per_s; - } + const one_signal = 1 << 16; + const signal_mask = 0xffff << 16; - const rc = std.c.pthread_cond_timedwait(&cond.cond, &mutex.impl.pthread_mutex, &ts); - return switch (rc) { - .SUCCESS => {}, - .TIMEDOUT => error.TimedOut, - else => unreachable, - }; - } + fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { + // Register that we're waiting on the state by incrementing the wait count. + // This assumes that there can be at most ((1<<16)-1) or 65,355 threads concurrently waiting on the same Condvar. + // If this is hit in practice, then this condvar not working is the least of your concerns. + var state = self.state.fetchAdd(one_waiter, .Monotonic); + assert(state & waiter_mask != waiter_mask); + state += one_waiter; - pub fn signal(cond: *PthreadCondition) void { - const rc = std.c.pthread_cond_signal(&cond.cond); - assert(rc == .SUCCESS); - } + // Temporarily release the mutex in order to block on the condition variable. + mutex.unlock(); + defer mutex.lock(); - pub fn broadcast(cond: *PthreadCondition) void { - const rc = std.c.pthread_cond_broadcast(&cond.cond); - assert(rc == .SUCCESS); - } -}; + var futex_deadline = Futex.Deadline.init(timeout); + while (true) { + // Try to wake up by consuming a signal and decremented the waiter we added previously. + // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. + while (state & signal_mask != 0) { + const new_state = state - one_waiter - one_signal; + state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return; + } -pub const AtomicCondition = struct { - pending: bool = false, - queue_mutex: Mutex = .{}, - queue_list: QueueList = .{}, - - pub const QueueList = std.SinglyLinkedList(QueueItem); - - pub const QueueItem = struct { - futex: i32 = 0, - dequeued: bool = false, - - fn wait(cond: *@This()) void { - while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) { - switch (builtin.os.tag) { - .linux => { - switch (linux.getErrno(linux.futex_wait( - &cond.futex, - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, - 0, - null, - ))) { - .SUCCESS => {}, - .INTR => {}, - .AGAIN => {}, - else => unreachable, - } - }, - else => std.atomic.spinLoopHint(), - } + // Observe the epoch, then check the state again to see if we should wake up. + // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock: + // + // - T1: s = LOAD(&state) + // - T2: UPDATE(&s, signal) + // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch) + // - T1: e = LOAD(&epoch) (was reordered after the state load) + // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change) + // + // Acquire barrier to ensure the epoch load happens before the state load. + const epoch = self.epoch.load(.Acquire); + state = self.state.load(.Monotonic); + if (state & signal_mask != 0) { + continue; } - } - pub fn timedWait(cond: *@This(), timeout_ns: u64) error{TimedOut}!void { - const start_time = std.time.nanoTimestamp(); - while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) { - switch (builtin.os.tag) { - .linux => { - var ts: std.os.timespec = undefined; - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s); - switch (linux.getErrno(linux.futex_wait( - &cond.futex, - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, - 0, - &ts, - ))) { - .SUCCESS => {}, - .INTR => {}, - .AGAIN => {}, - .TIMEDOUT => return error.TimedOut, - .INVAL => {}, // possibly timeout overflow - .FAULT => unreachable, - else => unreachable, - } - }, - else => { - if (std.time.nanoTimestamp() - start_time >= timeout_ns) { - return error.TimedOut; + futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) { + // On timeout, we must decrement the waiter we added above. + error.Timeout => { + while (true) { + // If there's a signal when we're timing out, consume it and report being woken up instead. + // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. + while (state & signal_mask != 0) { + const new_state = state - one_waiter - one_signal; + state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return; } - std.atomic.spinLoopHint(); - }, - } - } - } - fn notify(cond: *@This()) void { - @atomicStore(i32, &cond.futex, 1, .Release); - - switch (builtin.os.tag) { - .linux => { - switch (linux.getErrno(linux.futex_wake( - &cond.futex, - linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE, - 1, - ))) { - .SUCCESS => {}, - .FAULT => {}, - else => unreachable, + // Remove the waiter we added and officially return timed out. + const new_state = state - one_waiter; + state = self.state.tryCompareAndSwap(state, new_state, .Monotonic, .Monotonic) orelse return err; } }, - else => {}, + }; + } + } + + fn wake(self: *Impl, comptime notify: Notify) void { + var state = self.state.load(.Monotonic); + while (true) { + const waiters = (state & waiter_mask) / one_waiter; + const signals = (state & signal_mask) / one_signal; + + // Reserves which waiters to wake up by incrementing the signals count. + // Therefor, the signals count is always less than or equal to the waiters count. + // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters. + const wakeable = waiters - signals; + if (wakeable == 0) { + return; } + + const to_wake = switch (notify) { + .one => 1, + .all => wakeable, + }; + + // Reserve the amount of waiters to wake by incrementing the signals count. + // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads. + const new_state = state + (one_signal * to_wake); + state = self.state.tryCompareAndSwap(state, new_state, .Release, .Monotonic) orelse { + // Wake up the waiting threads we reserved above by changing the epoch value. + // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it. + // This is very unlikely due to how many precise amount of Futex.wake() calls that would be between the waiting thread's potential preemption. + // + // Release barrier ensures the signal being added to the state happens before the epoch is changed. + // If not, the waiting thread could potentially deadlock from missing both the state and epoch change: + // + // - T2: UPDATE(&epoch, 1) (reordered before the state change) + // - T1: e = LOAD(&epoch) + // - T1: s = LOAD(&state) + // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) + // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) + _ = self.epoch.fetchAdd(1, .Release); + Futex.wake(&self.epoch, to_wake); + return; + }; } - }; + } +}; - pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void { - var waiter = QueueList.Node{ .data = .{} }; +test "Condition - smoke test" { + var mutex = Mutex{}; + var cond = Condition{}; - { - cond.queue_mutex.lock(); - defer cond.queue_mutex.unlock(); + // Try to wake outside the mutex + defer cond.signal(); + defer cond.broadcast(); - cond.queue_list.prepend(&waiter); - @atomicStore(bool, &cond.pending, true, .SeqCst); - } + mutex.lock(); + defer mutex.unlock(); - mutex.unlock(); - waiter.data.wait(); - mutex.lock(); - } + // Try to wait with a timeout (should not deadlock) + try testing.expectError(error.Timeout, cond.timedWait(&mutex, 0)); + try testing.expectError(error.Timeout, cond.timedWait(&mutex, std.time.ns_per_ms)); - pub fn timedWait(cond: *AtomicCondition, mutex: *Mutex, timeout_ns: u64) error{TimedOut}!void { - var waiter = QueueList.Node{ .data = .{} }; + // Try to wake inside the mutex. + cond.signal(); + cond.broadcast(); +} - { - cond.queue_mutex.lock(); - defer cond.queue_mutex.unlock(); +// Inspired from: https://github.com/Amanieu/parking_lot/pull/129 +test "Condition - wait and signal" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } - cond.queue_list.prepend(&waiter); - @atomicStore(bool, &cond.pending, true, .SeqCst); - } + const num_threads = 4; - var timed_out = false; - mutex.unlock(); - defer mutex.lock(); - waiter.data.timedWait(timeout_ns) catch |err| switch (err) { - error.TimedOut => { - defer if (!timed_out) { - waiter.data.wait(); - }; - cond.queue_mutex.lock(); - defer cond.queue_mutex.unlock(); - - if (!waiter.data.dequeued) { - timed_out = true; - cond.queue_list.remove(&waiter); - } - }, - else => unreachable, - }; + const MultiWait = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + threads: [num_threads]std.Thread = undefined, - if (timed_out) { - return error.TimedOut; + fn run(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + self.cond.wait(&self.mutex); + self.cond.timedWait(&self.mutex, std.time.ns_per_ms) catch {}; + self.cond.signal(); } + }; + + var multi_wait = MultiWait{}; + for (multi_wait.threads) |*t| { + t.* = try std.Thread.spawn(.{}, MultiWait.run, .{&multi_wait}); } - pub fn signal(cond: *AtomicCondition) void { - if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) - return; + std.time.sleep(100 * std.time.ns_per_ms); - const maybe_waiter = blk: { - cond.queue_mutex.lock(); - defer cond.queue_mutex.unlock(); + multi_wait.cond.signal(); + for (multi_wait.threads) |t| { + t.join(); + } +} - const maybe_waiter = cond.queue_list.popFirst(); - if (maybe_waiter) |waiter| { - waiter.data.dequeued = true; +test "Condition - signal" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 4; + + const SignalTest = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + notified: bool = false, + threads: [num_threads]std.Thread = undefined, + + fn run(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + // Use timedWait() a few times before using wait() + // to test multiple threads timing out frequently. + var i: usize = 0; + while (!self.notified) : (i +%= 1) { + if (i < 5) { + self.cond.timedWait(&self.mutex, 1) catch {}; + } else { + self.cond.wait(&self.mutex); + } } - @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst); - break :blk maybe_waiter; - }; - if (maybe_waiter) |waiter| { - waiter.data.notify(); + // Once we received the signal, notify another thread (inside the lock). + assert(self.notified); + self.cond.signal(); } - } - - pub fn broadcast(cond: *AtomicCondition) void { - if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) - return; + }; - @atomicStore(bool, &cond.pending, false, .SeqCst); + var signal_test = SignalTest{}; + for (signal_test.threads) |*t| { + t.* = try std.Thread.spawn(.{}, SignalTest.run, .{&signal_test}); + } - var waiters = blk: { - cond.queue_mutex.lock(); - defer cond.queue_mutex.unlock(); + { + // Wait for a bit in hopes that the spawned threads start queuing up on the condvar + std.time.sleep(10 * std.time.ns_per_ms); - const waiters = cond.queue_list; + // Wake up one of them (outside the lock) after setting notified=true. + defer signal_test.cond.signal(); - var it = waiters.first; - while (it) |node| : (it = node.next) { - node.data.dequeued = true; - } + signal_test.mutex.lock(); + defer signal_test.mutex.unlock(); - cond.queue_list = .{}; - break :blk waiters; - }; + try testing.expect(!signal_test.notified); + signal_test.notified = true; + } - while (waiters.popFirst()) |waiter| { - waiter.data.notify(); - } + for (signal_test.threads) |t| { + t.join(); } -}; +} -test "Thread.Condition" { +test "Condition - multi signal" { + // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } - const TestContext = struct { - cond: *Condition, - cond_main: *Condition, - mutex: *Mutex, - n: *i32, - fn worker(ctx: *@This()) void { - ctx.mutex.lock(); - ctx.n.* += 1; - ctx.cond_main.signal(); - ctx.cond.wait(ctx.mutex); - ctx.n.* -= 1; - ctx.cond_main.signal(); - ctx.mutex.unlock(); + const num_threads = 4; + const num_iterations = 4; + + const Paddle = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + value: u32 = 0, + + fn hit(self: *@This()) void { + defer self.cond.signal(); + + self.mutex.lock(); + defer self.mutex.unlock(); + + self.value += 1; } - }; - const num_threads = 3; - var threads: [num_threads]std.Thread = undefined; - var cond = Condition{}; - var cond_main = Condition{}; - var mut = Mutex{}; - var n: i32 = 0; - var ctx = TestContext{ .cond = &cond, .cond_main = &cond_main, .mutex = &mut, .n = &n }; - mut.lock(); - for (threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx}); - cond_main.wait(&mut); - while (n < num_threads) cond_main.wait(&mut); + fn run(self: *@This(), hit_to: *@This()) !void { + self.mutex.lock(); + defer self.mutex.unlock(); - cond.signal(); - cond_main.wait(&mut); - try testing.expect(n == (num_threads - 1)); + var current: u32 = 0; + while (current < num_iterations) : (current += 1) { + // Wait for the value to change from hit() + while (self.value == current) { + self.cond.wait(&self.mutex); + } - cond.broadcast(); - while (n > 0) cond_main.wait(&mut); - try testing.expect(n == 0); + // hit the next paddle + try testing.expectEqual(self.value, current + 1); + hit_to.hit(); + } + } + }; + + var paddles = [_]Paddle{.{}} ** num_threads; + var threads = [_]std.Thread{undefined} ** num_threads; + + // Create a circle of paddles which hit each other + for (threads) |*t, i| { + const paddle = &paddles[i]; + const hit_to = &paddles[(i + 1) % paddles.len]; + t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to }); + } + // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations. + paddles[0].hit(); for (threads) |t| t.join(); + + // The first paddle will be hit one last time by the last paddle. + for (paddles) |p, i| { + const expected = @as(u32, num_iterations) + @boolToInt(i == 0); + try testing.expectEqual(p.value, expected); + } } -test "Thread.Condition.timedWait" { +test "Condition - broadcasting" { + // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } - var cond = Condition{}; - var mut = Mutex{}; + const num_threads = 10; - // Expect a timeout, as the condition variable is never signaled - { - mut.lock(); - defer mut.unlock(); - try testing.expectError(error.TimedOut, cond.timedWait(&mut, 10 * std.time.ns_per_ms)); + const BroadcastTest = struct { + mutex: Mutex = .{}, + cond: Condition = .{}, + completed: Condition = .{}, + count: usize = 0, + threads: [num_threads]std.Thread = undefined, + + fn run(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + + // The last broadcast thread to start tells the main test thread it's completed. + self.count += 1; + if (self.count == num_threads) { + self.completed.signal(); + } + + // Waits for the count to reach zero after the main test thread observes it at num_threads. + // Tries to use timedWait() a bit before falling back to wait() to test multiple threads timing out. + var i: usize = 0; + while (self.count != 0) : (i +%= 1) { + if (i < 10) { + self.cond.timedWait(&self.mutex, 1) catch {}; + } else { + self.cond.wait(&self.mutex); + } + } + } + }; + + var broadcast_test = BroadcastTest{}; + for (broadcast_test.threads) |*t| { + t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{&broadcast_test}); } - // Expect a signal before timeout { - const TestContext = struct { - cond: *Condition, - mutex: *Mutex, - n: *u32, - fn worker(ctx: *@This()) void { - ctx.mutex.lock(); - defer ctx.mutex.unlock(); - ctx.n.* = 1; - ctx.cond.signal(); - } - }; + broadcast_test.mutex.lock(); + defer broadcast_test.mutex.unlock(); + + // Wait for all the broadcast threads to spawn. + // timedWait() to detect any potential deadlocks. + while (broadcast_test.count != num_threads) { + try broadcast_test.completed.timedWait( + &broadcast_test.mutex, + 1 * std.time.ns_per_s, + ); + } - var n: u32 = 0; + // Reset the counter and wake all the threads to exit. + broadcast_test.count = 0; + broadcast_test.cond.broadcast(); + } - var ctx = TestContext{ .cond = &cond, .mutex = &mut, .n = &n }; - mut.lock(); - var thread = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx}); - // Looped check to handle spurious wakeups - while (n != 1) try cond.timedWait(&mut, 500 * std.time.ns_per_ms); - mut.unlock(); - try testing.expect(n == 1); - thread.join(); + for (broadcast_test.threads) |t| { + t.join(); } } |
