From 5065830aa007c374c382be9e80ba924df6cecc78 Mon Sep 17 00:00:00 2001
From: Cody Tapscott
Date: Thu, 3 Feb 2022 15:27:01 -0700
Subject: Avoid depending on child process execution when not supported by host OS

In accordance with the requesting issue (#10750):
- `zig test` skips any tests that it cannot spawn, returning success
- `zig run` and `zig build` exit with failure, reporting the command that cannot be run
- `zig clang`, `zig ar`, etc. already punt directly to the appropriate clang/lld main(), even before this change
- Native `libc` detection is not supported

Additionally, `exec()` and related Builder functions error at run-time, reporting the command that cannot be run
---
 src/ThreadPool.zig | 3 +++
 1 file changed, 3 insertions(+)
(limited to 'src/ThreadPool.zig')

diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 4f9d8dc015..813d67db66 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -82,6 +82,9 @@ pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void {
 }
 
 fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
+    if (builtin.single_threaded)
+        return;
+
     for (self.workers[0..spawned]) |*worker| {
         worker.thread.join();
         worker.idle_node.data.deinit();
-- 
cgit v1.2.3

From 2ee3cc453c4cefa3519f6a6238d4721364d829ae Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 11 Mar 2022 17:09:23 -0700
Subject: stage2: remove SPDX license header comments

These were missed in d29871977f97b50fe5e3f16cd9c68ebeba02a562.
---
 src/ThreadPool.zig      | 5 -----
 src/WaitGroup.zig       | 5 -----
 src/translate_c/ast.zig | 5 -----
 3 files changed, 15 deletions(-)
(limited to 'src/ThreadPool.zig')

diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 813d67db66..ac95def319 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -1,8 +1,3 @@
-// SPDX-License-Identifier: MIT
-// Copyright (c) 2015-2020 Zig Contributors
-// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
-// The MIT license requires this copyright notice to be included in all copies
-// and substantial portions of the software.
 const std = @import("std");
 const builtin = @import("builtin");
 const ThreadPool = @This();
diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig
index d621bfcb18..e4126b1ab3 100644
--- a/src/WaitGroup.zig
+++ b/src/WaitGroup.zig
@@ -1,8 +1,3 @@
-// SPDX-License-Identifier: MIT
-// Copyright (c) 2015-2020 Zig Contributors
-// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
-// The MIT license requires this copyright notice to be included in all copies
-// and substantial portions of the software.
 const std = @import("std");
 const WaitGroup = @This();
diff --git a/src/translate_c/ast.zig b/src/translate_c/ast.zig
index 96de020b0c..6351f67214 100644
--- a/src/translate_c/ast.zig
+++ b/src/translate_c/ast.zig
@@ -1,8 +1,3 @@
-// SPDX-License-Identifier: MIT
-// Copyright (c) 2021 Zig Contributors
-// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
-// The MIT license requires this copyright notice to be included in all copies
-// and substantial portions of the software.
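[Editor's note: for readers unfamiliar with the guard added to destroyWorkers in the first patch above, `builtin.single_threaded` is a comptime-known bool, so branching on it is free at runtime, and in a single-threaded build no worker threads are ever spawned, leaving nothing to join. A minimal sketch of the same pattern; the `joinAll` helper is hypothetical and not part of the patch:]

    const std = @import("std");
    const builtin = @import("builtin");

    // Mirrors the guard added to destroyWorkers: in a single-threaded
    // build no worker threads were spawned, so there is nothing to join.
    fn joinAll(threads: []std.Thread) void {
        if (builtin.single_threaded) return;
        for (threads) |t| t.join();
    }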
const std = @import("std"); const Type = @import("../type.zig").Type; const Allocator = std.mem.Allocator; -- cgit v1.2.3 From 9213aa789b4b44711f2747e5ab053a2b1f57a15b Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 5 Apr 2022 23:13:39 -0700 Subject: stage2: ThreadPool: update to new function pointer semantics --- src/ThreadPool.zig | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'src/ThreadPool.zig') diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index ac95def319..36d004cfc6 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -12,7 +12,12 @@ idle_queue: IdleQueue = .{}, const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent); const RunQueue = std.SinglyLinkedList(Runnable); const Runnable = struct { - runFn: fn (*Runnable) void, + runFn: RunProto, +}; + +const RunProto = switch (builtin.zig_backend) { + .stage1 => fn (*Runnable) void, + else => *const fn (*Runnable) void, }; const Worker = struct { -- cgit v1.2.3 From 18f30346291bd2471e07924af161de080935dd60 Mon Sep 17 00:00:00 2001 From: protty <45520026+kprotty@users.noreply.github.com> Date: Tue, 26 Apr 2022 16:48:56 -0500 Subject: std.Thread: ResetEvent improvements (#11523) * std: start removing redundant ResetEvents * src: fix other uses of std.Thread.ResetEvent * src: add builtin.sanitize_thread for tsan detection * atomic: add Atomic.fence for proper fencing with tsan * Thread: remove the other ResetEvent's and rewrite the current one * Thread: ResetEvent docs * zig fmt + WaitGroup.reset() fix * src: fix build issues for ResetEvent + tsan * Thread: ResetEvent tests * Thread: ResetEvent module doc * Atomic: replace llvm *p memory constraint with *m * panicking: handle spurious wakeups in futex.wait() when waiting for abort() * zig fmt --- CMakeLists.txt | 2 - lib/std/Thread.zig | 32 +-- lib/std/Thread/AutoResetEvent.zig | 222 ------------------- lib/std/Thread/Futex.zig | 2 +- lib/std/Thread/ResetEvent.zig | 420 ++++++++++++++++++------------------ lib/std/Thread/StaticResetEvent.zig | 395 --------------------------------- lib/std/atomic/Atomic.zig | 80 +++++-- lib/std/debug.zig | 10 +- lib/std/event/loop.zig | 33 +-- lib/std/fs/test.zig | 37 ++-- src/Compilation.zig | 17 +- src/ThreadPool.zig | 139 ++++++------ src/WaitGroup.zig | 49 ++--- src/crash_report.zig | 11 +- src/stage1/codegen.cpp | 1 + 15 files changed, 417 insertions(+), 1033 deletions(-) delete mode 100644 lib/std/Thread/AutoResetEvent.zig delete mode 100644 lib/std/Thread/StaticResetEvent.zig (limited to 'src/ThreadPool.zig') diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c4982ea83..463718b31c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -533,11 +533,9 @@ set(ZIG_STAGE2_SOURCES "${CMAKE_SOURCE_DIR}/lib/std/target/wasm.zig" "${CMAKE_SOURCE_DIR}/lib/std/target/x86.zig" "${CMAKE_SOURCE_DIR}/lib/std/Thread.zig" - "${CMAKE_SOURCE_DIR}/lib/std/Thread/AutoResetEvent.zig" "${CMAKE_SOURCE_DIR}/lib/std/Thread/Futex.zig" "${CMAKE_SOURCE_DIR}/lib/std/Thread/Mutex.zig" "${CMAKE_SOURCE_DIR}/lib/std/Thread/ResetEvent.zig" - "${CMAKE_SOURCE_DIR}/lib/std/Thread/StaticResetEvent.zig" "${CMAKE_SOURCE_DIR}/lib/std/time.zig" "${CMAKE_SOURCE_DIR}/lib/std/treap.zig" "${CMAKE_SOURCE_DIR}/lib/std/unicode.zig" diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index db7117fdd7..45d2ac0040 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -10,10 +10,8 @@ const assert = std.debug.assert; const target = builtin.target; const Atomic = std.atomic.Atomic; -pub const AutoResetEvent = @import("Thread/AutoResetEvent.zig"); pub 
const Futex = @import("Thread/Futex.zig"); pub const ResetEvent = @import("Thread/ResetEvent.zig"); -pub const StaticResetEvent = @import("Thread/StaticResetEvent.zig"); pub const Mutex = @import("Thread/Mutex.zig"); pub const Semaphore = @import("Thread/Semaphore.zig"); pub const Condition = @import("Thread/Condition.zig"); @@ -1078,17 +1076,13 @@ test "setName, getName" { if (builtin.single_threaded) return error.SkipZigTest; const Context = struct { - start_wait_event: ResetEvent = undefined, - test_done_event: ResetEvent = undefined, + start_wait_event: ResetEvent = .{}, + test_done_event: ResetEvent = .{}, + thread_done_event: ResetEvent = .{}, done: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), thread: Thread = undefined, - fn init(self: *@This()) !void { - try self.start_wait_event.init(); - try self.test_done_event.init(); - } - pub fn run(ctx: *@This()) !void { // Wait for the main thread to have set the thread field in the context. ctx.start_wait_event.wait(); @@ -1104,16 +1098,14 @@ test "setName, getName" { // Signal our test is done ctx.test_done_event.set(); - while (!ctx.done.load(.SeqCst)) { - std.time.sleep(5 * std.time.ns_per_ms); - } + // wait for the thread to property exit + ctx.thread_done_event.wait(); } }; var context = Context{}; - try context.init(); - var thread = try spawn(.{}, Context.run, .{&context}); + context.thread = thread; context.start_wait_event.set(); context.test_done_event.wait(); @@ -1139,16 +1131,14 @@ test "setName, getName" { }, } - context.done.store(true, .SeqCst); + context.thread_done_event.set(); thread.join(); } test "std.Thread" { // Doesn't use testing.refAllDecls() since that would pull in the compileError spinLoopHint. - _ = AutoResetEvent; _ = Futex; _ = ResetEvent; - _ = StaticResetEvent; _ = Mutex; _ = Semaphore; _ = Condition; @@ -1163,9 +1153,7 @@ test "Thread.join" { if (builtin.single_threaded) return error.SkipZigTest; var value: usize = 0; - var event: ResetEvent = undefined; - try event.init(); - defer event.deinit(); + var event = ResetEvent{}; const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); thread.join(); @@ -1177,9 +1165,7 @@ test "Thread.detach" { if (builtin.single_threaded) return error.SkipZigTest; var value: usize = 0; - var event: ResetEvent = undefined; - try event.init(); - defer event.deinit(); + var event = ResetEvent{}; const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); thread.detach(); diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig deleted file mode 100644 index f22b1e3db0..0000000000 --- a/lib/std/Thread/AutoResetEvent.zig +++ /dev/null @@ -1,222 +0,0 @@ -//! Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`. -//! Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like). -//! -//! AutoResetEvent has 3 possible states: -//! - UNSET: the AutoResetEvent is currently unset -//! - SET: the AutoResetEvent was notified before a wait() was called -//! - : there is an active waiter waiting for a notification. -//! -//! When attempting to wait: -//! if the event is unset, it registers a ResetEvent pointer to be notified when the event is set -//! if the event is already set, then it consumes the notification and resets the event. -//! -//! When attempting to notify: -//! if the event is unset, then we set the event -//! if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent -//! -//! 
This ensures that the event is automatically reset after a wait() has been issued -//! and avoids the race condition when using StaticResetEvent in the following scenario: -//! thread 1 | thread 2 -//! StaticResetEvent.wait() | -//! | StaticResetEvent.set() -//! | StaticResetEvent.set() -//! StaticResetEvent.reset() | -//! StaticResetEvent.wait() | (missed the second .set() notification above) - -state: usize = UNSET, - -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const assert = std.debug.assert; -const StaticResetEvent = std.Thread.StaticResetEvent; -const AutoResetEvent = @This(); - -const UNSET = 0; -const SET = 1; - -/// the minimum alignment for the `*StaticResetEvent` created by wait*() -const event_align = std.math.max(@alignOf(StaticResetEvent), 2); - -pub fn wait(self: *AutoResetEvent) void { - self.waitFor(null) catch unreachable; -} - -pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { - return self.waitFor(timeout); -} - -fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { - // lazily initialized StaticResetEvent - var reset_event: StaticResetEvent align(event_align) = undefined; - var has_reset_event = false; - - var state = @atomicLoad(usize, &self.state, .SeqCst); - while (true) { - // consume a notification if there is any - if (state == SET) { - @atomicStore(usize, &self.state, UNSET, .SeqCst); - return; - } - - // check if theres currently a pending ResetEvent pointer already registered - if (state != UNSET) { - unreachable; // multiple waiting threads on the same AutoResetEvent - } - - // lazily initialize the ResetEvent if it hasn't been already - if (!has_reset_event) { - has_reset_event = true; - reset_event = .{}; - } - - // Since the AutoResetEvent currently isnt set, - // try to register our ResetEvent on it to wait - // for a set() call from another thread. - if (@cmpxchgWeak( - usize, - &self.state, - UNSET, - @ptrToInt(&reset_event), - .SeqCst, - .SeqCst, - )) |new_state| { - state = new_state; - continue; - } - - // if no timeout was specified, then just wait forever - const timeout_ns = timeout orelse { - reset_event.wait(); - return; - }; - - // wait with a timeout and return if signalled via set() - switch (reset_event.timedWait(timeout_ns)) { - .event_set => return, - .timed_out => {}, - } - - // If we timed out, we need to transition the AutoResetEvent back to UNSET. - // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. - state = @cmpxchgStrong( - usize, - &self.state, - @ptrToInt(&reset_event), - UNSET, - .SeqCst, - .SeqCst, - ) orelse return error.TimedOut; - - // We didn't manage to unregister ourselves from the state. - if (state == SET) { - unreachable; // AutoResetEvent notified without waking up the waiting thread - } else if (state != UNSET) { - unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out - } - - // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. - // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. - // We don't return error.TimedOut here as it technically notified us while we were "timing out". 
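[Editor's note: the deleted implementation above leans heavily on Zig's compare-and-swap return convention: `@cmpxchgStrong` and `@cmpxchgWeak` return `null` on success and the observed value on failure, which is what all the `orelse return` / `|new_state|` chains in this file exploit. A tiny self-contained illustration, not taken from the patch:]

    const std = @import("std");

    test "cmpxchg returns null on success, observed value on failure" {
        var state: usize = 0;
        // Succeeds: state was 0, becomes 1, and null is returned.
        try std.testing.expect(@cmpxchgStrong(usize, &state, 0, 1, .SeqCst, .SeqCst) == null);
        // Fails: state is now 1, not 0, so the observed value (1) is returned.
        try std.testing.expect(@cmpxchgStrong(usize, &state, 0, 2, .SeqCst, .SeqCst).? == 1);
    }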
- reset_event.wait(); - return; - } -} - -pub fn set(self: *AutoResetEvent) void { - var state = @atomicLoad(usize, &self.state, .SeqCst); - while (true) { - // If the AutoResetEvent is already set, there is nothing else left to do - if (state == SET) { - return; - } - - // If the AutoResetEvent isn't set, - // then try to leave a notification for the wait() thread that we set() it. - if (state == UNSET) { - state = @cmpxchgWeak( - usize, - &self.state, - UNSET, - SET, - .SeqCst, - .SeqCst, - ) orelse return; - continue; - } - - // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. - // Try to acquire ownership of it so that we can wake it up. - // This also resets the AutoResetEvent so that there is no race condition as defined above. - if (@cmpxchgWeak( - usize, - &self.state, - state, - UNSET, - .SeqCst, - .SeqCst, - )) |new_state| { - state = new_state; - continue; - } - - const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state); - reset_event.set(); - return; - } -} - -test "basic usage" { - // test local code paths - { - var event = AutoResetEvent{}; - try testing.expectError(error.TimedOut, event.timedWait(1)); - event.set(); - event.wait(); - } - - // test cross-thread signaling - if (builtin.single_threaded) - return; - - const Context = struct { - value: u128 = 0, - in: AutoResetEvent = AutoResetEvent{}, - out: AutoResetEvent = AutoResetEvent{}, - - const Self = @This(); - - fn sender(self: *Self) !void { - try testing.expect(self.value == 0); - self.value = 1; - self.out.set(); - - self.in.wait(); - try testing.expect(self.value == 2); - self.value = 3; - self.out.set(); - - self.in.wait(); - try testing.expect(self.value == 4); - } - - fn receiver(self: *Self) !void { - self.out.wait(); - try testing.expect(self.value == 1); - self.value = 2; - self.in.set(); - - self.out.wait(); - try testing.expect(self.value == 3); - self.value = 4; - self.in.set(); - } - }; - - var context = Context{}; - const send_thread = try std.Thread.spawn(.{}, Context.sender, .{&context}); - const recv_thread = try std.Thread.spawn(.{}, Context.receiver, .{&context}); - - send_thread.join(); - recv_thread.join(); -} diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 33eb30ba9d..bbe6b813ba 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -809,7 +809,7 @@ const PosixImpl = struct { // // The pending count increment in wait() must also now use SeqCst for the update + this pending load // to be in the same modification order as our load isn't using Release/Acquire to guarantee it. - std.atomic.fence(.SeqCst); + bucket.pending.fence(.SeqCst); if (bucket.pending.load(.Monotonic) == 0) { return; } diff --git a/lib/std/Thread/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig index 7fc4f3c2b4..87232c29cf 100644 --- a/lib/std/Thread/ResetEvent.zig +++ b/lib/std/Thread/ResetEvent.zig @@ -1,291 +1,281 @@ -//! A thread-safe resource which supports blocking until signaled. -//! This API is for kernel threads, not evented I/O. -//! This API requires being initialized at runtime, and initialization -//! can fail. Once initialized, the core operations cannot fail. -//! If you need an abstraction that cannot fail to be initialized, see -//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure, -//! it is preferred to use `ResetEvent`. +//! ResetEvent is a thread-safe bool which can be set to true/false ("set"/"unset"). +//! 
It can also block threads until the "bool" is set with cancellation via timed waits. +//! ResetEvent can be statically initialized and is at most `@sizeOf(u64)` large. -const ResetEvent = @This(); const std = @import("../std.zig"); const builtin = @import("builtin"); -const testing = std.testing; -const assert = std.debug.assert; -const c = std.c; -const os = std.os; -const time = std.time; - -impl: Impl, +const ResetEvent = @This(); -pub const Impl = if (builtin.single_threaded) - std.Thread.StaticResetEvent.DebugEvent -else if (builtin.target.isDarwin()) - DarwinEvent -else if (std.Thread.use_pthreads) - PosixEvent -else - std.Thread.StaticResetEvent.AtomicEvent; +const os = std.os; +const assert = std.debug.assert; +const testing = std.testing; +const Atomic = std.atomic.Atomic; +const Futex = std.Thread.Futex; -pub const InitError = error{SystemResources}; +impl: Impl = .{}, -/// After `init`, it is legal to call any other function. -pub fn init(ev: *ResetEvent) InitError!void { - return ev.impl.init(); +/// Returns if the ResetEvent was set(). +/// Once reset() is called, this returns false until the next set(). +/// The memory accesses before the set() can be said to happen before isSet() returns true. +pub fn isSet(self: *const ResetEvent) bool { + return self.impl.isSet(); } -/// This function is not thread-safe. -/// After `deinit`, the only legal function to call is `init`. -pub fn deinit(ev: *ResetEvent) void { - return ev.impl.deinit(); +/// Block's the callers thread until the ResetEvent is set(). +/// This is effectively a more efficient version of `while (!isSet()) {}`. +/// The memory accesses before the set() can be said to happen before wait() returns. +pub fn wait(self: *ResetEvent) void { + self.impl.wait(null) catch |err| switch (err) { + error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out + }; } -/// Sets the event if not already set and wakes up all the threads waiting on -/// the event. It is safe to call `set` multiple times before calling `wait`. -/// However it is illegal to call `set` after `wait` is called until the event -/// is `reset`. This function is thread-safe. -pub fn set(ev: *ResetEvent) void { - return ev.impl.set(); +/// Block's the callers thread until the ResetEvent is set(), or until the corresponding timeout expires. +/// If the timeout expires before the ResetEvent is set, `error.Timeout` is returned. +/// This is effectively a more efficient version of `while (!isSet()) {}`. +/// The memory accesses before the set() can be said to happen before timedWait() returns without error. +pub fn timedWait(self: *ResetEvent, timeout_ns: u64) error{Timeout}!void { + return self.impl.wait(timeout_ns); } -/// Resets the event to its original, unset state. -/// This function is *not* thread-safe. It is equivalent to calling -/// `deinit` followed by `init` but without the possibility of failure. -pub fn reset(ev: *ResetEvent) void { - return ev.impl.reset(); +/// Marks the ResetEvent as "set" and unblocks any threads in `wait()` or `timedWait()` to observe the new state. +/// The ResetEvent says "set" until reset() is called, making future set() calls do nothing semantically. +/// The memory accesses before set() can be said to happen before isSet() returns true or wait()/timedWait() return successfully. +pub fn set(self: *ResetEvent) void { + self.impl.set(); } -/// Wait for the event to be set by blocking the current thread. -/// Thread-safe. No spurious wakeups. 
-/// Upon return from `wait`, the only functions available to be called -/// in `ResetEvent` are `reset` and `deinit`. -pub fn wait(ev: *ResetEvent) void { - return ev.impl.wait(); +/// Unmarks the ResetEvent from its "set" state if set() was called previously. +/// It is undefined behavior is reset() is called while threads are blocked in wait() or timedWait(). +/// Concurrent calls to set(), isSet() and reset() are allowed. +pub fn reset(self: *ResetEvent) void { + self.impl.reset(); } -pub const TimedWaitResult = enum { event_set, timed_out }; - -/// Wait for the event to be set by blocking the current thread. -/// A timeout in nanoseconds can be provided as a hint for how -/// long the thread should block on the unset event before returning -/// `TimedWaitResult.timed_out`. -/// Thread-safe. No precision of timing is guaranteed. -/// Upon return from `wait`, the only functions available to be called -/// in `ResetEvent` are `reset` and `deinit`. -pub fn timedWait(ev: *ResetEvent, timeout_ns: u64) TimedWaitResult { - return ev.impl.timedWait(timeout_ns); -} +const Impl = if (builtin.single_threaded) + SingleThreadedImpl +else + FutexImpl; -/// Apple has decided to not support POSIX semaphores, so we go with a -/// different approach using Grand Central Dispatch. This API is exposed -/// by libSystem so it is guaranteed to be available on all Darwin platforms. -pub const DarwinEvent = struct { - sem: c.dispatch_semaphore_t = undefined, +const SingleThreadedImpl = struct { + is_set: bool = false, - pub fn init(ev: *DarwinEvent) !void { - ev.* = .{ - .sem = c.dispatch_semaphore_create(0) orelse return error.SystemResources, - }; + fn isSet(self: *const Impl) bool { + return self.is_set; } - pub fn deinit(ev: *DarwinEvent) void { - c.dispatch_release(ev.sem); - ev.* = undefined; - } + fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void { + if (self.isSet()) { + return; + } - pub fn set(ev: *DarwinEvent) void { - // Empirically this returns the numerical value of the semaphore. - _ = c.dispatch_semaphore_signal(ev.sem); - } + // There are no other threads to wake us up. + // So if we wait without a timeout we would never wake up. + const timeout_ns = timeout orelse { + unreachable; // deadlock detected + }; - pub fn wait(ev: *DarwinEvent) void { - assert(c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_FOREVER) == 0); + std.time.sleep(timeout_ns); + return error.Timeout; } - pub fn timedWait(ev: *DarwinEvent, timeout_ns: u64) TimedWaitResult { - const t = c.dispatch_time(c.DISPATCH_TIME_NOW, @intCast(i64, timeout_ns)); - if (c.dispatch_semaphore_wait(ev.sem, t) != 0) { - return .timed_out; - } else { - return .event_set; - } + fn set(self: *Impl) void { + self.is_set = true; } - pub fn reset(ev: *DarwinEvent) void { - // Keep calling until the semaphore goes back down to 0. - while (c.dispatch_semaphore_wait(ev.sem, c.DISPATCH_TIME_NOW) == 0) {} + fn reset(self: *Impl) void { + self.is_set = false; } }; -/// POSIX semaphores must be initialized at runtime because they are allowed to -/// be implemented as file descriptors, in which case initialization would require -/// a syscall to open the fd. 
-pub const PosixEvent = struct { - sem: c.sem_t = undefined, +const FutexImpl = struct { + state: Atomic(u32) = Atomic(u32).init(unset), - pub fn init(ev: *PosixEvent) !void { - switch (c.getErrno(c.sem_init(&ev.sem, 0, 0))) { - .SUCCESS => return, - else => return error.SystemResources, - } - } + const unset = 0; + const waiting = 1; + const is_set = 2; - pub fn deinit(ev: *PosixEvent) void { - assert(c.sem_destroy(&ev.sem) == 0); - ev.* = undefined; + fn isSet(self: *const Impl) bool { + // Acquire barrier ensures memory accesses before set() happen before we return true. + return self.state.load(.Acquire) == is_set; } - pub fn set(ev: *PosixEvent) void { - assert(c.sem_post(&ev.sem) == 0); + fn wait(self: *Impl, timeout: ?u64) error{Timeout}!void { + // Outline the slow path to allow isSet() to be inlined + if (!self.isSet()) { + return self.waitUntilSet(timeout); + } } - pub fn wait(ev: *PosixEvent) void { - while (true) { - switch (c.getErrno(c.sem_wait(&ev.sem))) { - .SUCCESS => return, - .INTR => continue, - .INVAL => unreachable, - else => unreachable, - } + fn waitUntilSet(self: *Impl, timeout: ?u64) error{Timeout}!void { + @setCold(true); + + // Try to set the state from `unset` to `waiting` to indicate + // to the set() thread that others are blocked on the ResetEvent. + // We avoid using any strict barriers until the end when we know the ResetEvent is set. + var state = self.state.load(.Monotonic); + if (state == unset) { + state = self.state.compareAndSwap(state, waiting, .Monotonic, .Monotonic) orelse waiting; } - } - pub fn timedWait(ev: *PosixEvent, timeout_ns: u64) TimedWaitResult { - var ts: os.timespec = undefined; - var timeout_abs = timeout_ns; - os.clock_gettime(os.CLOCK.REALTIME, &ts) catch return .timed_out; - timeout_abs += @intCast(u64, ts.tv_sec) * time.ns_per_s; - timeout_abs += @intCast(u64, ts.tv_nsec); - ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout_abs, time.ns_per_s)); - ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout_abs, time.ns_per_s)); - while (true) { - switch (c.getErrno(c.sem_timedwait(&ev.sem, &ts))) { - .SUCCESS => return .event_set, - .INTR => continue, - .INVAL => unreachable, - .TIMEDOUT => return .timed_out, - else => unreachable, + // Wait until the ResetEvent is set since the state is waiting. + if (state == waiting) { + var futex_deadline = Futex.Deadline.init(timeout); + while (true) { + const wait_result = futex_deadline.wait(&self.state, waiting); + + // Check if the ResetEvent was set before possibly reporting error.Timeout below. + state = self.state.load(.Monotonic); + if (state != waiting) { + break; + } + + try wait_result; } } + + // Acquire barrier ensures memory accesses before set() happen before we return. + assert(state == is_set); + self.state.fence(.Acquire); } - pub fn reset(ev: *PosixEvent) void { - while (true) { - switch (c.getErrno(c.sem_trywait(&ev.sem))) { - .SUCCESS => continue, // Need to make it go to zero. - .INTR => continue, - .INVAL => unreachable, - .AGAIN => return, // The semaphore currently has the value zero. - else => unreachable, - } + fn set(self: *Impl) void { + // Quick check if the ResetEvent is already set before doing the atomic swap below. + // set() could be getting called quite often and multiple threads calling swap() increases contention unnecessarily. + if (self.state.load(.Monotonic) == is_set) { + return; + } + + // Mark the ResetEvent as set and unblock all waiters waiting on it if any. 
+ // Release barrier ensures memory accesses before set() happen before the ResetEvent is observed to be "set". + if (self.state.swap(is_set, .Release) == waiting) { + Futex.wake(&self.state, std.math.maxInt(u32)); } } + + fn reset(self: *Impl) void { + self.state.store(unset, .Monotonic); + } }; -test "basic usage" { - var event: ResetEvent = undefined; - try event.init(); - defer event.deinit(); +test "ResetEvent - smoke test" { + // make sure the event is unset + var event = ResetEvent{}; + try testing.expectEqual(false, event.isSet()); - // test event setting + // make sure the event gets set event.set(); + try testing.expectEqual(true, event.isSet()); - // test event resetting + // make sure the event gets unset again event.reset(); + try testing.expectEqual(false, event.isSet()); - // test event waiting (non-blocking) - event.set(); - event.wait(); - event.reset(); + // waits should timeout as there's no other thread to set the event + try testing.expectError(error.Timeout, event.timedWait(0)); + try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms)); + // set the event again and make sure waits complete event.set(); - try testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); + event.wait(); + try event.timedWait(std.time.ns_per_ms); + try testing.expectEqual(true, event.isSet()); +} - // test cross-thread signaling - if (builtin.single_threaded) - return; +test "ResetEvent - signaling" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } const Context = struct { - const Self = @This(); - - value: u128, - in: ResetEvent, - out: ResetEvent, - - fn init(self: *Self) !void { - self.* = .{ - .value = 0, - .in = undefined, - .out = undefined, - }; - try self.in.init(); - try self.out.init(); - } + in: ResetEvent = .{}, + out: ResetEvent = .{}, + value: usize = 0, + + fn input(self: *@This()) !void { + // wait for the value to become 1 + self.in.wait(); + self.in.reset(); + try testing.expectEqual(self.value, 1); + + // bump the value and wake up output() + self.value = 2; + self.out.set(); - fn deinit(self: *Self) void { - self.in.deinit(); - self.out.deinit(); - self.* = undefined; + // wait for output to receive 2, bump the value and wake us up with 3 + self.in.wait(); + self.in.reset(); + try testing.expectEqual(self.value, 3); + + // bump the value and wake up output() for it to see 4 + self.value = 4; + self.out.set(); } - fn sender(self: *Self) !void { - // update value and signal input - try testing.expect(self.value == 0); + fn output(self: *@This()) !void { + // start with 0 and bump the value for input to see 1 + try testing.expectEqual(self.value, 0); self.value = 1; self.in.set(); - // wait for receiver to update value and signal output + // wait for input to receive 1, bump the value to 2 and wake us up self.out.wait(); - try testing.expect(self.value == 2); + self.out.reset(); + try testing.expectEqual(self.value, 2); - // update value and signal final input + // bump the value to 3 for input to see (rhymes) self.value = 3; self.in.set(); + + // wait for input to bump the value to 4 and receive no more (rhymes) + self.out.wait(); + self.out.reset(); + try testing.expectEqual(self.value, 4); } + }; - fn receiver(self: *Self) !void { - // wait for sender to update value and signal input - self.in.wait(); - try testing.expect(self.value == 1); + var ctx = Context{}; - // update value and signal output - self.in.reset(); - self.value = 2; - self.out.set(); + const thread = try 
std.Thread.spawn(.{}, Context.output, .{&ctx}); + defer thread.join(); - // wait for sender to update value and signal final input - self.in.wait(); - try testing.expect(self.value == 3); - } + try ctx.input(); +} - fn sleeper(self: *Self) void { - self.in.set(); - time.sleep(time.ns_per_ms * 2); - self.value = 5; - self.out.set(); +test "ResetEvent - broadcast" { + // This test requires spawning threads + if (builtin.single_threaded) { + return error.SkipZigTest; + } + + const num_threads = 10; + const Barrier = struct { + event: ResetEvent = .{}, + counter: Atomic(usize) = Atomic(usize).init(num_threads), + + fn wait(self: *@This()) void { + if (self.counter.fetchSub(1, .AcqRel) == 1) { + self.event.set(); + } } + }; - fn timedWaiter(self: *Self) !void { - self.in.wait(); - try testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); - try self.out.timedWait(time.ns_per_ms * 100); - try testing.expect(self.value == 5); + const Context = struct { + start_barrier: Barrier = .{}, + finish_barrier: Barrier = .{}, + + fn run(self: *@This()) void { + self.start_barrier.wait(); + self.finish_barrier.wait(); } }; - var context: Context = undefined; - try context.init(); - defer context.deinit(); - const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); - defer receiver.join(); - try context.sender(); - - if (false) { - // I have now observed this fail on macOS, Windows, and Linux. - // https://github.com/ziglang/zig/issues/7009 - var timed = Context.init(); - defer timed.deinit(); - const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); - defer sleeper.join(); - try timed.timedWaiter(); - } + var ctx = Context{}; + var threads: [num_threads - 1]std.Thread = undefined; + + for (threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx}); + defer for (threads) |t| t.join(); + + ctx.run(); } diff --git a/lib/std/Thread/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig deleted file mode 100644 index fcd13f6ac7..0000000000 --- a/lib/std/Thread/StaticResetEvent.zig +++ /dev/null @@ -1,395 +0,0 @@ -//! A thread-safe resource which supports blocking until signaled. -//! This API is for kernel threads, not evented I/O. -//! This API is statically initializable. It cannot fail to be initialized -//! and it requires no deinitialization. The downside is that it may not -//! integrate as cleanly into other synchronization APIs, or, in a worst case, -//! may be forced to fall back on spin locking. As a rule of thumb, prefer -//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when -//! the logic needs stronger API guarantees. - -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const StaticResetEvent = @This(); -const assert = std.debug.assert; -const os = std.os; -const time = std.time; -const linux = std.os.linux; -const windows = std.os.windows; -const testing = std.testing; - -impl: Impl = .{}, - -pub const Impl = if (builtin.single_threaded) - DebugEvent -else - AtomicEvent; - -/// Sets the event if not already set and wakes up all the threads waiting on -/// the event. It is safe to call `set` multiple times before calling `wait`. -/// However it is illegal to call `set` after `wait` is called until the event -/// is `reset`. This function is thread-safe. -pub fn set(ev: *StaticResetEvent) void { - return ev.impl.set(); -} - -/// Wait for the event to be set by blocking the current thread. -/// Thread-safe. No spurious wakeups. 
-/// Upon return from `wait`, the only function available to be called -/// in `StaticResetEvent` is `reset`. -pub fn wait(ev: *StaticResetEvent) void { - return ev.impl.wait(); -} - -/// Resets the event to its original, unset state. -/// This function is *not* thread-safe. It is equivalent to calling -/// `deinit` followed by `init` but without the possibility of failure. -pub fn reset(ev: *StaticResetEvent) void { - return ev.impl.reset(); -} - -pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult; - -/// Wait for the event to be set by blocking the current thread. -/// A timeout in nanoseconds can be provided as a hint for how -/// long the thread should block on the unset event before returning -/// `TimedWaitResult.timed_out`. -/// Thread-safe. No precision of timing is guaranteed. -/// Upon return from `timedWait`, the only function available to be called -/// in `StaticResetEvent` is `reset`. -pub fn timedWait(ev: *StaticResetEvent, timeout_ns: u64) TimedWaitResult { - return ev.impl.timedWait(timeout_ns); -} - -/// For single-threaded builds, we use this to detect deadlocks. -/// In unsafe modes this ends up being no-ops. -pub const DebugEvent = struct { - state: State = State.unset, - - const State = enum { - unset, - set, - waited, - }; - - /// This function is provided so that this type can be re-used inside - /// `std.Thread.ResetEvent`. - pub fn init(ev: *DebugEvent) void { - ev.* = .{}; - } - - /// This function is provided so that this type can be re-used inside - /// `std.Thread.ResetEvent`. - pub fn deinit(ev: *DebugEvent) void { - ev.* = undefined; - } - - pub fn set(ev: *DebugEvent) void { - switch (ev.state) { - .unset => ev.state = .set, - .set => {}, - .waited => unreachable, // Not allowed to call `set` until `reset`. - } - } - - pub fn wait(ev: *DebugEvent) void { - switch (ev.state) { - .unset => unreachable, // Deadlock detected. - .set => return, - .waited => unreachable, // Not allowed to call `wait` until `reset`. - } - } - - pub fn timedWait(ev: *DebugEvent, timeout: u64) TimedWaitResult { - _ = timeout; - switch (ev.state) { - .unset => return .timed_out, - .set => return .event_set, - .waited => unreachable, // Not allowed to call `wait` until `reset`. - } - } - - pub fn reset(ev: *DebugEvent) void { - ev.state = .unset; - } -}; - -pub const AtomicEvent = struct { - waiters: u32 = 0, - - const WAKE = 1 << 0; - const WAIT = 1 << 1; - - /// This function is provided so that this type can be re-used inside - /// `std.Thread.ResetEvent`. - pub fn init(ev: *AtomicEvent) void { - ev.* = .{}; - } - - /// This function is provided so that this type can be re-used inside - /// `std.Thread.ResetEvent`. 
- pub fn deinit(ev: *AtomicEvent) void { - ev.* = undefined; - } - - pub fn set(ev: *AtomicEvent) void { - const waiters = @atomicRmw(u32, &ev.waiters, .Xchg, WAKE, .Release); - if (waiters >= WAIT) { - return Futex.wake(&ev.waiters, waiters >> 1); - } - } - - pub fn wait(ev: *AtomicEvent) void { - switch (ev.timedWait(null)) { - .timed_out => unreachable, - .event_set => return, - } - } - - pub fn timedWait(ev: *AtomicEvent, timeout: ?u64) TimedWaitResult { - var waiters = @atomicLoad(u32, &ev.waiters, .Acquire); - while (waiters != WAKE) { - waiters = @cmpxchgWeak(u32, &ev.waiters, waiters, waiters + WAIT, .Acquire, .Acquire) orelse { - if (Futex.wait(&ev.waiters, timeout)) |_| { - return .event_set; - } else |_| { - return .timed_out; - } - }; - } - return .event_set; - } - - pub fn reset(ev: *AtomicEvent) void { - @atomicStore(u32, &ev.waiters, 0, .Monotonic); - } - - pub const Futex = switch (builtin.os.tag) { - .windows => WindowsFutex, - .linux => LinuxFutex, - else => SpinFutex, - }; - - pub const SpinFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void { - _ = waiters; - _ = wake_count; - } - - fn wait(waiters: *u32, timeout: ?u64) !void { - var timer: time.Timer = undefined; - if (timeout != null) - timer = time.Timer.start() catch return error.TimedOut; - - while (@atomicLoad(u32, waiters, .Acquire) != WAKE) { - std.Thread.yield() catch std.atomic.spinLoopHint(); - if (timeout) |timeout_ns| { - if (timer.read() >= timeout_ns) - return error.TimedOut; - } - } - } - }; - - pub const LinuxFutex = struct { - fn wake(waiters: *u32, wake_count: u32) void { - _ = wake_count; - const waiting = std.math.maxInt(i32); // wake_count - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wake(ptr, linux.FUTEX.WAKE | linux.FUTEX.PRIVATE_FLAG, waiting); - assert(linux.getErrno(rc) == .SUCCESS); - } - - fn wait(waiters: *u32, timeout: ?u64) !void { - var ts: linux.timespec = undefined; - var ts_ptr: ?*linux.timespec = null; - if (timeout) |timeout_ns| { - ts_ptr = &ts; - ts.tv_sec = @intCast(isize, timeout_ns / time.ns_per_s); - ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s); - } - - while (true) { - const waiting = @atomicLoad(u32, waiters, .Acquire); - if (waiting == WAKE) - return; - const expected = @intCast(i32, waiting); - const ptr = @ptrCast(*const i32, waiters); - const rc = linux.futex_wait(ptr, linux.FUTEX.WAIT | linux.FUTEX.PRIVATE_FLAG, expected, ts_ptr); - switch (linux.getErrno(rc)) { - .SUCCESS => continue, - .TIMEDOUT => return error.TimedOut, - .INTR => continue, - .AGAIN => return, - else => unreachable, - } - } - } - }; - - pub const WindowsFutex = struct { - pub fn wake(waiters: *u32, wake_count: u32) void { - const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count); - const key = @ptrCast(*const anyopaque, waiters); - - var waiting = wake_count; - while (waiting != 0) : (waiting -= 1) { - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - } - } - - pub fn wait(waiters: *u32, timeout: ?u64) !void { - const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout); - const key = @ptrCast(*const anyopaque, waiters); - - // NT uses timeouts in units of 100ns with negative value being relative - var timeout_ptr: ?*windows.LARGE_INTEGER = null; - var timeout_value: windows.LARGE_INTEGER = undefined; - if (timeout) |timeout_ns| { - timeout_ptr = &timeout_value; - timeout_value = -@intCast(windows.LARGE_INTEGER, timeout_ns / 100); - } - - // 
NtWaitForKeyedEvent doesnt have spurious wake-ups - var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr); - switch (rc) { - .TIMEOUT => { - // update the wait count to signal that we're not waiting anymore. - // if the .set() thread already observed that we are, perform a - // matching NtWaitForKeyedEvent so that the .set() thread doesn't - // deadlock trying to run NtReleaseKeyedEvent above. - var waiting = @atomicLoad(u32, waiters, .Monotonic); - while (true) { - if (waiting == WAKE) { - rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == windows.NTSTATUS.WAIT_0); - break; - } else { - waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break; - continue; - } - } - return error.TimedOut; - }, - windows.NTSTATUS.WAIT_0 => {}, - else => unreachable, - } - } - - var event_handle: usize = EMPTY; - const EMPTY = ~@as(usize, 0); - const LOADING = EMPTY - 1; - - pub fn getEventHandle() ?windows.HANDLE { - var handle = @atomicLoad(usize, &event_handle, .Monotonic); - while (true) { - switch (handle) { - EMPTY => handle = @cmpxchgWeak(usize, &event_handle, EMPTY, LOADING, .Acquire, .Monotonic) orelse { - const handle_ptr = @ptrCast(*windows.HANDLE, &handle); - const access_mask = windows.GENERIC_READ | windows.GENERIC_WRITE; - if (windows.ntdll.NtCreateKeyedEvent(handle_ptr, access_mask, null, 0) != .SUCCESS) - handle = 0; - @atomicStore(usize, &event_handle, handle, .Monotonic); - return @intToPtr(?windows.HANDLE, handle); - }, - LOADING => { - std.Thread.yield() catch std.atomic.spinLoopHint(); - handle = @atomicLoad(usize, &event_handle, .Monotonic); - }, - else => { - return @intToPtr(?windows.HANDLE, handle); - }, - } - } - } - }; -}; - -test "basic usage" { - var event = StaticResetEvent{}; - - // test event setting - event.set(); - - // test event resetting - event.reset(); - - // test event waiting (non-blocking) - event.set(); - event.wait(); - event.reset(); - - event.set(); - try testing.expectEqual(TimedWaitResult.event_set, event.timedWait(1)); - - // test cross-thread signaling - if (builtin.single_threaded) - return; - - const Context = struct { - const Self = @This(); - - value: u128 = 0, - in: StaticResetEvent = .{}, - out: StaticResetEvent = .{}, - - fn sender(self: *Self) !void { - // update value and signal input - try testing.expect(self.value == 0); - self.value = 1; - self.in.set(); - - // wait for receiver to update value and signal output - self.out.wait(); - try testing.expect(self.value == 2); - - // update value and signal final input - self.value = 3; - self.in.set(); - } - - fn receiver(self: *Self) !void { - // wait for sender to update value and signal input - self.in.wait(); - try testing.expect(self.value == 1); - - // update value and signal output - self.in.reset(); - self.value = 2; - self.out.set(); - - // wait for sender to update value and signal final input - self.in.wait(); - try testing.expect(self.value == 3); - } - - fn sleeper(self: *Self) void { - self.in.set(); - time.sleep(time.ns_per_ms * 2); - self.value = 5; - self.out.set(); - } - - fn timedWaiter(self: *Self) !void { - self.in.wait(); - try testing.expectEqual(TimedWaitResult.timed_out, self.out.timedWait(time.ns_per_us)); - try self.out.timedWait(time.ns_per_ms * 100); - try testing.expect(self.value == 5); - } - }; - - var context = Context{}; - const receiver = try std.Thread.spawn(.{}, Context.receiver, .{&context}); - defer receiver.join(); - try context.sender(); - - if (false) { - 
// I have now observed this fail on macOS, Windows, and Linux. - // https://github.com/ziglang/zig/issues/7009 - var timed = Context.init(); - defer timed.deinit(); - const sleeper = try std.Thread.spawn(.{}, Context.sleeper, .{&timed}); - defer sleeper.join(); - try timed.timedWaiter(); - } -} diff --git a/lib/std/atomic/Atomic.zig b/lib/std/atomic/Atomic.zig index c396281e91..7e62a9928c 100644 --- a/lib/std/atomic/Atomic.zig +++ b/lib/std/atomic/Atomic.zig @@ -14,19 +14,66 @@ pub fn Atomic(comptime T: type) type { return .{ .value = value }; } + /// Perform an atomic fence which uses the atomic value as a hint for the modification order. + /// Use this when you want to imply a fence on an atomic variable without necessarily performing a memory access. + /// + /// Example: + /// ``` + /// const RefCount = struct { + /// count: Atomic(usize), + /// dropFn: *const fn(*RefCount) void, + /// + /// fn ref(self: *RefCount) void { + /// _ = self.count.fetchAdd(1, .Monotonic); // no ordering necessary, just updating a counter + /// } + /// + /// fn unref(self: *RefCount) void { + /// // Release ensures code before unref() happens-before the count is decremented as dropFn could be called by then. + /// if (self.count.fetchSub(1, .Release)) { + /// // Acquire ensures count decrement and code before previous unrefs()s happens-before we call dropFn below. + /// // NOTE: another alterative is to use .AcqRel on the fetchSub count decrement but it's extra barrier in possibly hot path. + /// self.count.fence(.Acquire); + /// (self.dropFn)(self); + /// } + /// } + /// }; + /// ``` + pub inline fn fence(self: *Self, comptime ordering: Ordering) void { + // LLVM's ThreadSanitizer doesn't support the normal fences so we specialize for it. + if (builtin.sanitize_thread) { + const tsan = struct { + extern "c" fn __tsan_acquire(addr: *anyopaque) void; + extern "c" fn __tsan_release(addr: *anyopaque) void; + }; + + const addr = @ptrCast(*anyopaque, self); + return switch (ordering) { + .Unordered, .Monotonic => @compileError(@tagName(ordering) ++ " only applies to atomic loads and stores"), + .Acquire => tsan.__tsan_acquire(addr), + .Release => tsan.__tsan_release(addr), + .AcqRel, .SeqCst => { + tsan.__tsan_acquire(addr); + tsan.__tsan_release(addr); + }, + }; + } + + return std.atomic.fence(ordering); + } + /// Non-atomically load from the atomic value without synchronization. /// Care must be taken to avoid data-races when interacting with other atomic operations. - pub fn loadUnchecked(self: Self) T { + pub inline fn loadUnchecked(self: Self) T { return self.value; } /// Non-atomically store to the atomic value without synchronization. /// Care must be taken to avoid data-races when interacting with other atomic operations. 
- pub fn storeUnchecked(self: *Self, value: T) void { + pub inline fn storeUnchecked(self: *Self, value: T) void { self.value = value; } - pub fn load(self: *const Self, comptime ordering: Ordering) T { + pub inline fn load(self: *const Self, comptime ordering: Ordering) T { return switch (ordering) { .AcqRel => @compileError(@tagName(ordering) ++ " implies " ++ @tagName(Ordering.Release) ++ " which is only allowed on atomic stores"), .Release => @compileError(@tagName(ordering) ++ " is only allowed on atomic stores"), @@ -34,7 +81,7 @@ pub fn Atomic(comptime T: type) type { }; } - pub fn store(self: *Self, value: T, comptime ordering: Ordering) void { + pub inline fn store(self: *Self, value: T, comptime ordering: Ordering) void { return switch (ordering) { .AcqRel => @compileError(@tagName(ordering) ++ " implies " ++ @tagName(Ordering.Acquire) ++ " which is only allowed on atomic loads"), .Acquire => @compileError(@tagName(ordering) ++ " is only allowed on atomic loads"), @@ -189,21 +236,21 @@ pub fn Atomic(comptime T: type) type { .Set => asm volatile ("lock btsw %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), .Reset => asm volatile ("lock btrw %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), .Toggle => asm volatile ("lock btcw %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), @@ -212,21 +259,21 @@ pub fn Atomic(comptime T: type) type { .Set => asm volatile ("lock btsl %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), .Reset => asm volatile ("lock btrl %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), .Toggle => asm volatile ("lock btcl %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), @@ -235,21 +282,21 @@ pub fn Atomic(comptime T: type) type { .Set => asm volatile ("lock btsq %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), .Reset => asm volatile ("lock btrq %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), .Toggle => asm volatile ("lock btcq %[bit], %[ptr]" // LLVM doesn't support u1 flag register return values : [result] "={@ccc}" (-> u8), - : [ptr] "*p" (&self.value), + : [ptr] "*m" (&self.value), [bit] "X" (@as(T, bit)), : "cc", "memory" ), @@ -266,6 +313,13 @@ pub fn Atomic(comptime T: type) type { }; } +test "Atomic.fence" { + inline for (.{ .Acquire, .Release, .AcqRel, .SeqCst }) |ordering| { + var x = Atomic(usize).init(0); + x.fence(ordering); + } 
+} + fn atomicIntTypes() []const type { comptime var bytes = 1; comptime var types: []const type = &[_]type{}; diff --git a/lib/std/debug.zig b/lib/std/debug.zig index 683219c78d..347a9c433d 100644 --- a/lib/std/debug.zig +++ b/lib/std/debug.zig @@ -292,7 +292,7 @@ pub fn panicExtra( /// Non-zero whenever the program triggered a panic. /// The counter is incremented/decremented atomically. -var panicking: u8 = 0; +var panicking = std.atomic.Atomic(u8).init(0); // Locked to avoid interleaving panic messages from multiple threads. var panic_mutex = std.Thread.Mutex{}; @@ -316,7 +316,7 @@ pub fn panicImpl(trace: ?*const std.builtin.StackTrace, first_trace_addr: ?usize 0 => { panic_stage = 1; - _ = @atomicRmw(u8, &panicking, .Add, 1, .SeqCst); + _ = panicking.fetchAdd(1, .SeqCst); // Make sure to release the mutex when done { @@ -337,13 +337,13 @@ pub fn panicImpl(trace: ?*const std.builtin.StackTrace, first_trace_addr: ?usize dumpCurrentStackTrace(first_trace_addr); } - if (@atomicRmw(u8, &panicking, .Sub, 1, .SeqCst) != 1) { + if (panicking.fetchSub(1, .SeqCst) != 1) { // Another thread is panicking, wait for the last one to finish // and call abort() // Sleep forever without hammering the CPU - var event: std.Thread.StaticResetEvent = .{}; - event.wait(); + var futex = std.atomic.Atomic(u32).init(0); + while (true) std.Thread.Futex.wait(&futex, 0); unreachable; } }, diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 1eaa95d249..038ead12b5 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -8,6 +8,7 @@ const os = std.os; const windows = os.windows; const maxInt = std.math.maxInt; const Thread = std.Thread; +const Atomic = std.atomic.Atomic; const is_windows = builtin.os.tag == .windows; @@ -168,11 +169,9 @@ pub const Loop = struct { .fs_end_request = .{ .data = .{ .msg = .end, .finish = .NoAction } }, .fs_queue = std.atomic.Queue(Request).init(), .fs_thread = undefined, - .fs_thread_wakeup = undefined, + .fs_thread_wakeup = .{}, .delay_queue = undefined, }; - try self.fs_thread_wakeup.init(); - errdefer self.fs_thread_wakeup.deinit(); errdefer self.arena.deinit(); // We need at least one of these in case the fs thread wants to use onNextTick @@ -202,7 +201,6 @@ pub const Loop = struct { pub fn deinit(self: *Loop) void { self.deinitOsData(); - self.fs_thread_wakeup.deinit(); self.arena.deinit(); self.* = undefined; } @@ -723,9 +721,7 @@ pub const Loop = struct { extra_thread.join(); } - @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst); - self.delay_queue.event.set(); - self.delay_queue.thread.join(); + self.delay_queue.deinit(); } /// Runs the provided function asynchronously. The function's frame is allocated @@ -851,8 +847,8 @@ pub const Loop = struct { timer: std.time.Timer, waiters: Waiters, thread: std.Thread, - event: std.Thread.AutoResetEvent, - is_running: bool, + event: std.Thread.ResetEvent, + is_running: Atomic(bool), /// Initialize the delay queue by spawning the timer thread /// and starting any timer resources. @@ -862,11 +858,19 @@ pub const Loop = struct { .waiters = DelayQueue.Waiters{ .entries = std.atomic.Queue(anyframe).init(), }, - .event = std.Thread.AutoResetEvent{}, - .is_running = true, - // Must be last so that it can read the other state, such as `is_running`. - .thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self}), + .thread = undefined, + .event = .{}, + .is_running = Atomic(bool).init(true), }; + + // Must be after init so that it can read the other state, such as `is_running`. 
+ self.thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self}); + } + + fn deinit(self: *DelayQueue) void { + self.is_running.store(false, .SeqCst); + self.event.set(); + self.thread.join(); } /// Entry point for the timer thread @@ -874,7 +878,8 @@ pub const Loop = struct { fn run(self: *DelayQueue) void { const loop = @fieldParentPtr(Loop, "delay_queue", self); - while (@atomicLoad(bool, &self.is_running, .SeqCst)) { + while (self.is_running.load(.SeqCst)) { + self.event.reset(); const now = self.timer.read(); if (self.waiters.popExpired(now)) |entry| { diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index b8e6c4d89a..f4003c1803 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -917,7 +917,7 @@ test "open file with exclusive and shared nonblocking lock" { try testing.expectError(error.WouldBlock, file2); } -test "open file with exclusive lock twice, make sure it waits" { +test "open file with exclusive lock twice, make sure second lock waits" { if (builtin.single_threaded) return error.SkipZigTest; if (std.io.is_async) { @@ -934,30 +934,33 @@ test "open file with exclusive lock twice, make sure it waits" { errdefer file.close(); const S = struct { - fn checkFn(dir: *fs.Dir, evt: *std.Thread.ResetEvent) !void { + fn checkFn(dir: *fs.Dir, started: *std.Thread.ResetEvent, locked: *std.Thread.ResetEvent) !void { + started.set(); const file1 = try dir.createFile(filename, .{ .lock = .Exclusive }); - defer file1.close(); - evt.set(); + + locked.set(); + file1.close(); } }; - var evt: std.Thread.ResetEvent = undefined; - try evt.init(); - defer evt.deinit(); + var started = std.Thread.ResetEvent{}; + var locked = std.Thread.ResetEvent{}; - const t = try std.Thread.spawn(.{}, S.checkFn, .{ &tmp.dir, &evt }); + const t = try std.Thread.spawn(.{}, S.checkFn, .{ + &tmp.dir, + &started, + &locked, + }); defer t.join(); - const SLEEP_TIMEOUT_NS = 10 * std.time.ns_per_ms; - // Make sure we've slept enough. - var timer = try std.time.Timer.start(); - while (true) { - std.time.sleep(SLEEP_TIMEOUT_NS); - if (timer.read() >= SLEEP_TIMEOUT_NS) break; - } + // Wait for the spawned thread to start trying to acquire the exclusive file lock. + // Then wait a bit to make sure that can't acquire it since we currently hold the file lock. + started.wait(); + try testing.expectError(error.Timeout, locked.timedWait(10 * std.time.ns_per_ms)); + + // Release the file lock which should unlock the thread to lock it and set the locked event. file.close(); - // No timeout to avoid failures on heavily loaded systems. - evt.wait(); + locked.wait(); } test "open file with exclusive nonblocking lock twice (absolute paths)" { diff --git a/src/Compilation.zig b/src/Compilation.zig index bfe52cd59e..b0c8f5f475 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -163,8 +163,8 @@ emit_llvm_bc: ?EmitLoc, emit_analysis: ?EmitLoc, emit_docs: ?EmitLoc, -work_queue_wait_group: WaitGroup, -astgen_wait_group: WaitGroup, +work_queue_wait_group: WaitGroup = .{}, +astgen_wait_group: WaitGroup = .{}, /// Exported symbol names. This is only for when the target is wasm. /// TODO: Remove this when Stage2 becomes the default compiler as it will already have this information. 
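[Editor's note: the hunk above swaps a runtime-initialized `WaitGroup` for a default-initialized field, which is what removes the fallible init()/deinit() calls in the hunks that follow. The enabler is Zig default field values: a struct whose fields all have defaults can be constructed with `.{}` and needs no cleanup. A minimal sketch of the pattern with a hypothetical `Event` type, not the actual WaitGroup:]

    const std = @import("std");

    // Hypothetical Event type: because every field has a default value,
    // a containing struct can embed it as `done: Event = .{}` and skip
    // the runtime init()/errdefer deinit() pair entirely.
    const Event = struct {
        state: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
    };

    const Job = struct {
        done: Event = .{}, // statically initialized
    };

    test "static initialization" {
        var job = Job{};
        try std.testing.expect(job.done.state.load(.Monotonic) == 0);
    }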
@@ -1674,19 +1674,11 @@ pub fn create(gpa: Allocator, options: InitOptions) !*Compilation { .test_evented_io = options.test_evented_io, .debug_compiler_runtime_libs = options.debug_compiler_runtime_libs, .debug_compile_errors = options.debug_compile_errors, - .work_queue_wait_group = undefined, - .astgen_wait_group = undefined, }; break :comp comp; }; errdefer comp.destroy(); - try comp.work_queue_wait_group.init(); - errdefer comp.work_queue_wait_group.deinit(); - - try comp.astgen_wait_group.init(); - errdefer comp.astgen_wait_group.deinit(); - // Add a `CObject` for each `c_source_files`. try comp.c_object_table.ensureTotalCapacity(gpa, options.c_source_files.len); for (options.c_source_files) |c_source_file| { @@ -1894,9 +1886,6 @@ pub fn destroy(self: *Compilation) void { self.cache_parent.manifest_dir.close(); if (self.owned_link_dir) |*dir| dir.close(); - self.work_queue_wait_group.deinit(); - self.astgen_wait_group.deinit(); - for (self.export_symbol_names.items) |symbol_name| { gpa.free(symbol_name); } @@ -4701,6 +4690,7 @@ pub fn generateBuiltinZigSource(comp: *Compilation, allocator: Allocator) Alloca \\pub const link_libcpp = {}; \\pub const have_error_return_tracing = {}; \\pub const valgrind_support = {}; + \\pub const sanitize_thread = {}; \\pub const position_independent_code = {}; \\pub const position_independent_executable = {}; \\pub const strip_debug_info = {}; @@ -4713,6 +4703,7 @@ pub fn generateBuiltinZigSource(comp: *Compilation, allocator: Allocator) Alloca comp.bin_file.options.link_libcpp, comp.bin_file.options.error_return_tracing, comp.bin_file.options.valgrind, + comp.bin_file.options.tsan, comp.bin_file.options.pic, comp.bin_file.options.pie, comp.bin_file.options.strip, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 36d004cfc6..7d1c8420af 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -3,13 +3,12 @@ const builtin = @import("builtin"); const ThreadPool = @This(); mutex: std.Thread.Mutex = .{}, +cond: std.Thread.Condition = .{}, +run_queue: RunQueue = .{}, is_running: bool = true, allocator: std.mem.Allocator, -workers: []Worker, -run_queue: RunQueue = .{}, -idle_queue: IdleQueue = .{}, +threads: []std.Thread, -const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent); const RunQueue = std.SinglyLinkedList(Runnable); const Runnable = struct { runFn: RunProto, @@ -20,89 +19,52 @@ const RunProto = switch (builtin.zig_backend) { else => *const fn (*Runnable) void, }; -const Worker = struct { - pool: *ThreadPool, - thread: std.Thread, - /// The node is for this worker only and must have an already initialized event - /// when the thread is spawned. 
- idle_node: IdleQueue.Node, - - fn run(worker: *Worker) void { - const pool = worker.pool; - - while (true) { - pool.mutex.lock(); - - if (pool.run_queue.popFirst()) |run_node| { - pool.mutex.unlock(); - (run_node.data.runFn)(&run_node.data); - continue; - } - - if (pool.is_running) { - worker.idle_node.data.reset(); - - pool.idle_queue.prepend(&worker.idle_node); - pool.mutex.unlock(); - - worker.idle_node.data.wait(); - continue; - } - - pool.mutex.unlock(); - return; - } - } -}; - pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void { self.* = .{ .allocator = allocator, - .workers = &[_]Worker{}, + .threads = &[_]std.Thread{}, }; - if (builtin.single_threaded) - return; - const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1); - self.workers = try allocator.alloc(Worker, worker_count); - errdefer allocator.free(self.workers); + if (builtin.single_threaded) { + return; + } - var worker_index: usize = 0; - errdefer self.destroyWorkers(worker_index); - while (worker_index < worker_count) : (worker_index += 1) { - const worker = &self.workers[worker_index]; - worker.pool = self; + const thread_count = std.math.max(1, std.Thread.getCpuCount() catch 1); + self.threads = try allocator.alloc(std.Thread, thread_count); + errdefer allocator.free(self.threads); - // Each worker requires its ResetEvent to be pre-initialized. - try worker.idle_node.data.init(); - errdefer worker.idle_node.data.deinit(); + // kill and join any threads we spawned previously on error. + var spawned: usize = 0; + errdefer self.join(spawned); - worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker}); + for (self.threads) |*thread| { + thread.* = try std.Thread.spawn(.{}, worker, .{self}); + spawned += 1; } } -fn destroyWorkers(self: *ThreadPool, spawned: usize) void { - if (builtin.single_threaded) - return; - - for (self.workers[0..spawned]) |*worker| { - worker.thread.join(); - worker.idle_node.data.deinit(); - } +pub fn deinit(self: *ThreadPool) void { + self.join(self.threads.len); // kill and join all threads. + self.* = undefined; } -pub fn deinit(self: *ThreadPool) void { +fn join(self: *ThreadPool, spawned: usize) void { { self.mutex.lock(); defer self.mutex.unlock(); + // ensure future worker threads exit the dequeue loop self.is_running = false; - while (self.idle_queue.popFirst()) |idle_node| - idle_node.data.set(); } - self.destroyWorkers(self.workers.len); - self.allocator.free(self.workers); + // wake up any sleeping threads (this can be done outside the mutex) + // then wait for all the threads we know are spawned to complete. + self.cond.broadcast(); + for (self.threads[0..spawned]) |thread| { + thread.join(); + } + + self.allocator.free(self.threads); } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { @@ -122,24 +84,51 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { const closure = @fieldParentPtr(@This(), "run_node", run_node); @call(.{}, func, closure.arguments); + // The thread pool's allocator is protected by the mutex. const mutex = &closure.pool.mutex; mutex.lock(); defer mutex.unlock(); + closure.pool.allocator.destroy(closure); } }; + { + self.mutex.lock(); + defer self.mutex.unlock(); + + const closure = try self.allocator.create(Closure); + closure.* = .{ + .arguments = args, + .pool = self, + }; + + self.run_queue.prepend(&closure.run_node); + } + + // Notify waiting threads outside the lock to try and keep the critical section small. 
+ self.cond.signal(); +} + +fn worker(self: *ThreadPool) void { self.mutex.lock(); defer self.mutex.unlock(); - const closure = try self.allocator.create(Closure); - closure.* = .{ - .arguments = args, - .pool = self, - }; + while (true) { + while (self.run_queue.popFirst()) |run_node| { + // Temporarily unlock the mutex in order to execute the run_node + self.mutex.unlock(); + defer self.mutex.lock(); - self.run_queue.prepend(&closure.run_node); + const runFn = run_node.data.runFn; + runFn(&run_node.data); + } - if (self.idle_queue.popFirst()) |idle_node| - idle_node.data.set(); + // Stop executing instead of waiting if the thread pool is no longer running. + if (self.is_running) { + self.cond.wait(&self.mutex); + } else { + break; + } + } } diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index e4126b1ab3..860d0a8b4c 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -1,56 +1,39 @@ const std = @import("std"); +const Atomic = std.atomic.Atomic; +const assert = std.debug.assert; const WaitGroup = @This(); -mutex: std.Thread.Mutex = .{}, -counter: usize = 0, -event: std.Thread.ResetEvent, - -pub fn init(self: *WaitGroup) !void { - self.* = .{ - .mutex = .{}, - .counter = 0, - .event = undefined, - }; - try self.event.init(); -} +const is_waiting: usize = 1 << 0; +const one_pending: usize = 1 << 1; -pub fn deinit(self: *WaitGroup) void { - self.event.deinit(); - self.* = undefined; -} +state: Atomic(usize) = Atomic(usize).init(0), +event: std.Thread.ResetEvent = .{}, pub fn start(self: *WaitGroup) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - self.counter += 1; + const state = self.state.fetchAdd(one_pending, .Monotonic); + assert((state / one_pending) < (std.math.maxInt(usize) / one_pending)); } pub fn finish(self: *WaitGroup) void { - self.mutex.lock(); - defer self.mutex.unlock(); + const state = self.state.fetchSub(one_pending, .Release); + assert((state / one_pending) > 0); - self.counter -= 1; - - if (self.counter == 0) { + if (state == (one_pending | is_waiting)) { + self.state.fence(.Acquire); self.event.set(); } } pub fn wait(self: *WaitGroup) void { - while (true) { - self.mutex.lock(); - - if (self.counter == 0) { - self.mutex.unlock(); - return; - } + var state = self.state.fetchAdd(is_waiting, .Acquire); + assert(state & is_waiting == 0); - self.mutex.unlock(); + if ((state / one_pending) > 0) { self.event.wait(); } } pub fn reset(self: *WaitGroup) void { + self.state.store(0, .Monotonic); self.event.reset(); } diff --git a/src/crash_report.zig b/src/crash_report.zig index e9d4022bba..d38f436aa0 100644 --- a/src/crash_report.zig +++ b/src/crash_report.zig @@ -362,7 +362,7 @@ const PanicSwitch = struct { /// Updated atomically before taking the panic_mutex. /// In recoverable cases, the program will not abort /// until all panicking threads have dumped their traces. - var panicking: u8 = 0; + var panicking = std.atomic.Atomic(u8).init(0); // Locked to avoid interleaving panic messages from multiple threads. 
var panic_mutex = std.Thread.Mutex{};
@@ -430,7 +430,7 @@ const PanicSwitch = struct {
 };
 state.* = new_state;

- _ = @atomicRmw(u8, &panicking, .Add, 1, .SeqCst);
+ _ = panicking.fetchAdd(1, .SeqCst);

 state.recover_stage = .release_ref_count;
@@ -512,13 +512,14 @@ const PanicSwitch = struct {
 noinline fn releaseRefCount(state: *volatile PanicState) noreturn {
 state.recover_stage = .abort;

- if (@atomicRmw(u8, &panicking, .Sub, 1, .SeqCst) != 1) {
+ if (panicking.fetchSub(1, .SeqCst) != 1) {
 // Another thread is panicking, wait for the last one to finish
 // and call abort()

 // Sleep forever without hammering the CPU
- var event: std.Thread.StaticResetEvent = .{};
- event.wait();
+ var futex = std.atomic.Atomic(u32).init(0);
+ while (true) std.Thread.Futex.wait(&futex, 0);

+ // This should be unreachable; if it is ever reached, recurse into recoverAbort.
 @panic("event.wait() returned");
 }
diff --git a/src/stage1/codegen.cpp b/src/stage1/codegen.cpp
index 3138e93c4c..a2efed6bde 100644
--- a/src/stage1/codegen.cpp
+++ b/src/stage1/codegen.cpp
@@ -9993,6 +9993,7 @@ Buf *codegen_generate_builtin_source(CodeGen *g) {
 buf_appendf(contents, "pub const link_libcpp = %s;\n", bool_to_str(g->link_libcpp));
 buf_appendf(contents, "pub const have_error_return_tracing = %s;\n", bool_to_str(g->have_err_ret_tracing));
 buf_appendf(contents, "pub const valgrind_support = false;\n");
+ buf_appendf(contents, "pub const sanitize_thread = false;\n");
 buf_appendf(contents, "pub const position_independent_code = %s;\n", bool_to_str(g->have_pic));
 buf_appendf(contents, "pub const position_independent_executable = %s;\n", bool_to_str(g->have_pie));
 buf_appendf(contents, "pub const strip_debug_info = %s;\n", bool_to_str(g->strip_debug_symbols));
-- cgit v1.2.3

From 9b05474d797ea4600dbae36ae95d9eb042040bb2 Mon Sep 17 00:00:00 2001
From: Cody Tapscott
Date: Fri, 10 Jun 2022 07:21:54 -0700
Subject: ThreadPool: Make join() a no-op in single-threaded mode

This comptime gate is needed to make sure that purely single-threaded
programs don't generate calls to the std.Thread API. WASI targets
successfully build again with this change.
---
 src/ThreadPool.zig | 4 ++++
 1 file changed, 4 insertions(+)

(limited to 'src/ThreadPool.zig')

diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 7d1c8420af..55e40ea287 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -49,6 +49,10 @@ pub fn deinit(self: *ThreadPool) void {
 }

 fn join(self: *ThreadPool, spawned: usize) void {
+ if (builtin.single_threaded) {
+ return;
+ }
+
 {
 self.mutex.lock();
 defer self.mutex.unlock();
-- cgit v1.2.3

From 5cd548e53081428d0e6b4a6b5a305317052c133a Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Thu, 16 Jun 2022 20:23:22 -0700
Subject: Compilation: multi-thread compiler-rt

compiler_rt_lib and compiler_rt_obj are extracted from the generic
JobQueue into simple boolean flags, and then handled explicitly inside
performAllTheWork().

Introduced generic handling of allocation failure and made
setMiscFailure not return a possible error.

Building the compiler-rt static library now takes advantage of
Compilation's ThreadPool. This introduced a problem, however, because
now each of the object files of compiler-rt performs AstGen for the
full standard library and compiler-rt files. Even though all of them
end up being cache hits except for the first ones, this is wasteful:
O(N*M), where N is the number of compilation units inside compiler-rt
and M is the number of .zig files in the standard library and
compiler-rt combined.
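As a sketch of the fan-out pattern this commit adopts, written against the
ThreadPool/WaitGroup API from this series: each task calls wg.start() before
being spawned, the worker calls wg.finish() when done, and the main thread
calls waitAndWork() so it drains the run queue itself while waiting. This is
only an illustration, not the actual compiler code; `buildOne` and the
two-entry file list are hypothetical stand-ins for workerBuildObject and
compiler_rt.sources:

    const std = @import("std");
    const ThreadPool = @import("ThreadPool.zig");
    const WaitGroup = @import("WaitGroup.zig");

    // Hypothetical worker: "builds" one object, then marks itself finished.
    fn buildOne(name: []const u8, wg: *WaitGroup) void {
        defer wg.finish();
        std.log.info("building {s}", .{name});
    }

    pub fn main() !void {
        var gpa = std.heap.GeneralPurposeAllocator(.{}){};
        var pool: ThreadPool = undefined;
        try pool.init(gpa.allocator());
        defer pool.deinit();

        var wg: WaitGroup = .{};
        // Runs before pool.deinit(); the waiting thread helps execute
        // queued tasks instead of sleeping.
        defer pool.waitAndWork(&wg);

        const sources = [_][]const u8{ "addo.zig", "mulo.zig" }; // placeholder list
        for (sources) |src| {
            wg.start();
            try pool.spawn(buildOne, .{ src, &wg });
        }
    }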
More importantly, however, it causes a deadlock, because each thread interacts with a file system lock for doing AstGen on files, and threads end up waiting for each other. This will need to be handled with a process-level file caching system, or some other creative solution. --- src/Compilation.zig | 193 +++++++++------- src/ThreadPool.zig | 81 ++++--- src/WaitGroup.zig | 7 + src/compiler_rt.zig | 637 +++++++++++++++++++++++++++++----------------------- 4 files changed, 528 insertions(+), 390 deletions(-) (limited to 'src/ThreadPool.zig') diff --git a/src/Compilation.zig b/src/Compilation.zig index 2858a28f42..65a2ad92b4 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -93,6 +93,9 @@ unwind_tables: bool, test_evented_io: bool, debug_compiler_runtime_libs: bool, debug_compile_errors: bool, +job_queued_compiler_rt_lib: bool = false, +job_queued_compiler_rt_obj: bool = false, +alloc_failure_occurred: bool = false, c_source_files: []const CSourceFile, clang_argv: []const []const u8, @@ -130,11 +133,11 @@ libssp_static_lib: ?CRTFile = null, /// Populated when we build the libc static library. A Job to build this is placed in the queue /// and resolved before calling linker.flush(). libc_static_lib: ?CRTFile = null, -/// Populated when we build the libcompiler_rt static library. A Job to build this is placed in the queue -/// and resolved before calling linker.flush(). +/// Populated when we build the libcompiler_rt static library. A Job to build this is indicated +/// by setting `job_queued_compiler_rt_lib` and resolved before calling linker.flush(). compiler_rt_lib: ?CRTFile = null, -/// Populated when we build the compiler_rt_obj object. A Job to build this is placed in the queue -/// and resolved before calling linker.flush(). +/// Populated when we build the compiler_rt_obj object. A Job to build this is indicated +/// by setting `job_queued_compiler_rt_obj` and resolved before calling linker.flush(). compiler_rt_obj: ?CRTFile = null, glibc_so_files: ?glibc.BuiltSharedObjects = null, @@ -224,8 +227,6 @@ const Job = union(enum) { libcxxabi: void, libtsan: void, libssp: void, - compiler_rt_lib: void, - compiler_rt_obj: void, /// needed when not linking libc and using LLVM for code generation because it generates /// calls to, for example, memcpy and memset. zig_libc: void, @@ -1925,13 +1926,13 @@ pub fn create(gpa: Allocator, options: InitOptions) !*Compilation { if (comp.bin_file.options.include_compiler_rt and capable_of_building_compiler_rt) { if (is_exe_or_dyn_lib) { log.debug("queuing a job to build compiler_rt_lib", .{}); - try comp.work_queue.writeItem(.{ .compiler_rt_lib = {} }); + comp.job_queued_compiler_rt_lib = true; } else if (options.output_mode != .Obj) { log.debug("queuing a job to build compiler_rt_obj", .{}); // If build-obj with -fcompiler-rt is requested, that is handled specially // elsewhere. In this case we are making a static library, so we ask // for a compiler-rt object to put in it. - try comp.work_queue.writeItem(.{ .compiler_rt_obj = {} }); + comp.job_queued_compiler_rt_obj = true; } } if (needs_c_symbols) { @@ -2021,6 +2022,7 @@ pub fn destroy(self: *Compilation) void { } pub fn clearMiscFailures(comp: *Compilation) void { + comp.alloc_failure_occurred = false; for (comp.misc_failures.values()) |*value| { value.deinit(comp.gpa); } @@ -2533,8 +2535,10 @@ pub fn makeBinFileWritable(self: *Compilation) !void { return self.bin_file.makeWritable(); } +/// This function is temporally single-threaded. 
pub fn totalErrorCount(self: *Compilation) usize { - var total: usize = self.failed_c_objects.count() + self.misc_failures.count(); + var total: usize = self.failed_c_objects.count() + self.misc_failures.count() + + @boolToInt(self.alloc_failure_occurred); if (self.bin_file.options.module) |module| { total += module.failed_exports.count(); @@ -2591,6 +2595,7 @@ pub fn totalErrorCount(self: *Compilation) usize { return total; } +/// This function is temporally single-threaded. pub fn getAllErrorsAlloc(self: *Compilation) !AllErrors { var arena = std.heap.ArenaAllocator.init(self.gpa); errdefer arena.deinit(); @@ -2623,6 +2628,9 @@ pub fn getAllErrorsAlloc(self: *Compilation) !AllErrors { for (self.misc_failures.values()) |*value| { try AllErrors.addPlainWithChildren(&arena, &errors, value.msg, value.children); } + if (self.alloc_failure_occurred) { + try AllErrors.addPlain(&arena, &errors, "memory allocation failure"); + } if (self.bin_file.options.module) |module| { { var it = module.failed_files.iterator(); @@ -2737,9 +2745,15 @@ pub fn performAllTheWork( var embed_file_prog_node = main_progress_node.start("Detect @embedFile updates", comp.embed_file_work_queue.count); defer embed_file_prog_node.end(); + // +1 for the link step + var compiler_rt_prog_node = main_progress_node.start("compiler_rt", compiler_rt.sources.len + 1); + defer compiler_rt_prog_node.end(); + comp.work_queue_wait_group.reset(); defer comp.work_queue_wait_group.wait(); + const use_stage1 = build_options.is_stage1 and comp.bin_file.options.use_stage1; + { const astgen_frame = tracy.namedFrame("astgen"); defer astgen_frame.end(); @@ -2782,9 +2796,28 @@ pub fn performAllTheWork( comp, c_object, &c_obj_prog_node, &comp.work_queue_wait_group, }); } + + if (comp.job_queued_compiler_rt_lib) { + comp.job_queued_compiler_rt_lib = false; + + if (use_stage1) { + // stage1 LLVM backend uses the global context and thus cannot be used in + // a multi-threaded context. + buildCompilerRtOneShot(comp, .Lib, &comp.compiler_rt_lib); + } else { + comp.work_queue_wait_group.start(); + try comp.thread_pool.spawn(workerBuildCompilerRtLib, .{ + comp, &compiler_rt_prog_node, &comp.work_queue_wait_group, + }); + } + } + + if (comp.job_queued_compiler_rt_obj) { + comp.job_queued_compiler_rt_obj = false; + buildCompilerRtOneShot(comp, .Obj, &comp.compiler_rt_obj); + } } - const use_stage1 = build_options.is_stage1 and comp.bin_file.options.use_stage1; if (!use_stage1) { const outdated_and_deleted_decls_frame = tracy.namedFrame("outdated_and_deleted_decls"); defer outdated_and_deleted_decls_frame.end(); @@ -2997,7 +3030,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { module.semaPkg(pkg) catch |err| switch (err) { error.CurrentWorkingDirectoryUnlinked, error.Unexpected, - => try comp.setMiscFailure( + => comp.lockAndSetMiscFailure( .analyze_pkg, "unexpected problem analyzing package '{s}'", .{pkg.root_src_path}, @@ -3012,7 +3045,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { glibc.buildCRTFile(comp, crt_file) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure(.glibc_crt_file, "unable to build glibc CRT file: {s}", .{ + comp.lockAndSetMiscFailure(.glibc_crt_file, "unable to build glibc CRT file: {s}", .{ @errorName(err), }); }; @@ -3023,7 +3056,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { glibc.buildSharedObjects(comp) catch |err| { // TODO Surface more error details. 
- try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .glibc_shared_objects, "unable to build glibc shared objects: {s}", .{@errorName(err)}, @@ -3036,7 +3069,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { musl.buildCRTFile(comp, crt_file) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .musl_crt_file, "unable to build musl CRT file: {s}", .{@errorName(err)}, @@ -3049,7 +3082,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { mingw.buildCRTFile(comp, crt_file) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .mingw_crt_file, "unable to build mingw-w64 CRT file: {s}", .{@errorName(err)}, @@ -3063,7 +3096,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { const link_lib = comp.bin_file.options.system_libs.keys()[index]; mingw.buildImportLib(comp, link_lib) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .windows_import_lib, "unable to generate DLL import .lib file: {s}", .{@errorName(err)}, @@ -3076,7 +3109,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { libunwind.buildStaticLib(comp) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .libunwind, "unable to build libunwind: {s}", .{@errorName(err)}, @@ -3089,7 +3122,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { libcxx.buildLibCXX(comp) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .libcxx, "unable to build libcxx: {s}", .{@errorName(err)}, @@ -3102,7 +3135,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { libcxx.buildLibCXXABI(comp) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .libcxxabi, "unable to build libcxxabi: {s}", .{@errorName(err)}, @@ -3115,7 +3148,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { libtsan.buildTsan(comp) catch |err| { // TODO Surface more error details. - try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .libtsan, "unable to build TSAN library: {s}", .{@errorName(err)}, @@ -3128,49 +3161,13 @@ fn processOneJob(comp: *Compilation, job: Job) !void { wasi_libc.buildCRTFile(comp, crt_file) catch |err| { // TODO Surface more error details. 
- try comp.setMiscFailure( + comp.lockAndSetMiscFailure( .wasi_libc_crt_file, "unable to build WASI libc CRT file: {s}", .{@errorName(err)}, ); }; }, - .compiler_rt_lib => { - const named_frame = tracy.namedFrame("compiler_rt_lib"); - defer named_frame.end(); - - compiler_rt.buildCompilerRtLib( - comp, - &comp.compiler_rt_lib, - ) catch |err| switch (err) { - error.OutOfMemory => return error.OutOfMemory, - error.SubCompilationFailed => return, // error reported already - else => try comp.setMiscFailure( - .compiler_rt, - "unable to build compiler_rt: {s}", - .{@errorName(err)}, - ), - }; - }, - .compiler_rt_obj => { - const named_frame = tracy.namedFrame("compiler_rt_obj"); - defer named_frame.end(); - - comp.buildOutputFromZig( - "compiler_rt.zig", - .Obj, - &comp.compiler_rt_obj, - .compiler_rt, - ) catch |err| switch (err) { - error.OutOfMemory => return error.OutOfMemory, - error.SubCompilationFailed => return, // error reported already - else => try comp.setMiscFailure( - .compiler_rt, - "unable to build compiler_rt: {s}", - .{@errorName(err)}, - ), - }; - }, .libssp => { const named_frame = tracy.namedFrame("libssp"); defer named_frame.end(); @@ -3183,7 +3180,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { ) catch |err| switch (err) { error.OutOfMemory => return error.OutOfMemory, error.SubCompilationFailed => return, // error reported already - else => try comp.setMiscFailure( + else => comp.lockAndSetMiscFailure( .libssp, "unable to build libssp: {s}", .{@errorName(err)}, @@ -3202,7 +3199,7 @@ fn processOneJob(comp: *Compilation, job: Job) !void { ) catch |err| switch (err) { error.OutOfMemory => return error.OutOfMemory, error.SubCompilationFailed => return, // error reported already - else => try comp.setMiscFailure( + else => comp.lockAndSetMiscFailure( .zig_libc, "unable to build zig's multitarget libc: {s}", .{@errorName(err)}, @@ -3306,11 +3303,7 @@ fn workerUpdateBuiltinZigFile( comp.setMiscFailure(.write_builtin_zig, "unable to write builtin.zig to {s}: {s}", .{ dir_path, @errorName(err), - }) catch |oom| switch (oom) { - error.OutOfMemory => log.err("unable to write builtin.zig to {s}: {s}", .{ - dir_path, @errorName(err), - }), - }; + }); }; } @@ -3524,6 +3517,38 @@ fn workerUpdateCObject( }; } +fn buildCompilerRtOneShot( + comp: *Compilation, + output_mode: std.builtin.OutputMode, + out: *?CRTFile, +) void { + comp.buildOutputFromZig("compiler_rt.zig", output_mode, out, .compiler_rt) catch |err| switch (err) { + error.SubCompilationFailed => return, // error reported already + else => comp.lockAndSetMiscFailure( + .compiler_rt, + "unable to build compiler_rt: {s}", + .{@errorName(err)}, + ), + }; +} + +fn workerBuildCompilerRtLib( + comp: *Compilation, + progress_node: *std.Progress.Node, + wg: *WaitGroup, +) void { + defer wg.finish(); + + compiler_rt.buildCompilerRtLib(comp, progress_node) catch |err| switch (err) { + error.SubCompilationFailed => return, // error reported already + else => comp.lockAndSetMiscFailure( + .compiler_rt, + "unable to build compiler_rt: {s}", + .{@errorName(err)}, + ), + }; +} + fn reportRetryableCObjectError( comp: *Compilation, c_object: *CObject, @@ -4622,14 +4647,21 @@ fn wantBuildLibUnwindFromSource(comp: *Compilation) bool { comp.bin_file.options.object_format != .c; } -fn setMiscFailure( +fn setAllocFailure(comp: *Compilation) void { + log.debug("memory allocation failure", .{}); + comp.alloc_failure_occurred = true; +} + +/// Assumes that Compilation mutex is locked. +/// See also `lockAndSetMiscFailure`. 
+pub fn setMiscFailure( comp: *Compilation, tag: MiscTask, comptime format: []const u8, args: anytype, -) Allocator.Error!void { - try comp.misc_failures.ensureUnusedCapacity(comp.gpa, 1); - const msg = try std.fmt.allocPrint(comp.gpa, format, args); +) void { + comp.misc_failures.ensureUnusedCapacity(comp.gpa, 1) catch return comp.setAllocFailure(); + const msg = std.fmt.allocPrint(comp.gpa, format, args) catch return comp.setAllocFailure(); const gop = comp.misc_failures.getOrPutAssumeCapacity(tag); if (gop.found_existing) { gop.value_ptr.deinit(comp.gpa); @@ -4637,6 +4669,19 @@ fn setMiscFailure( gop.value_ptr.* = .{ .msg = msg }; } +/// See also `setMiscFailure`. +pub fn lockAndSetMiscFailure( + comp: *Compilation, + tag: MiscTask, + comptime format: []const u8, + args: anytype, +) void { + comp.mutex.lock(); + defer comp.mutex.unlock(); + + return setMiscFailure(comp, tag, format, args); +} + pub fn dump_argv(argv: []const []const u8) void { for (argv[0 .. argv.len - 1]) |arg| { std.debug.print("{s} ", .{arg}); @@ -4896,7 +4941,7 @@ pub fn updateSubCompilation(sub_compilation: *Compilation) !void { } } -pub fn buildOutputFromZig( +fn buildOutputFromZig( comp: *Compilation, src_basename: []const u8, output_mode: std.builtin.OutputMode, @@ -4913,15 +4958,7 @@ pub fn buildOutputFromZig( .root_src_path = src_basename, }; defer main_pkg.deinitTable(comp.gpa); - - const root_name = root_name: { - const basename = if (std.fs.path.dirname(src_basename)) |dirname| - src_basename[dirname.len + 1 ..] - else - src_basename; - const root_name = basename[0 .. basename.len - std.fs.path.extension(basename).len]; - break :root_name root_name; - }; + const root_name = src_basename[0 .. src_basename.len - std.fs.path.extension(src_basename).len]; const target = comp.getTarget(); const bin_basename = try std.zig.binNameAlloc(comp.gpa, .{ .root_name = root_name, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 55e40ea287..7115adbddd 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -1,6 +1,7 @@ const std = @import("std"); const builtin = @import("builtin"); const ThreadPool = @This(); +const WaitGroup = @import("WaitGroup.zig"); mutex: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, @@ -19,8 +20,8 @@ const RunProto = switch (builtin.zig_backend) { else => *const fn (*Runnable) void, }; -pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void { - self.* = .{ +pub fn init(pool: *ThreadPool, allocator: std.mem.Allocator) !void { + pool.* = .{ .allocator = allocator, .threads = &[_]std.Thread{}, }; @@ -30,48 +31,48 @@ pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void { } const thread_count = std.math.max(1, std.Thread.getCpuCount() catch 1); - self.threads = try allocator.alloc(std.Thread, thread_count); - errdefer allocator.free(self.threads); + pool.threads = try allocator.alloc(std.Thread, thread_count); + errdefer allocator.free(pool.threads); // kill and join any threads we spawned previously on error. var spawned: usize = 0; - errdefer self.join(spawned); + errdefer pool.join(spawned); - for (self.threads) |*thread| { - thread.* = try std.Thread.spawn(.{}, worker, .{self}); + for (pool.threads) |*thread| { + thread.* = try std.Thread.spawn(.{}, worker, .{pool}); spawned += 1; } } -pub fn deinit(self: *ThreadPool) void { - self.join(self.threads.len); // kill and join all threads. - self.* = undefined; +pub fn deinit(pool: *ThreadPool) void { + pool.join(pool.threads.len); // kill and join all threads. 
+ pool.* = undefined; } -fn join(self: *ThreadPool, spawned: usize) void { +fn join(pool: *ThreadPool, spawned: usize) void { if (builtin.single_threaded) { return; } { - self.mutex.lock(); - defer self.mutex.unlock(); + pool.mutex.lock(); + defer pool.mutex.unlock(); // ensure future worker threads exit the dequeue loop - self.is_running = false; + pool.is_running = false; } // wake up any sleeping threads (this can be done outside the mutex) // then wait for all the threads we know are spawned to complete. - self.cond.broadcast(); - for (self.threads[0..spawned]) |thread| { + pool.cond.broadcast(); + for (pool.threads[0..spawned]) |thread| { thread.join(); } - self.allocator.free(self.threads); + pool.allocator.free(pool.threads); } -pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { +pub fn spawn(pool: *ThreadPool, comptime func: anytype, args: anytype) !void { if (builtin.single_threaded) { @call(.{}, func, args); return; @@ -98,41 +99,57 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { }; { - self.mutex.lock(); - defer self.mutex.unlock(); + pool.mutex.lock(); + defer pool.mutex.unlock(); - const closure = try self.allocator.create(Closure); + const closure = try pool.allocator.create(Closure); closure.* = .{ .arguments = args, - .pool = self, + .pool = pool, }; - self.run_queue.prepend(&closure.run_node); + pool.run_queue.prepend(&closure.run_node); } // Notify waiting threads outside the lock to try and keep the critical section small. - self.cond.signal(); + pool.cond.signal(); } -fn worker(self: *ThreadPool) void { - self.mutex.lock(); - defer self.mutex.unlock(); +fn worker(pool: *ThreadPool) void { + pool.mutex.lock(); + defer pool.mutex.unlock(); while (true) { - while (self.run_queue.popFirst()) |run_node| { + while (pool.run_queue.popFirst()) |run_node| { // Temporarily unlock the mutex in order to execute the run_node - self.mutex.unlock(); - defer self.mutex.lock(); + pool.mutex.unlock(); + defer pool.mutex.lock(); const runFn = run_node.data.runFn; runFn(&run_node.data); } // Stop executing instead of waiting if the thread pool is no longer running. 
- if (self.is_running) { - self.cond.wait(&self.mutex); + if (pool.is_running) { + pool.cond.wait(&pool.mutex); } else { break; } } } + +pub fn waitAndWork(pool: *ThreadPool, wait_group: *WaitGroup) void { + while (!wait_group.isDone()) { + if (blk: { + pool.mutex.lock(); + defer pool.mutex.unlock(); + break :blk pool.run_queue.popFirst(); + }) |run_node| { + run_node.data.runFn(&run_node.data); + continue; + } + + wait_group.wait(); + return; + } +} diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig index 860d0a8b4c..c8be6658db 100644 --- a/src/WaitGroup.zig +++ b/src/WaitGroup.zig @@ -37,3 +37,10 @@ pub fn reset(self: *WaitGroup) void { self.state.store(0, .Monotonic); self.event.reset(); } + +pub fn isDone(wg: *WaitGroup) bool { + const state = wg.state.load(.Acquire); + assert(state & is_waiting == 0); + + return (state / one_pending) == 0; +} diff --git a/src/compiler_rt.zig b/src/compiler_rt.zig index 70276231ce..4e43f0be88 100644 --- a/src/compiler_rt.zig +++ b/src/compiler_rt.zig @@ -12,316 +12,393 @@ const Compilation = @import("Compilation.zig"); const CRTFile = Compilation.CRTFile; const LinkObject = Compilation.LinkObject; const Package = @import("Package.zig"); +const WaitGroup = @import("WaitGroup.zig"); -pub fn buildCompilerRtLib(comp: *Compilation, compiler_rt_lib: *?CRTFile) !void { - const tracy_trace = trace(@src()); - defer tracy_trace.end(); - +pub fn buildCompilerRtLib(comp: *Compilation, progress_node: *std.Progress.Node) !void { var arena_allocator = std.heap.ArenaAllocator.init(comp.gpa); defer arena_allocator.deinit(); const arena = arena_allocator.allocator(); const target = comp.getTarget(); - // Use the global cache directory. - var cache_parent: Cache = .{ - .gpa = comp.gpa, - .manifest_dir = try comp.global_cache_directory.handle.makeOpenPath("h", .{}), + const root_name = "compiler_rt"; + const basename = try std.zig.binNameAlloc(arena, .{ + .root_name = root_name, + .target = target, + .output_mode = .Lib, + }); + + var link_objects: [sources.len]LinkObject = undefined; + var crt_files = [1]?CRTFile{null} ** sources.len; + defer deinitCrtFiles(comp, crt_files); + + { + var wg: WaitGroup = .{}; + defer comp.thread_pool.waitAndWork(&wg); + + for (sources) |source, i| { + wg.start(); + try comp.thread_pool.spawn(workerBuildObject, .{ + comp, progress_node, &wg, source, &crt_files[i], + }); + } + } + + for (link_objects) |*link_object, i| { + link_object.* = .{ + .path = crt_files[i].?.full_object_path, + }; + } + + var link_progress_node = progress_node.start("link", 0); + link_progress_node.activate(); + defer link_progress_node.end(); + + // TODO: This is extracted into a local variable to work around a stage1 miscompilation. + const emit_bin = Compilation.EmitLoc{ + .directory = null, // Put it in the cache directory. 
+ .basename = basename, + }; + const sub_compilation = try Compilation.create(comp.gpa, .{ + .local_cache_directory = comp.global_cache_directory, + .global_cache_directory = comp.global_cache_directory, + .zig_lib_directory = comp.zig_lib_directory, + .cache_mode = .whole, + .target = target, + .root_name = root_name, + .main_pkg = null, + .output_mode = .Lib, + .link_mode = .Static, + .thread_pool = comp.thread_pool, + .libc_installation = comp.bin_file.options.libc_installation, + .emit_bin = emit_bin, + .optimize_mode = comp.compilerRtOptMode(), + .want_sanitize_c = false, + .want_stack_check = false, + .want_red_zone = comp.bin_file.options.red_zone, + .omit_frame_pointer = comp.bin_file.options.omit_frame_pointer, + .want_valgrind = false, + .want_tsan = false, + .want_pic = comp.bin_file.options.pic, + .want_pie = comp.bin_file.options.pie, + .want_lto = comp.bin_file.options.lto, + .emit_h = null, + .strip = comp.compilerRtStrip(), + .is_native_os = comp.bin_file.options.is_native_os, + .is_native_abi = comp.bin_file.options.is_native_abi, + .self_exe_path = comp.self_exe_path, + .link_objects = &link_objects, + .verbose_cc = comp.verbose_cc, + .verbose_link = comp.bin_file.options.verbose_link, + .verbose_air = comp.verbose_air, + .verbose_llvm_ir = comp.verbose_llvm_ir, + .verbose_cimport = comp.verbose_cimport, + .verbose_llvm_cpu_features = comp.verbose_llvm_cpu_features, + .clang_passthrough_mode = comp.clang_passthrough_mode, + .skip_linker_dependencies = true, + .parent_compilation_link_libc = comp.bin_file.options.link_libc, + }); + defer sub_compilation.destroy(); + + try sub_compilation.updateSubCompilation(); + + assert(comp.compiler_rt_lib == null); + comp.compiler_rt_lib = .{ + .full_object_path = try sub_compilation.bin_file.options.emit.?.directory.join(comp.gpa, &[_][]const u8{ + sub_compilation.bin_file.options.emit.?.sub_path, + }), + .lock = sub_compilation.bin_file.toOwnedLock(), }; - defer cache_parent.manifest_dir.close(); +} - var cache = cache_parent.obtain(); - defer cache.deinit(); +fn deinitCrtFiles(comp: *Compilation, crt_files: [sources.len]?CRTFile) void { + const gpa = comp.gpa; - cache.hash.add(sources.len); - for (sources) |source| { - const full_path = try comp.zig_lib_directory.join(arena, &[_][]const u8{source}); - _ = try cache.addFile(full_path, null); + for (crt_files) |opt_crt_file| { + var crt_file = opt_crt_file orelse continue; + crt_file.deinit(gpa); } +} - cache.hash.addBytes(build_options.version); - cache.hash.addBytes(comp.zig_lib_directory.path orelse "."); - cache.hash.add(target.cpu.arch); - cache.hash.add(target.os.tag); - cache.hash.add(target.abi); +fn workerBuildObject( + comp: *Compilation, + progress_node: *std.Progress.Node, + wg: *WaitGroup, + src_basename: []const u8, + out: *?CRTFile, +) void { + defer wg.finish(); - const hit = try cache.hit(); - const digest = cache.final(); - const o_sub_path = try std.fs.path.join(arena, &[_][]const u8{ "o", &digest }); + var obj_progress_node = progress_node.start(src_basename, 0); + obj_progress_node.activate(); + defer obj_progress_node.end(); - var o_directory: Compilation.Directory = .{ - .handle = try comp.global_cache_directory.handle.makeOpenPath(o_sub_path, .{}), - .path = try std.fs.path.join(arena, &[_][]const u8{ comp.global_cache_directory.path.?, o_sub_path }), + buildObject(comp, src_basename, out) catch |err| switch (err) { + error.SubCompilationFailed => return, // error reported already + else => comp.lockAndSetMiscFailure( + .compiler_rt, + "unable to build 
compiler_rt: {s}", + .{@errorName(err)}, + ), }; - defer o_directory.handle.close(); +} - const ok_basename = "ok"; - const actual_hit = if (hit) blk: { - o_directory.handle.access(ok_basename, .{}) catch |err| switch (err) { - error.FileNotFound => break :blk false, - else => |e| return e, - }; - break :blk true; - } else false; +fn buildObject(comp: *Compilation, src_basename: []const u8, out: *?CRTFile) !void { + const gpa = comp.gpa; - const root_name = "compiler_rt"; - const basename = try std.zig.binNameAlloc(arena, .{ + var root_src_path_buf: [64]u8 = undefined; + const root_src_path = std.fmt.bufPrint( + &root_src_path_buf, + "compiler_rt" ++ std.fs.path.sep_str ++ "{s}", + .{src_basename}, + ) catch unreachable; + + var main_pkg: Package = .{ + .root_src_directory = comp.zig_lib_directory, + .root_src_path = root_src_path, + }; + defer main_pkg.deinitTable(gpa); + const root_name = src_basename[0 .. src_basename.len - std.fs.path.extension(src_basename).len]; + const target = comp.getTarget(); + const output_mode: std.builtin.OutputMode = .Obj; + const bin_basename = try std.zig.binNameAlloc(gpa, .{ .root_name = root_name, .target = target, - .output_mode = .Lib, + .output_mode = output_mode, }); + defer gpa.free(bin_basename); - if (!actual_hit) { - var progress: std.Progress = .{ .dont_print_on_dumb = true }; - var progress_node = progress.start("Compile Compiler-RT", sources.len + 1); - defer progress_node.end(); - if (comp.color == .off) progress.terminal = null; - - progress_node.activate(); + const emit_bin = Compilation.EmitLoc{ + .directory = null, // Put it in the cache directory. + .basename = bin_basename, + }; + const sub_compilation = try Compilation.create(gpa, .{ + .global_cache_directory = comp.global_cache_directory, + .local_cache_directory = comp.global_cache_directory, + .zig_lib_directory = comp.zig_lib_directory, + .cache_mode = .whole, + .target = target, + .root_name = root_name, + .main_pkg = &main_pkg, + .output_mode = output_mode, + .thread_pool = comp.thread_pool, + .libc_installation = comp.bin_file.options.libc_installation, + .emit_bin = emit_bin, + .optimize_mode = comp.compilerRtOptMode(), + .link_mode = .Static, + .want_sanitize_c = false, + .want_stack_check = false, + .want_red_zone = comp.bin_file.options.red_zone, + .omit_frame_pointer = comp.bin_file.options.omit_frame_pointer, + .want_valgrind = false, + .want_tsan = false, + .want_pic = comp.bin_file.options.pic, + .want_pie = comp.bin_file.options.pie, + .emit_h = null, + .strip = comp.compilerRtStrip(), + .is_native_os = comp.bin_file.options.is_native_os, + .is_native_abi = comp.bin_file.options.is_native_abi, + .self_exe_path = comp.self_exe_path, + .verbose_cc = comp.verbose_cc, + .verbose_link = comp.bin_file.options.verbose_link, + .verbose_air = comp.verbose_air, + .verbose_llvm_ir = comp.verbose_llvm_ir, + .verbose_cimport = comp.verbose_cimport, + .verbose_llvm_cpu_features = comp.verbose_llvm_cpu_features, + .clang_passthrough_mode = comp.clang_passthrough_mode, + .skip_linker_dependencies = true, + .parent_compilation_link_libc = comp.bin_file.options.link_libc, + }); + defer sub_compilation.destroy(); - var link_objects: [sources.len]LinkObject = undefined; - for (sources) |source, i| { - var obj_progress_node = progress_node.start(source, 0); - obj_progress_node.activate(); - defer obj_progress_node.end(); + try sub_compilation.update(); + // Look for compilation errors in this sub_compilation. 
+ var keep_errors = false; + var errors = try sub_compilation.getAllErrorsAlloc(); + defer if (!keep_errors) errors.deinit(sub_compilation.gpa); - var tmp_crt_file: ?CRTFile = null; - defer if (tmp_crt_file) |*crt| crt.deinit(comp.gpa); - try comp.buildOutputFromZig(source, .Obj, &tmp_crt_file, .compiler_rt); - link_objects[i] = .{ - .path = try arena.dupe(u8, tmp_crt_file.?.full_object_path), - .must_link = true, - }; - } + if (errors.list.len != 0) { + const misc_task_tag: Compilation.MiscTask = .compiler_rt; - var lib_progress_node = progress_node.start(root_name, 0); - lib_progress_node.activate(); - defer lib_progress_node.end(); + comp.mutex.lock(); + defer comp.mutex.unlock(); - // TODO: This is extracted into a local variable to work around a stage1 miscompilation. - const emit_bin = Compilation.EmitLoc{ - .directory = o_directory, // Put it in the cache directory. - .basename = basename, - }; - const sub_compilation = try Compilation.create(comp.gpa, .{ - .local_cache_directory = comp.global_cache_directory, - .global_cache_directory = comp.global_cache_directory, - .zig_lib_directory = comp.zig_lib_directory, - .cache_mode = .whole, - .target = target, - .root_name = root_name, - .main_pkg = null, - .output_mode = .Lib, - .link_mode = .Static, - .thread_pool = comp.thread_pool, - .libc_installation = comp.bin_file.options.libc_installation, - .emit_bin = emit_bin, - .optimize_mode = comp.compilerRtOptMode(), - .want_sanitize_c = false, - .want_stack_check = false, - .want_red_zone = comp.bin_file.options.red_zone, - .omit_frame_pointer = comp.bin_file.options.omit_frame_pointer, - .want_valgrind = false, - .want_tsan = false, - .want_pic = comp.bin_file.options.pic, - .want_pie = comp.bin_file.options.pie, - .want_lto = comp.bin_file.options.lto, - .emit_h = null, - .strip = comp.compilerRtStrip(), - .is_native_os = comp.bin_file.options.is_native_os, - .is_native_abi = comp.bin_file.options.is_native_abi, - .self_exe_path = comp.self_exe_path, - .link_objects = &link_objects, - .verbose_cc = comp.verbose_cc, - .verbose_link = comp.bin_file.options.verbose_link, - .verbose_air = comp.verbose_air, - .verbose_llvm_ir = comp.verbose_llvm_ir, - .verbose_cimport = comp.verbose_cimport, - .verbose_llvm_cpu_features = comp.verbose_llvm_cpu_features, - .clang_passthrough_mode = comp.clang_passthrough_mode, - .skip_linker_dependencies = true, - .parent_compilation_link_libc = comp.bin_file.options.link_libc, + try comp.misc_failures.ensureUnusedCapacity(gpa, 1); + comp.misc_failures.putAssumeCapacityNoClobber(misc_task_tag, .{ + .msg = try std.fmt.allocPrint(gpa, "sub-compilation of {s} failed", .{ + @tagName(misc_task_tag), + }), + .children = errors, }); - defer sub_compilation.destroy(); - - try sub_compilation.updateSubCompilation(); - - if (o_directory.handle.createFile(ok_basename, .{})) |file| { - file.close(); - } else |err| { - std.log.warn("compiler-rt lib: failed to mark completion: {s}", .{@errorName(err)}); - } + keep_errors = true; + return error.SubCompilationFailed; } - try cache.writeManifest(); - - assert(compiler_rt_lib.* == null); - compiler_rt_lib.* = .{ - .full_object_path = try std.fs.path.join(comp.gpa, &[_][]const u8{ - comp.global_cache_directory.path.?, - o_sub_path, - basename, + assert(out.* == null); + out.* = Compilation.CRTFile{ + .full_object_path = try sub_compilation.bin_file.options.emit.?.directory.join(gpa, &[_][]const u8{ + sub_compilation.bin_file.options.emit.?.sub_path, }), - .lock = cache.toOwnedLock(), + .lock = 
sub_compilation.bin_file.toOwnedLock(), }; } -const sources = &[_][]const u8{ - "compiler_rt/absvdi2.zig", - "compiler_rt/absvsi2.zig", - "compiler_rt/absvti2.zig", - "compiler_rt/adddf3.zig", - "compiler_rt/addo.zig", - "compiler_rt/addsf3.zig", - "compiler_rt/addtf3.zig", - "compiler_rt/addxf3.zig", - "compiler_rt/arm.zig", - "compiler_rt/atomics.zig", - "compiler_rt/aulldiv.zig", - "compiler_rt/aullrem.zig", - "compiler_rt/bswap.zig", - "compiler_rt/ceil.zig", - "compiler_rt/clear_cache.zig", - "compiler_rt/cmp.zig", - "compiler_rt/cmpdf2.zig", - "compiler_rt/cmpsf2.zig", - "compiler_rt/cmptf2.zig", - "compiler_rt/cmpxf2.zig", - "compiler_rt/cos.zig", - "compiler_rt/count0bits.zig", - "compiler_rt/divdf3.zig", - "compiler_rt/divsf3.zig", - "compiler_rt/divtf3.zig", - "compiler_rt/divti3.zig", - "compiler_rt/divxf3.zig", - "compiler_rt/emutls.zig", - "compiler_rt/exp.zig", - "compiler_rt/exp2.zig", - "compiler_rt/extenddftf2.zig", - "compiler_rt/extenddfxf2.zig", - "compiler_rt/extendhfsf2.zig", - "compiler_rt/extendhftf2.zig", - "compiler_rt/extendhfxf2.zig", - "compiler_rt/extendsfdf2.zig", - "compiler_rt/extendsftf2.zig", - "compiler_rt/extendsfxf2.zig", - "compiler_rt/extendxftf2.zig", - "compiler_rt/fabs.zig", - "compiler_rt/fixdfdi.zig", - "compiler_rt/fixdfsi.zig", - "compiler_rt/fixdfti.zig", - "compiler_rt/fixhfdi.zig", - "compiler_rt/fixhfsi.zig", - "compiler_rt/fixhfti.zig", - "compiler_rt/fixsfdi.zig", - "compiler_rt/fixsfsi.zig", - "compiler_rt/fixsfti.zig", - "compiler_rt/fixtfdi.zig", - "compiler_rt/fixtfsi.zig", - "compiler_rt/fixtfti.zig", - "compiler_rt/fixunsdfdi.zig", - "compiler_rt/fixunsdfsi.zig", - "compiler_rt/fixunsdfti.zig", - "compiler_rt/fixunshfdi.zig", - "compiler_rt/fixunshfsi.zig", - "compiler_rt/fixunshfti.zig", - "compiler_rt/fixunssfdi.zig", - "compiler_rt/fixunssfsi.zig", - "compiler_rt/fixunssfti.zig", - "compiler_rt/fixunstfdi.zig", - "compiler_rt/fixunstfsi.zig", - "compiler_rt/fixunstfti.zig", - "compiler_rt/fixunsxfdi.zig", - "compiler_rt/fixunsxfsi.zig", - "compiler_rt/fixunsxfti.zig", - "compiler_rt/fixxfdi.zig", - "compiler_rt/fixxfsi.zig", - "compiler_rt/fixxfti.zig", - "compiler_rt/floatdidf.zig", - "compiler_rt/floatdihf.zig", - "compiler_rt/floatdisf.zig", - "compiler_rt/floatditf.zig", - "compiler_rt/floatdixf.zig", - "compiler_rt/floatsidf.zig", - "compiler_rt/floatsihf.zig", - "compiler_rt/floatsisf.zig", - "compiler_rt/floatsitf.zig", - "compiler_rt/floatsixf.zig", - "compiler_rt/floattidf.zig", - "compiler_rt/floattihf.zig", - "compiler_rt/floattisf.zig", - "compiler_rt/floattitf.zig", - "compiler_rt/floattixf.zig", - "compiler_rt/floatundidf.zig", - "compiler_rt/floatundihf.zig", - "compiler_rt/floatundisf.zig", - "compiler_rt/floatunditf.zig", - "compiler_rt/floatundixf.zig", - "compiler_rt/floatunsidf.zig", - "compiler_rt/floatunsihf.zig", - "compiler_rt/floatunsisf.zig", - "compiler_rt/floatunsitf.zig", - "compiler_rt/floatunsixf.zig", - "compiler_rt/floatuntidf.zig", - "compiler_rt/floatuntihf.zig", - "compiler_rt/floatuntisf.zig", - "compiler_rt/floatuntitf.zig", - "compiler_rt/floatuntixf.zig", - "compiler_rt/floor.zig", - "compiler_rt/fma.zig", - "compiler_rt/fmax.zig", - "compiler_rt/fmin.zig", - "compiler_rt/fmod.zig", - "compiler_rt/gedf2.zig", - "compiler_rt/gesf2.zig", - "compiler_rt/getf2.zig", - "compiler_rt/gexf2.zig", - "compiler_rt/int.zig", - "compiler_rt/log.zig", - "compiler_rt/log10.zig", - "compiler_rt/log2.zig", - "compiler_rt/modti3.zig", - "compiler_rt/muldf3.zig", - "compiler_rt/muldi3.zig", - 
"compiler_rt/mulf3.zig", - "compiler_rt/mulo.zig", - "compiler_rt/mulsf3.zig", - "compiler_rt/multf3.zig", - "compiler_rt/multi3.zig", - "compiler_rt/mulxf3.zig", - "compiler_rt/negXf2.zig", - "compiler_rt/negXi2.zig", - "compiler_rt/negv.zig", - "compiler_rt/os_version_check.zig", - "compiler_rt/parity.zig", - "compiler_rt/popcount.zig", - "compiler_rt/round.zig", - "compiler_rt/shift.zig", - "compiler_rt/sin.zig", - "compiler_rt/sincos.zig", - "compiler_rt/sqrt.zig", - "compiler_rt/stack_probe.zig", - "compiler_rt/subdf3.zig", - "compiler_rt/subo.zig", - "compiler_rt/subsf3.zig", - "compiler_rt/subtf3.zig", - "compiler_rt/subxf3.zig", - "compiler_rt/tan.zig", - "compiler_rt/trunc.zig", - "compiler_rt/truncdfhf2.zig", - "compiler_rt/truncdfsf2.zig", - "compiler_rt/truncsfhf2.zig", - "compiler_rt/trunctfdf2.zig", - "compiler_rt/trunctfhf2.zig", - "compiler_rt/trunctfsf2.zig", - "compiler_rt/trunctfxf2.zig", - "compiler_rt/truncxfdf2.zig", - "compiler_rt/truncxfhf2.zig", - "compiler_rt/truncxfsf2.zig", - "compiler_rt/udivmodti4.zig", - "compiler_rt/udivti3.zig", - "compiler_rt/umodti3.zig", - "compiler_rt/unorddf2.zig", - "compiler_rt/unordsf2.zig", - "compiler_rt/unordtf2.zig", +pub const sources = &[_][]const u8{ + "absvdi2.zig", + "absvsi2.zig", + "absvti2.zig", + "adddf3.zig", + "addo.zig", + "addsf3.zig", + "addtf3.zig", + "addxf3.zig", + "arm.zig", + "atomics.zig", + "aulldiv.zig", + "aullrem.zig", + "bswap.zig", + "ceil.zig", + "clear_cache.zig", + "cmp.zig", + "cmpdf2.zig", + "cmpsf2.zig", + "cmptf2.zig", + "cmpxf2.zig", + "cos.zig", + "count0bits.zig", + "divdf3.zig", + "divsf3.zig", + "divtf3.zig", + "divti3.zig", + "divxf3.zig", + "emutls.zig", + "exp.zig", + "exp2.zig", + "extenddftf2.zig", + "extenddfxf2.zig", + "extendhfsf2.zig", + "extendhftf2.zig", + "extendhfxf2.zig", + "extendsfdf2.zig", + "extendsftf2.zig", + "extendsfxf2.zig", + "extendxftf2.zig", + "fabs.zig", + "fixdfdi.zig", + "fixdfsi.zig", + "fixdfti.zig", + "fixhfdi.zig", + "fixhfsi.zig", + "fixhfti.zig", + "fixsfdi.zig", + "fixsfsi.zig", + "fixsfti.zig", + "fixtfdi.zig", + "fixtfsi.zig", + "fixtfti.zig", + "fixunsdfdi.zig", + "fixunsdfsi.zig", + "fixunsdfti.zig", + "fixunshfdi.zig", + "fixunshfsi.zig", + "fixunshfti.zig", + "fixunssfdi.zig", + "fixunssfsi.zig", + "fixunssfti.zig", + "fixunstfdi.zig", + "fixunstfsi.zig", + "fixunstfti.zig", + "fixunsxfdi.zig", + "fixunsxfsi.zig", + "fixunsxfti.zig", + "fixxfdi.zig", + "fixxfsi.zig", + "fixxfti.zig", + "floatdidf.zig", + "floatdihf.zig", + "floatdisf.zig", + "floatditf.zig", + "floatdixf.zig", + "floatsidf.zig", + "floatsihf.zig", + "floatsisf.zig", + "floatsitf.zig", + "floatsixf.zig", + "floattidf.zig", + "floattihf.zig", + "floattisf.zig", + "floattitf.zig", + "floattixf.zig", + "floatundidf.zig", + "floatundihf.zig", + "floatundisf.zig", + "floatunditf.zig", + "floatundixf.zig", + "floatunsidf.zig", + "floatunsihf.zig", + "floatunsisf.zig", + "floatunsitf.zig", + "floatunsixf.zig", + "floatuntidf.zig", + "floatuntihf.zig", + "floatuntisf.zig", + "floatuntitf.zig", + "floatuntixf.zig", + "floor.zig", + "fma.zig", + "fmax.zig", + "fmin.zig", + "fmod.zig", + "gedf2.zig", + "gesf2.zig", + "getf2.zig", + "gexf2.zig", + "int.zig", + "log.zig", + "log10.zig", + "log2.zig", + "modti3.zig", + "muldf3.zig", + "muldi3.zig", + "mulf3.zig", + "mulo.zig", + "mulsf3.zig", + "multf3.zig", + "multi3.zig", + "mulxf3.zig", + "negXf2.zig", + "negXi2.zig", + "negv.zig", + "os_version_check.zig", + "parity.zig", + "popcount.zig", + "round.zig", + "shift.zig", + "sin.zig", + 
"sincos.zig", + "sqrt.zig", + "stack_probe.zig", + "subdf3.zig", + "subo.zig", + "subsf3.zig", + "subtf3.zig", + "subxf3.zig", + "tan.zig", + "trunc.zig", + "truncdfhf2.zig", + "truncdfsf2.zig", + "truncsfhf2.zig", + "trunctfdf2.zig", + "trunctfhf2.zig", + "trunctfsf2.zig", + "trunctfxf2.zig", + "truncxfdf2.zig", + "truncxfhf2.zig", + "truncxfsf2.zig", + "udivmodti4.zig", + "udivti3.zig", + "umodti3.zig", + "unorddf2.zig", + "unordsf2.zig", + "unordtf2.zig", }; -- cgit v1.2.3