| author | Andrew Kelley <andrewrk@noreply.codeberg.org> | 2025-12-22 20:09:34 +0100 |
|---|---|---|
| committer | Andrew Kelley <andrewrk@noreply.codeberg.org> | 2025-12-22 20:09:34 +0100 |
| commit | 985a3565c6130c7279319e9c36642f0b958e6944 | |
| tree | 30f4a6bed794330daefb4d3d7ef6f900e21d24b9 /src/Compilation.zig | |
| parent | 3af842f0e89125e65a87e5752234bf7e0051aa12 | |
| parent | 23e5a17187dc3a1f61dcb40b681f6730334d3667 | |
Merge pull request 'Replace uses of `std.Thread.Pool` with `std.Io`, and remove `std.Thread.Pool`' (#30557) from compiler-std.Io into master
Reviewed-on: https://codeberg.org/ziglang/zig/pulls/30557
Reviewed-by: Andrew Kelley <andrewrk@noreply.codeberg.org>
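
For orientation before the diff: the core of this change swaps the `std.Thread.Pool`/`std.Thread.WaitGroup` pairing for `std.Io` primitives. The sketch below shows the new shape in isolation; it is a minimal example assuming the `std.Io.Threaded` and `std.Io.Group` APIs as they are used in this diff (`.init`, `group.async`, `group.wait`, `group.cancel`), not code from the commit itself.

```zig
const std = @import("std");

fn worker(id: usize) void {
    std.debug.print("task {d} running\n", .{id});
}

pub fn main() !void {
    var gpa_state: std.heap.DebugAllocator(.{}) = .init;
    defer _ = gpa_state.deinit();
    const gpa = gpa_state.allocator();

    // Assumption: `std.Io.Threaded` is the thread-pool-backed `Io`
    // implementation named as the migration target in this commit.
    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();
    const io = threaded.io();

    // `Io.Group` plays the role the old `WaitGroup` did: spawn tasks into it,
    // then wait for all of them (or cancel them on an error path).
    var group: std.Io.Group = .init;
    defer group.cancel(io);

    for (0..4) |i| group.async(io, worker, .{i});
    group.wait(io);
}
```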
Diffstat (limited to 'src/Compilation.zig')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | src/Compilation.zig | 887 |
1 file changed, 421 insertions, 466 deletions
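
The other mechanical change that accounts for many of those 887 lines is `std.Thread.Mutex` becoming `std.Io.Mutex`, whose lock operations take the `io` handle and are cancelable. Here is a minimal sketch of the two locking forms the diff uses (`try lock(io)` in fallible code, `lockUncancelable(io)` in `defer`/cleanup paths); the `Shared` type is hypothetical:

```zig
const std = @import("std");

// Hypothetical shared state, guarded the way `Compilation.mutex` is after
// this change: the field initializer becomes `.init` instead of `.{}`.
const Shared = struct {
    mutex: std.Io.Mutex = .init,
    counter: usize = 0,
};

// Cancelable acquisition: propagates `error.Canceled` if this task is canceled.
fn bump(io: std.Io, shared: *Shared) !void {
    try shared.mutex.lock(io);
    defer shared.mutex.unlock(io);
    shared.counter += 1;
}

// In cleanup paths where no error can propagate, the diff switches to the
// uncancelable variant instead of `try lock`.
fn bumpUncancelable(io: std.Io, shared: *Shared) void {
    shared.mutex.lockUncancelable(io);
    defer shared.mutex.unlock(io);
    shared.counter += 1;
}
```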
```diff
diff --git a/src/Compilation.zig b/src/Compilation.zig
index c40e025955..931a0b2d14 100644
--- a/src/Compilation.zig
+++ b/src/Compilation.zig
@@ -10,8 +10,6 @@ const Allocator = std.mem.Allocator;
 const assert = std.debug.assert;
 const log = std.log.scoped(.compilation);
 const Target = std.Target;
-const ThreadPool = std.Thread.Pool;
-const WaitGroup = std.Thread.WaitGroup;
 const ErrorBundle = std.zig.ErrorBundle;
 const fatal = std.process.fatal;
@@ -56,6 +54,7 @@ gpa: Allocator,
 /// threads at once.
 arena: Allocator,
 io: Io,
+thread_limit: usize,
 /// Not every Compilation compiles .zig code! For example you could do `zig build-exe foo.o`.
 zcu: ?*Zcu,
 /// Contains different state depending on the `CacheMode` used by this `Compilation`.
@@ -110,7 +109,14 @@ win32_resource_table: if (dev.env.supports(.win32_resource)) std.AutoArrayHashMa
 } = .{},
 link_diags: link.Diags,
-link_task_queue: link.Queue = .empty,
+link_queue: link.Queue = .empty,
+
+/// This is populated during `Compilation.create` with a set of prelink tasks which need to be
+/// queued on the first update. In `update`, we will send these tasks to the linker, and clear
+/// them from this list.
+///
+/// Allocated into `gpa`.
+oneshot_prelink_tasks: std.ArrayList(link.PrelinkTask),
 
 /// Set of work that can be represented by only flags to determine whether the
 /// work is queued or not.
@@ -198,7 +204,6 @@ libc_include_dir_list: []const []const u8,
 libc_framework_dir_list: []const []const u8,
 rc_includes: std.zig.RcIncludes,
 mingw_unicode_entry_point: bool,
-thread_pool: *ThreadPool,
 
 /// Populated when we build the libc++ static library. A Job to build this is placed in the queue
 /// and resolved before calling linker.flush().
@@ -248,16 +253,10 @@ crt_files: std.StringHashMapUnmanaged(CrtFile) = .empty,
 reference_trace: ?u32 = null,
 
 /// This mutex guards all `Compilation` mutable state.
-/// Disabled in single-threaded mode because the thread pool spawns in the same thread.
-mutex: if (builtin.single_threaded) struct {
-    pub inline fn tryLock(_: @This()) void {}
-    pub inline fn lock(_: @This()) void {}
-    pub inline fn unlock(_: @This()) void {}
-} else std.Thread.Mutex = .{},
+mutex: std.Io.Mutex = .init,
 
 test_filters: []const []const u8,
 
-link_task_wait_group: WaitGroup = .{},
 link_prog_node: std.Progress.Node = .none,
 
 llvm_opt_bisect_limit: c_int,
@@ -1568,7 +1567,7 @@ pub const CacheMode = enum {
 
 pub const ParentWholeCache = struct {
     manifest: *Cache.Manifest,
-    mutex: *std.Thread.Mutex,
+    mutex: *std.Io.Mutex,
     prefix_map: [4]u8,
 };
 
@@ -1596,7 +1595,7 @@ const CacheUse = union(CacheMode) {
         lf_open_opts: link.File.OpenOptions,
         /// This is a pointer to a local variable inside `update`.
         cache_manifest: ?*Cache.Manifest,
-        cache_manifest_mutex: std.Thread.Mutex,
+        cache_manifest_mutex: std.Io.Mutex,
         /// This is non-`null` for most of the body of `update`. It is the temporary directory which
         /// we initially emit our artifacts to. After the main part of the update is done, it will
         /// be closed and moved to its final location, and this field set to `null`.
@@ -1636,7 +1635,7 @@ const CacheUse = union(CacheMode) {
 
 pub const CreateOptions = struct {
     dirs: Directories,
-    thread_pool: *ThreadPool,
+    thread_limit: usize,
     self_exe_path: ?[]const u8 = null,
 
     /// Options that have been resolved by calling `resolveDefaults`.
@@ -2211,8 +2210,9 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
             .llvm_object = null,
             .analysis_roots_buffer = undefined,
             .analysis_roots_len = 0,
+            .codegen_task_pool = try .init(arena),
         };
-        try zcu.init(options.thread_pool.getIdCount());
+        try zcu.init(gpa, io, options.thread_limit);
         break :blk zcu;
     } else blk: {
         if (options.emit_h != .no) return diag.fail(.emit_h_without_zcu);
@@ -2224,6 +2224,7 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
         .gpa = gpa,
         .arena = arena,
         .io = io,
+        .thread_limit = options.thread_limit,
         .zcu = opt_zcu,
         .cache_use = undefined, // populated below
         .bin_file = null, // populated below if necessary
@@ -2241,7 +2242,6 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
         .libc_framework_dir_list = libc_dirs.libc_framework_dir_list,
         .rc_includes = options.rc_includes,
         .mingw_unicode_entry_point = options.mingw_unicode_entry_point,
-        .thread_pool = options.thread_pool,
         .clang_passthrough_mode = options.clang_passthrough_mode,
         .clang_preprocessor_mode = options.clang_preprocessor_mode,
         .verbose_cc = options.verbose_cc,
@@ -2282,7 +2282,8 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
         .global_cc_argv = options.global_cc_argv,
         .file_system_inputs = options.file_system_inputs,
         .parent_whole_cache = options.parent_whole_cache,
-        .link_diags = .init(gpa),
+        .link_diags = .init(gpa, io),
+        .oneshot_prelink_tasks = .empty,
         .emit_bin = try options.emit_bin.resolve(arena, &options, .bin),
         .emit_asm = try options.emit_asm.resolve(arena, &options, .@"asm"),
         .emit_implib = try options.emit_implib.resolve(arena, &options, .implib),
@@ -2468,7 +2469,7 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
             whole.* = .{
                 .lf_open_opts = lf_open_opts,
                 .cache_manifest = null,
-                .cache_manifest_mutex = .{},
+                .cache_manifest_mutex = .init,
                 .tmp_artifact_directory = null,
                 .lock = null,
             };
@@ -2553,14 +2554,14 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
         };
         const fields = @typeInfo(@TypeOf(paths)).@"struct".fields;
-        try comp.link_task_queue.queued_prelink.ensureUnusedCapacity(gpa, fields.len + 1);
+        try comp.oneshot_prelink_tasks.ensureUnusedCapacity(gpa, fields.len + 1);
         inline for (fields) |field| {
             if (@field(paths, field.name)) |path| {
-                comp.link_task_queue.queued_prelink.appendAssumeCapacity(.{ .load_object = path });
+                comp.oneshot_prelink_tasks.appendAssumeCapacity(.{ .load_object = path });
             }
         }
         // Loads the libraries provided by `target_util.libcFullLinkFlags(target)`.
-        comp.link_task_queue.queued_prelink.appendAssumeCapacity(.load_host_libc);
+        comp.oneshot_prelink_tasks.appendAssumeCapacity(.load_host_libc);
     } else if (target.isMuslLibC()) {
         if (!std.zig.target.canBuildLibC(target)) return diag.fail(.cross_libc_unavailable);
@@ -2629,10 +2630,9 @@ pub fn create(gpa: Allocator, arena: Allocator, io: Io, diag: *CreateDiagnostic,
             for (0..count) |i| {
                 try comp.queueJob(.{ .windows_import_lib = i });
             }
-            // when integrating coff linker with prelink, the above
-            // queueJob will need to change into something else since those
-            // jobs are dispatched *after* the link_task_wait_group.wait()
-            // that happens when separateCodegenThreadOk() is false.
+            // when integrating coff linker with prelink, the above `queueJob` will need to move
+            // to something in `dispatchPrelinkWork`, which must queue all prelink link tasks
+            // *before* we begin working on the main job queue.
         }
         if (comp.wantBuildLibUnwindFromSource()) {
             comp.queued_jobs.libunwind = true;
@@ -2681,19 +2681,15 @@
             }
         }
 
-        try comp.link_task_queue.queued_prelink.append(gpa, .load_explicitly_provided);
+        try comp.oneshot_prelink_tasks.append(gpa, .load_explicitly_provided);
     }
 
-    log.debug("queued prelink tasks: {d}", .{comp.link_task_queue.queued_prelink.items.len});
+    log.debug("queued oneshot prelink tasks: {d}", .{comp.oneshot_prelink_tasks.items.len});
 
     return comp;
 }
 
 pub fn destroy(comp: *Compilation) void {
     const gpa = comp.gpa;
 
-    // This needs to be destroyed first, because it might contain MIR which we only know
-    // how to interpret (which kind of MIR it is) from `comp.bin_file`.
-    comp.link_task_queue.deinit(comp);
-
     if (comp.bin_file) |lf| lf.destroy();
     if (comp.zcu) |zcu| zcu.deinit();
     comp.cache_use.deinit();
@@ -2760,6 +2756,7 @@ pub fn destroy(comp: *Compilation) void {
     if (comp.time_report) |*tr| tr.deinit(gpa);
 
     comp.link_diags.deinit();
+    comp.oneshot_prelink_tasks.deinit(gpa);
 
     comp.clearMiscFailures();
 
@@ -2865,8 +2862,10 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
     const tracy_trace = trace(@src());
     defer tracy_trace.end();
 
-    // This arena is scoped to this one update.
     const gpa = comp.gpa;
+    const io = comp.io;
+
+    // This arena is scoped to this one update.
     var arena_allocator = std.heap.ArenaAllocator.init(gpa);
     defer arena_allocator.deinit();
     const arena = arena_allocator.allocator();
@@ -2946,8 +2945,8 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
             // In this case the cache hit contains the full set of file system inputs. Nice!
             if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf);
             if (comp.parent_whole_cache) |pwc| {
-                pwc.mutex.lock();
-                defer pwc.mutex.unlock();
+                try pwc.mutex.lock(io);
+                defer pwc.mutex.unlock(io);
                 try man.populateOtherManifest(pwc.manifest, pwc.prefix_map);
             }
@@ -3066,7 +3065,7 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
         comp.link_prog_node = .none;
     };
 
-    try comp.performAllTheWork(main_progress_node);
+    try comp.performAllTheWork(main_progress_node, arena);
 
     if (comp.zcu) |zcu| {
         const pt: Zcu.PerThread = .activate(zcu, .main);
@@ -3132,8 +3131,8 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
         .whole => |whole| {
             if (comp.file_system_inputs) |buf| try man.populateFileSystemInputs(buf);
             if (comp.parent_whole_cache) |pwc| {
-                pwc.mutex.lock();
-                defer pwc.mutex.unlock();
+                try pwc.mutex.lock(io);
+                defer pwc.mutex.unlock(io);
                 try man.populateOtherManifest(pwc.manifest, pwc.prefix_map);
             }
@@ -3234,6 +3233,7 @@ pub fn update(comp: *Compilation, main_progress_node: std.Progress.Node) UpdateE
 /// Thread-safe. Assumes that `comp.mutex` is *not* already held by the caller.
 pub fn appendFileSystemInput(comp: *Compilation, path: Compilation.Path) Allocator.Error!void {
     const gpa = comp.gpa;
+    const io = comp.io;
     const fsi = comp.file_system_inputs orelse return;
     const prefixes = comp.cache_parent.prefixes();
@@ -3253,8 +3253,8 @@ pub fn appendFileSystemInput(comp: *Compilation, path: Compilation.Path) Allocat
     );
 
     // There may be concurrent calls to this function from C object workers and/or the main thread.
-    comp.mutex.lock();
-    defer comp.mutex.unlock();
+    comp.mutex.lockUncancelable(io);
+    defer comp.mutex.unlock(io);
 
     try fsi.ensureUnusedCapacity(gpa, path.sub_path.len + 3);
     if (fsi.items.len > 0) fsi.appendAssumeCapacity(0);
@@ -3305,6 +3305,7 @@ fn flush(
     arena: Allocator,
     tid: Zcu.PerThread.Id,
 ) Allocator.Error!void {
+    const io = comp.io;
     if (comp.zcu) |zcu| {
         if (zcu.llvm_object) |llvm_object| {
             const pt: Zcu.PerThread = .activate(zcu, tid);
@@ -3317,8 +3318,8 @@ fn flush(
 
     var timer = comp.startTimer();
     defer if (timer.finish()) |ns| {
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         comp.time_report.?.stats.real_ns_llvm_emit = ns;
     };
@@ -3362,8 +3363,8 @@ fn flush(
     if (comp.bin_file) |lf| {
         var timer = comp.startTimer();
         defer if (timer.finish()) |ns| {
-            comp.mutex.lock();
-            defer comp.mutex.unlock();
+            comp.mutex.lockUncancelable(io);
+            defer comp.mutex.unlock(io);
             comp.time_report.?.stats.real_ns_link_flush = ns;
         };
         // This is needed before reading the error flags.
@@ -4575,44 +4576,277 @@ pub fn unableToLoadZcuFile(
 fn performAllTheWork(
     comp: *Compilation,
     main_progress_node: std.Progress.Node,
+    update_arena: Allocator,
 ) JobError!void {
-    // Regardless of errors, `comp.zcu` needs to update its generation number.
     defer if (comp.zcu) |zcu| {
+        zcu.codegen_task_pool.cancel(zcu);
+        // Regardless of errors, `comp.zcu` needs to update its generation number.
         zcu.generation += 1;
     };
 
+    const io = comp.io;
+
     // This is awkward: we don't want to start the timer until later, but we won't want to stop it
     // until the wait groups finish. That means we need to do this.
     var decl_work_timer: ?Timer = null;
     defer commit_timer: {
         const t = &(decl_work_timer orelse break :commit_timer);
         const ns = t.finish() orelse break :commit_timer;
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         comp.time_report.?.stats.real_ns_decls = ns;
     }
 
-    // Here we queue up all the AstGen tasks first, followed by C object compilation.
-    // We wait until the AstGen tasks are all completed before proceeding to the
-    // (at least for now) single-threaded main work queue. However, C object compilation
-    // only needs to be finished by the end of this function.
-
-    var work_queue_wait_group: WaitGroup = .{};
-    defer work_queue_wait_group.wait();
+    var misc_group: Io.Group = .init;
+    defer misc_group.cancel(io);
 
-    comp.link_task_wait_group.reset();
-    defer comp.link_task_wait_group.wait();
+    try comp.link_queue.start(comp, update_arena);
+    defer comp.link_queue.cancel(io);
 
-    // Already-queued prelink tasks
-    comp.link_prog_node.increaseEstimatedTotalItems(comp.link_task_queue.queued_prelink.items.len);
-    comp.link_task_queue.start(comp);
+    misc_group.concurrent(io, dispatchPrelinkWork, .{ comp, main_progress_node }) catch |err| switch (err) {
+        error.ConcurrencyUnavailable => {
+            // Do it immediately so that the link queue isn't blocked
+            dispatchPrelinkWork(comp, main_progress_node);
+        },
+    };
 
     if (comp.emit_docs != null) {
         dev.check(.docs_emit);
-        comp.thread_pool.spawnWg(&work_queue_wait_group, workerDocsCopy, .{comp});
-        work_queue_wait_group.spawnManager(workerDocsWasm, .{ comp, main_progress_node });
+        misc_group.async(io, workerDocsCopy, .{comp});
+        misc_group.async(io, workerDocsWasm, .{ comp, main_progress_node });
+    }
+
+    if (comp.zcu) |zcu| {
+        const astgen_frame = tracy.namedFrame("astgen");
+        defer astgen_frame.end();
+
+        const zir_prog_node = main_progress_node.start("AST Lowering", 0);
+        defer zir_prog_node.end();
+
+        var timer = comp.startTimer();
+        defer if (timer.finish()) |ns| {
+            comp.mutex.lockUncancelable(io);
+            defer comp.mutex.unlock(io);
+            comp.time_report.?.stats.real_ns_files = ns;
+        };
+
+        const gpa = comp.gpa;
+
+        var astgen_group: Io.Group = .init;
+        defer astgen_group.cancel(io);
+
+        // We cannot reference `zcu.import_table` after we spawn any `workerUpdateFile` jobs,
+        // because on single-threaded targets the worker will be run eagerly, meaning the
+        // `import_table` could be mutated, and not even holding `comp.mutex` will save us. So,
+        // build up a list of the files to update *before* we spawn any jobs.
+        var astgen_work_items: std.MultiArrayList(struct {
+            file_index: Zcu.File.Index,
+            file: *Zcu.File,
+        }) = .empty;
+        defer astgen_work_items.deinit(gpa);
+        // Not every item in `import_table` will need updating, because some are builtin.zig
+        // files. However, most will, so let's just reserve sufficient capacity upfront.
+        try astgen_work_items.ensureTotalCapacity(gpa, zcu.import_table.count());
+        for (zcu.import_table.keys()) |file_index| {
+            const file = zcu.fileByIndex(file_index);
+            if (file.is_builtin) {
+                // This is a `builtin.zig`, so updating is redundant. However, we want to make
+                // sure the file contents are still correct on disk, since that improves the
+                // debugging experience. That job only needs `file`, so we can kick it
+                // off right now.
+                astgen_group.async(io, workerUpdateBuiltinFile, .{ comp, file });
+                continue;
+            }
+            astgen_work_items.appendAssumeCapacity(.{
+                .file_index = file_index,
+                .file = file,
+            });
+        }
+
+        // Now that we're not going to touch `zcu.import_table` again, we can spawn `workerUpdateFile` jobs.
+        for (astgen_work_items.items(.file_index), astgen_work_items.items(.file)) |file_index, file| {
+            astgen_group.async(io, workerUpdateFile, .{
+                comp, file, file_index, zir_prog_node, &astgen_group,
+            });
+        }
+
+        // On the other hand, it's fine to directly iterate `zcu.embed_table.keys()` here
+        // because `workerUpdateEmbedFile` can't invalidate it. The difference here is that one
+        // `@embedFile` can't trigger analysis of a new `@embedFile`!
+        for (0.., zcu.embed_table.keys()) |ef_index_usize, ef| {
+            const ef_index: Zcu.EmbedFile.Index = @enumFromInt(ef_index_usize);
+            astgen_group.async(io, workerUpdateEmbedFile, .{
+                comp, ef_index, ef,
+            });
+        }
+
+        astgen_group.wait(io);
+    }
+
+    if (comp.zcu) |zcu| {
+        const pt: Zcu.PerThread = .activate(zcu, .main);
+        defer pt.deactivate();
+
+        const gpa = zcu.gpa;
+
+        // On an incremental update, a source file might become "dead", in that all imports of
+        // the file were removed. This could even change what module the file belongs to! As such,
+        // we do a traversal over the files, to figure out which ones are alive and the modules
+        // they belong to.
+        const any_fatal_files = try pt.computeAliveFiles();
+
+        // If the cache mode is `whole`, add every alive source file to the manifest.
+        switch (comp.cache_use) {
+            .whole => |whole| if (whole.cache_manifest) |man| {
+                for (zcu.alive_files.keys()) |file_index| {
+                    const file = zcu.fileByIndex(file_index);
+
+                    switch (file.status) {
+                        .never_loaded => unreachable, // AstGen tried to load it
+                        .retryable_failure => continue, // the file cannot be read; this is a guaranteed error
+                        .astgen_failure, .success => {}, // the file was read successfully
+                    }
+
+                    const path = try file.path.toAbsolute(comp.dirs, gpa);
+                    defer gpa.free(path);
+
+                    const result = res: {
+                        try whole.cache_manifest_mutex.lock(io);
+                        defer whole.cache_manifest_mutex.unlock(io);
+                        if (file.source) |source| {
+                            break :res man.addFilePostContents(path, source, file.stat);
+                        } else {
+                            break :res man.addFilePost(path);
+                        }
+                    };
+                    result catch |err| switch (err) {
+                        error.OutOfMemory => |e| return e,
+                        else => {
+                            try pt.reportRetryableFileError(file_index, "unable to update cache: {s}", .{@errorName(err)});
+                            continue;
+                        },
+                    };
+                }
+            },
+            .none, .incremental => {},
+        }
+
+        if (any_fatal_files or
+            zcu.multi_module_err != null or
+            zcu.failed_imports.items.len > 0 or
+            comp.alloc_failure_occurred)
+        {
+            // We give up right now! No updating of ZIR refs, no nothing. The idea is that this prevents
+            // us from invalidating lots of incremental dependencies due to files with e.g. parse errors.
+            // However, this means our analysis data is invalid, so we want to omit all analysis errors.
+            zcu.skip_analysis_this_update = true;
+            // Since we're skipping analysis, there are no ZCU link tasks.
+            comp.link_queue.finishZcuQueue(comp);
+            // Let other compilation work finish to collect as many errors as possible.
+            misc_group.wait(io);
+            comp.link_queue.wait(io);
+            return;
+        }
+
+        if (comp.time_report) |*tr| {
+            tr.stats.n_reachable_files = @intCast(zcu.alive_files.count());
+        }
+
+        if (comp.config.incremental) {
+            const update_zir_refs_node = main_progress_node.start("Update ZIR References", 0);
+            defer update_zir_refs_node.end();
+            try pt.updateZirRefs();
+        }
+        try zcu.flushRetryableFailures();
+
+        // It's analysis time! Queue up our initial analysis.
+        for (zcu.analysisRoots()) |mod| {
+            try comp.queueJob(.{ .analyze_mod = mod });
+        }
+
+        zcu.sema_prog_node = main_progress_node.start("Semantic Analysis", 0);
+        if (comp.bin_file != null) {
+            zcu.codegen_prog_node = main_progress_node.start("Code Generation", 0);
+        }
+        // We increment `pending_codegen_jobs` so that it doesn't reach 0 until after analysis finishes.
+        // That prevents the "Code Generation" node from constantly disappearing and reappearing when
+        // we're probably going to analyze more functions at some point.
+        assert(zcu.pending_codegen_jobs.swap(1, .monotonic) == 0); // don't let this become 0 until analysis finishes
+    }
+    // When analysis ends, delete the progress nodes for "Semantic Analysis" and possibly "Code Generation".
+    defer if (comp.zcu) |zcu| {
+        zcu.sema_prog_node.end();
+        zcu.sema_prog_node = .none;
+        if (zcu.pending_codegen_jobs.fetchSub(1, .monotonic) == 1) {
+            // Decremented to 0, so all done.
+            zcu.codegen_prog_node.end();
+            zcu.codegen_prog_node = .none;
+        }
+    };
+
+    if (comp.zcu) |zcu| {
+        if (!zcu.backendSupportsFeature(.separate_thread)) {
+            // Close the ZCU task queue. Prelink may still be running, but the closed
+            // queue will cause the linker task to exit once prelink finishes. The
+            // closed queue also communicates to `enqueueZcu` that it should wait for
+            // the linker task to finish and then run ZCU tasks serially.
+            comp.link_queue.finishZcuQueue(comp);
+        }
+    }
+
+    if (comp.zcu != null) {
+        // Start the timer for the "decls" part of the pipeline (Sema, CodeGen, link).
+        decl_work_timer = comp.startTimer();
     }
 
+    work: while (true) {
+        for (&comp.work_queues) |*work_queue| if (work_queue.popFront()) |job| {
+            try processOneJob(
+                @intFromEnum(Zcu.PerThread.Id.main),
+                comp,
+                job,
+            );
+            continue :work;
+        };
+        if (comp.zcu) |zcu| {
+            // If there's no work queued, check if there's anything outdated
+            // which we need to work on, and queue it if so.
+            if (try zcu.findOutdatedToAnalyze()) |outdated| {
+                try comp.queueJob(switch (outdated.unwrap()) {
+                    .func => |f| .{ .analyze_func = f },
+                    .memoized_state,
+                    .@"comptime",
+                    .nav_ty,
+                    .nav_val,
+                    .type,
+                    => .{ .analyze_comptime_unit = outdated },
+                });
+                continue;
+            }
+            zcu.sema_prog_node.end();
+            zcu.sema_prog_node = .none;
+        }
+        break;
+    }
+
+    comp.link_queue.finishZcuQueue(comp);
+
+    // Main thread work is all done, now just wait for all async work.
+    misc_group.wait(io);
+    comp.link_queue.wait(io);
+}
+
+fn dispatchPrelinkWork(comp: *Compilation, main_progress_node: std.Progress.Node) void {
+    const io = comp.io;
+
+    var prelink_group: Io.Group = .init;
+    defer prelink_group.cancel(io);
+
+    comp.queuePrelinkTasks(comp.oneshot_prelink_tasks.items) catch |err| switch (err) {
+        error.Canceled => return,
+    };
+    comp.oneshot_prelink_tasks.clearRetainingCapacity();
+
     // In case it failed last time, try again. `clearMiscFailures` was already
     // called at the start of `update`.
     if (comp.queued_jobs.compiler_rt_lib and comp.compiler_rt_lib == null) {
@@ -4620,8 +4854,7 @@ fn performAllTheWork(
         // compiler-rt due to LLD bugs as well, e.g.:
        //
        // https://github.com/llvm/llvm-project/issues/43698#issuecomment-2542660611
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildRt, .{
+        prelink_group.async(io, buildRt, .{
             comp,
             "compiler_rt.zig",
             "compiler_rt",
@@ -4638,8 +4871,7 @@ fn performAllTheWork(
     }
 
     if (comp.queued_jobs.compiler_rt_obj and comp.compiler_rt_obj == null) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildRt, .{
+        prelink_group.async(io, buildRt, .{
             comp,
             "compiler_rt.zig",
             "compiler_rt",
@@ -4657,8 +4889,7 @@ fn performAllTheWork(
 
     // hack for stage2_x86_64 + coff
     if (comp.queued_jobs.compiler_rt_dyn_lib and comp.compiler_rt_dyn_lib == null) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildRt, .{
+        prelink_group.async(io, buildRt, .{
             comp,
             "compiler_rt.zig",
             "compiler_rt",
@@ -4675,8 +4906,7 @@ fn performAllTheWork(
     }
 
     if (comp.queued_jobs.fuzzer_lib and comp.fuzzer_lib == null) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildRt, .{
+        prelink_group.async(io, buildRt, .{
             comp,
             "fuzzer.zig",
             "fuzzer",
@@ -4690,8 +4920,7 @@ fn performAllTheWork(
     }
 
     if (comp.queued_jobs.ubsan_rt_lib and comp.ubsan_rt_lib == null) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildRt, .{
+        prelink_group.async(io, buildRt, .{
             comp,
             "ubsan_rt.zig",
             "ubsan_rt",
@@ -4707,8 +4936,7 @@ fn performAllTheWork(
     }
 
     if (comp.queued_jobs.ubsan_rt_obj and comp.ubsan_rt_obj == null) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildRt, .{
+        prelink_group.async(io, buildRt, .{
             comp,
             "ubsan_rt.zig",
             "ubsan_rt",
@@ -4724,310 +4952,93 @@ fn performAllTheWork(
     }
 
     if (comp.queued_jobs.glibc_shared_objects) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildGlibcSharedObjects, .{ comp, main_progress_node });
+        prelink_group.async(io, buildGlibcSharedObjects, .{ comp, main_progress_node });
     }
 
     if (comp.queued_jobs.freebsd_shared_objects) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildFreeBSDSharedObjects, .{ comp, main_progress_node });
+        prelink_group.async(io, buildFreeBSDSharedObjects, .{ comp, main_progress_node });
     }
 
     if (comp.queued_jobs.netbsd_shared_objects) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildNetBSDSharedObjects, .{ comp, main_progress_node });
+        prelink_group.async(io, buildNetBSDSharedObjects, .{ comp, main_progress_node });
    }
 
     if (comp.queued_jobs.libunwind) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildLibUnwind, .{ comp, main_progress_node });
+        prelink_group.async(io, buildLibUnwind, .{ comp, main_progress_node });
     }
 
     if (comp.queued_jobs.libcxx) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildLibCxx, .{ comp, main_progress_node });
+        prelink_group.async(io, buildLibCxx, .{ comp, main_progress_node });
     }
 
     if (comp.queued_jobs.libcxxabi) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildLibCxxAbi, .{ comp, main_progress_node });
+        prelink_group.async(io, buildLibCxxAbi, .{ comp, main_progress_node });
     }
 
     if (comp.queued_jobs.libtsan) {
-        comp.link_task_queue.startPrelinkItem();
-
-        comp.link_task_wait_group.spawnManager(buildLibTsan, .{ comp, main_progress_node });
+        prelink_group.async(io, buildLibTsan, .{ comp, main_progress_node });
     }
 
     if (comp.queued_jobs.zigc_lib and comp.zigc_static_lib == null) {
-        comp.link_task_queue.startPrelinkItem();
-        comp.link_task_wait_group.spawnManager(buildLibZigC, .{ comp, main_progress_node });
+        prelink_group.async(io, buildLibZigC, .{ comp, main_progress_node });
     }
 
     for (0..@typeInfo(musl.CrtFile).@"enum".fields.len) |i| {
         if (comp.queued_jobs.musl_crt_file[i]) {
             const tag: musl.CrtFile = @enumFromInt(i);
-            comp.link_task_queue.startPrelinkItem();
-            comp.link_task_wait_group.spawnManager(buildMuslCrtFile, .{ comp, tag, main_progress_node });
+            prelink_group.async(io, buildMuslCrtFile, .{ comp, tag, main_progress_node });
         }
     }
 
     for (0..@typeInfo(glibc.CrtFile).@"enum".fields.len) |i| {
         if (comp.queued_jobs.glibc_crt_file[i]) {
             const tag: glibc.CrtFile = @enumFromInt(i);
-            comp.link_task_queue.startPrelinkItem();
-            comp.link_task_wait_group.spawnManager(buildGlibcCrtFile, .{ comp, tag, main_progress_node });
+            prelink_group.async(io, buildGlibcCrtFile, .{ comp, tag, main_progress_node });
         }
     }
 
     for (0..@typeInfo(freebsd.CrtFile).@"enum".fields.len) |i| {
         if (comp.queued_jobs.freebsd_crt_file[i]) {
             const tag: freebsd.CrtFile = @enumFromInt(i);
-            comp.link_task_queue.startPrelinkItem();
-            comp.link_task_wait_group.spawnManager(buildFreeBSDCrtFile, .{ comp, tag, main_progress_node });
+            prelink_group.async(io, buildFreeBSDCrtFile, .{ comp, tag, main_progress_node });
         }
     }
 
     for (0..@typeInfo(netbsd.CrtFile).@"enum".fields.len) |i| {
         if (comp.queued_jobs.netbsd_crt_file[i]) {
             const tag: netbsd.CrtFile = @enumFromInt(i);
-            comp.link_task_queue.startPrelinkItem();
-            comp.link_task_wait_group.spawnManager(buildNetBSDCrtFile, .{ comp, tag, main_progress_node });
+            prelink_group.async(io, buildNetBSDCrtFile, .{ comp, tag, main_progress_node });
         }
     }
 
     for (0..@typeInfo(wasi_libc.CrtFile).@"enum".fields.len) |i| {
         if (comp.queued_jobs.wasi_libc_crt_file[i]) {
             const tag: wasi_libc.CrtFile = @enumFromInt(i);
-            comp.link_task_queue.startPrelinkItem();
-            comp.link_task_wait_group.spawnManager(buildWasiLibcCrtFile, .{ comp, tag, main_progress_node });
+            prelink_group.async(io, buildWasiLibcCrtFile, .{ comp, tag, main_progress_node });
         }
     }
 
     for (0..@typeInfo(mingw.CrtFile).@"enum".fields.len) |i| {
         if (comp.queued_jobs.mingw_crt_file[i]) {
             const tag: mingw.CrtFile = @enumFromInt(i);
-            comp.link_task_queue.startPrelinkItem();
-            comp.link_task_wait_group.spawnManager(buildMingwCrtFile, .{ comp, tag, main_progress_node });
-        }
-    }
-
-    {
-        const astgen_frame = tracy.namedFrame("astgen");
-        defer astgen_frame.end();
-
-        const zir_prog_node = main_progress_node.start("AST Lowering", 0);
-        defer zir_prog_node.end();
-
-        var timer = comp.startTimer();
-        defer if (timer.finish()) |ns| {
-            comp.mutex.lock();
-            defer comp.mutex.unlock();
-            comp.time_report.?.stats.real_ns_files = ns;
-        };
-
-        var astgen_wait_group: WaitGroup = .{};
-        defer astgen_wait_group.wait();
-
-        if (comp.zcu) |zcu| {
-            const gpa = zcu.gpa;
-
-            // We cannot reference `zcu.import_table` after we spawn any `workerUpdateFile` jobs,
-            // because on single-threaded targets the worker will be run eagerly, meaning the
-            // `import_table` could be mutated, and not even holding `comp.mutex` will save us. So,
-            // build up a list of the files to update *before* we spawn any jobs.
-            var astgen_work_items: std.MultiArrayList(struct {
-                file_index: Zcu.File.Index,
-                file: *Zcu.File,
-            }) = .empty;
-            defer astgen_work_items.deinit(gpa);
-            // Not every item in `import_table` will need updating, because some are builtin.zig
-            // files. However, most will, so let's just reserve sufficient capacity upfront.
-            try astgen_work_items.ensureTotalCapacity(gpa, zcu.import_table.count());
-            for (zcu.import_table.keys()) |file_index| {
-                const file = zcu.fileByIndex(file_index);
-                if (file.is_builtin) {
-                    // This is a `builtin.zig`, so updating is redundant. However, we want to make
-                    // sure the file contents are still correct on disk, since that improves the
-                    // debugging experience. That job only needs `file`, so we can kick it
-                    // off right now.
-                    comp.thread_pool.spawnWg(&astgen_wait_group, workerUpdateBuiltinFile, .{ comp, file });
-                    continue;
-                }
-                astgen_work_items.appendAssumeCapacity(.{
-                    .file_index = file_index,
-                    .file = file,
-                });
-            }
-
-            // Now that we're not going to touch `zcu.import_table` again, we can spawn `workerUpdateFile` jobs.
-            for (astgen_work_items.items(.file_index), astgen_work_items.items(.file)) |file_index, file| {
-                comp.thread_pool.spawnWgId(&astgen_wait_group, workerUpdateFile, .{
-                    comp, file, file_index, zir_prog_node, &astgen_wait_group,
-                });
-            }
-
-            // On the other hand, it's fine to directly iterate `zcu.embed_table.keys()` here
-            // because `workerUpdateEmbedFile` can't invalidate it. The difference here is that one
-            // `@embedFile` can't trigger analysis of a new `@embedFile`!
-            for (0.., zcu.embed_table.keys()) |ef_index_usize, ef| {
-                const ef_index: Zcu.EmbedFile.Index = @enumFromInt(ef_index_usize);
-                comp.thread_pool.spawnWgId(&astgen_wait_group, workerUpdateEmbedFile, .{
-                    comp, ef_index, ef,
-                });
-            }
-        }
-
-        while (comp.c_object_work_queue.popFront()) |c_object| {
-            comp.link_task_queue.startPrelinkItem();
-            comp.thread_pool.spawnWg(&comp.link_task_wait_group, workerUpdateCObject, .{
-                comp, c_object, main_progress_node,
-            });
-        }
-
-        while (comp.win32_resource_work_queue.popFront()) |win32_resource| {
-            comp.link_task_queue.startPrelinkItem();
-            comp.thread_pool.spawnWg(&comp.link_task_wait_group, workerUpdateWin32Resource, .{
-                comp, win32_resource, main_progress_node,
-            });
-        }
-    }
-
-    if (comp.zcu) |zcu| {
-        const pt: Zcu.PerThread = .activate(zcu, .main);
-        defer pt.deactivate();
-
-        const gpa = zcu.gpa;
-
-        // On an incremental update, a source file might become "dead", in that all imports of
-        // the file were removed. This could even change what module the file belongs to! As such,
-        // we do a traversal over the files, to figure out which ones are alive and the modules
-        // they belong to.
-        const any_fatal_files = try pt.computeAliveFiles();
-
-        // If the cache mode is `whole`, add every alive source file to the manifest.
-        switch (comp.cache_use) {
-            .whole => |whole| if (whole.cache_manifest) |man| {
-                for (zcu.alive_files.keys()) |file_index| {
-                    const file = zcu.fileByIndex(file_index);
-
-                    switch (file.status) {
-                        .never_loaded => unreachable, // AstGen tried to load it
-                        .retryable_failure => continue, // the file cannot be read; this is a guaranteed error
-                        .astgen_failure, .success => {}, // the file was read successfully
-                    }
-
-                    const path = try file.path.toAbsolute(comp.dirs, gpa);
-                    defer gpa.free(path);
-
-                    const result = res: {
-                        whole.cache_manifest_mutex.lock();
-                        defer whole.cache_manifest_mutex.unlock();
-                        if (file.source) |source| {
-                            break :res man.addFilePostContents(path, source, file.stat);
-                        } else {
-                            break :res man.addFilePost(path);
-                        }
-                    };
-                    result catch |err| switch (err) {
-                        error.OutOfMemory => |e| return e,
-                        else => {
-                            try pt.reportRetryableFileError(file_index, "unable to update cache: {s}", .{@errorName(err)});
-                            continue;
-                        },
-                    };
-                }
-            },
-            .none, .incremental => {},
-        }
-
-        if (any_fatal_files or
-            zcu.multi_module_err != null or
-            zcu.failed_imports.items.len > 0 or
-            comp.alloc_failure_occurred)
-        {
-            // We give up right now! No updating of ZIR refs, no nothing. The idea is that this prevents
-            // us from invalidating lots of incremental dependencies due to files with e.g. parse errors.
-            // However, this means our analysis data is invalid, so we want to omit all analysis errors.
-            zcu.skip_analysis_this_update = true;
-            return;
-        }
-
-        if (comp.time_report) |*tr| {
-            tr.stats.n_reachable_files = @intCast(zcu.alive_files.count());
-        }
-
-        if (comp.config.incremental) {
-            const update_zir_refs_node = main_progress_node.start("Update ZIR References", 0);
-            defer update_zir_refs_node.end();
-            try pt.updateZirRefs();
-        }
-        try zcu.flushRetryableFailures();
-
-        // It's analysis time! Queue up our initial analysis.
-        for (zcu.analysisRoots()) |mod| {
-            try comp.queueJob(.{ .analyze_mod = mod });
-        }
-
-        zcu.sema_prog_node = main_progress_node.start("Semantic Analysis", 0);
-        if (comp.bin_file != null) {
-            zcu.codegen_prog_node = main_progress_node.start("Code Generation", 0);
+            prelink_group.async(io, buildMingwCrtFile, .{ comp, tag, main_progress_node });
         }
-        // We increment `pending_codegen_jobs` so that it doesn't reach 0 until after analysis finishes.
-        // That prevents the "Code Generation" node from constantly disappearing and reappearing when
-        // we're probably going to analyze more functions at some point.
-        assert(zcu.pending_codegen_jobs.swap(1, .monotonic) == 0); // don't let this become 0 until analysis finishes
     }
-    // When analysis ends, delete the progress nodes for "Semantic Analysis" and possibly "Code Generation".
-    defer if (comp.zcu) |zcu| {
-        zcu.sema_prog_node.end();
-        zcu.sema_prog_node = .none;
-        if (zcu.pending_codegen_jobs.rmw(.Sub, 1, .monotonic) == 1) {
-            // Decremented to 0, so all done.
-            zcu.codegen_prog_node.end();
-            zcu.codegen_prog_node = .none;
-        }
-    };
-
-    // We aren't going to queue any more prelink tasks.
-    comp.link_task_queue.finishPrelinkItem(comp);
-
-    if (!comp.separateCodegenThreadOk()) {
-        // Waits until all input files have been parsed.
-        comp.link_task_wait_group.wait();
-        comp.link_task_wait_group.reset();
-        std.log.scoped(.link).debug("finished waiting for link_task_wait_group", .{});
+    while (comp.c_object_work_queue.popFront()) |c_object| {
+        prelink_group.async(io, workerUpdateCObject, .{
+            comp, c_object, main_progress_node,
+        });
     }
 
-    if (comp.zcu != null) {
-        // Start the timer for the "decls" part of the pipeline (Sema, CodeGen, link).
-        decl_work_timer = comp.startTimer();
+    while (comp.win32_resource_work_queue.popFront()) |win32_resource| {
+        prelink_group.async(io, workerUpdateWin32Resource, .{
+            comp, win32_resource, main_progress_node,
+        });
     }
 
-    work: while (true) {
-        for (&comp.work_queues) |*work_queue| if (work_queue.popFront()) |job| {
-            try processOneJob(@intFromEnum(Zcu.PerThread.Id.main), comp, job);
-            continue :work;
-        };
-        if (comp.zcu) |zcu| {
-            // If there's no work queued, check if there's anything outdated
-            // which we need to work on, and queue it if so.
-            if (try zcu.findOutdatedToAnalyze()) |outdated| {
-                try comp.queueJob(switch (outdated.unwrap()) {
-                    .func => |f| .{ .analyze_func = f },
-                    .memoized_state,
-                    .@"comptime",
-                    .nav_ty,
-                    .nav_val,
-                    .type,
-                    => .{ .analyze_comptime_unit = outdated },
-                });
-                continue;
-            }
-            zcu.sema_prog_node.end();
-            zcu.sema_prog_node = .none;
-        }
-        break;
-    }
+    prelink_group.wait(io);
+    comp.link_queue.finishPrelinkQueue(comp);
 }
 
 const JobError = Allocator.Error || Io.Cancelable;
@@ -5040,58 +5051,38 @@ pub fn queueJobs(comp: *Compilation, jobs: []const Job) !void {
     for (jobs) |job| try comp.queueJob(job);
 }
 
-fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void {
+fn processOneJob(
+    tid: usize,
+    comp: *Compilation,
+    job: Job,
+) JobError!void {
     switch (job) {
         .codegen_func => |func| {
             const zcu = comp.zcu.?;
             const gpa = zcu.gpa;
-            var air = func.air;
-            errdefer {
-                zcu.codegen_prog_node.completeOne();
-                comp.link_prog_node.completeOne();
-                air.deinit(gpa);
-            }
-            if (!air.typesFullyResolved(zcu)) {
+            var owned_air: ?Air = func.air;
+            defer if (owned_air) |*air| air.deinit(gpa);
+
+            if (!owned_air.?.typesFullyResolved(zcu)) {
                 // Type resolution failed in a way which affects this function. This is a transitive
                 // failure, but it doesn't need recording, because this function semantically depends
                 // on the failed type, so when it is changed the function is updated.
                 zcu.codegen_prog_node.completeOne();
                 comp.link_prog_node.completeOne();
-                air.deinit(gpa);
                 return;
             }
-            const shared_mir = try gpa.create(link.ZcuTask.LinkFunc.SharedMir);
-            shared_mir.* = .{
-                .status = .init(.pending),
-                .value = undefined,
-            };
-            assert(zcu.pending_codegen_jobs.rmw(.Add, 1, .monotonic) > 0); // the "Code Generation" node hasn't been ended
-            // This value is used as a heuristic to avoid queueing too much AIR/MIR at once (hence
-            // using a lot of memory). If this would cause too many AIR bytes to be in-flight, we
-            // will block on the `dispatchZcuLinkTask` call below.
-            const air_bytes: u32 = @intCast(air.instructions.len * 5 + air.extra.items.len * 4);
-            if (comp.separateCodegenThreadOk()) {
-                // `workerZcuCodegen` takes ownership of `air`.
-                comp.thread_pool.spawnWgId(&comp.link_task_wait_group, workerZcuCodegen, .{ comp, func.func, air, shared_mir });
-                comp.dispatchZcuLinkTask(tid, .{ .link_func = .{
-                    .func = func.func,
-                    .mir = shared_mir,
-                    .air_bytes = air_bytes,
-                } });
-            } else {
-                {
-                    const pt: Zcu.PerThread = .activate(comp.zcu.?, @enumFromInt(tid));
-                    defer pt.deactivate();
-                    pt.runCodegen(func.func, &air, shared_mir);
-                }
-                assert(shared_mir.status.load(.monotonic) != .pending);
-                comp.dispatchZcuLinkTask(tid, .{ .link_func = .{
-                    .func = func.func,
-                    .mir = shared_mir,
-                    .air_bytes = air_bytes,
-                } });
-                air.deinit(gpa);
-            }
+
+            // Some linkers need to refer to the AIR. In that case, the linker is not running
+            // concurrently, so we'll just keep ownership of the AIR for ourselves instead of
+            // letting the codegen job destroy it.
+            const disown_air = zcu.backendSupportsFeature(.separate_thread);
+
+            // Begin the codegen task. If the codegen/link queue is backed up, this might
+            // block until the linker is able to process some tasks.
+            const codegen_task = try zcu.codegen_task_pool.start(zcu, func.func, &owned_air.?, disown_air);
+            if (disown_air) owned_air = null;
+
+            try comp.link_queue.enqueueZcu(comp, tid, .{ .link_func = codegen_task });
         },
         .link_nav => |nav_index| {
             const zcu = comp.zcu.?;
@@ -5111,7 +5102,7 @@ fn processOneJob(
                 comp.link_prog_node.completeOne();
                 return;
             }
-            comp.dispatchZcuLinkTask(tid, .{ .link_nav = nav_index });
+            try comp.link_queue.enqueueZcu(comp, tid, .{ .link_nav = nav_index });
         },
         .link_type => |ty| {
             const zcu = comp.zcu.?;
@@ -5123,10 +5114,10 @@ fn processOneJob(
                 comp.link_prog_node.completeOne();
                 return;
             }
-            comp.dispatchZcuLinkTask(tid, .{ .link_type = ty });
+            try comp.link_queue.enqueueZcu(comp, tid, .{ .link_type = ty });
         },
-        .update_line_number => |ti| {
-            comp.dispatchZcuLinkTask(tid, .{ .update_line_number = ti });
+        .update_line_number => |tracked_inst| {
+            try comp.link_queue.enqueueZcu(comp, tid, .{ .update_line_number = tracked_inst });
        },
        .analyze_func => |func| {
            const named_frame = tracy.namedFrame("analyze_func");
@@ -5220,12 +5211,6 @@ fn processOneJob(
     }
 }
 
-pub fn separateCodegenThreadOk(comp: *const Compilation) bool {
-    if (InternPool.single_threaded) return false;
-    const zcu = comp.zcu orelse return true;
-    return zcu.backendSupportsFeature(.separate_thread);
-}
-
 fn createDepFile(
     comp: *Compilation,
     depfile: []const u8,
@@ -5480,6 +5465,7 @@ fn workerDocsWasmFallible(comp: *Compilation, prog_node: std.Progress.Node) SubU
     var sub_create_diag: CreateDiagnostic = undefined;
     const sub_compilation = Compilation.create(gpa, arena, io, &sub_create_diag, .{
+        .thread_limit = comp.thread_limit,
         .dirs = dirs,
         .self_exe_path = comp.self_exe_path,
         .config = config,
@@ -5487,7 +5473,6 @@ fn workerDocsWasmFallible(comp: *Compilation, prog_node: std.Progress.Node) SubU
         .entry = .disabled,
         .cache_mode = .whole,
         .root_name = root_name,
-        .thread_pool = comp.thread_pool,
         .libc_installation = comp.libc_installation,
         .emit_bin = .yes_cache,
         .verbose_cc = comp.verbose_cc,
@@ -5541,13 +5526,15 @@ fn workerDocsWasmFallible(comp: *Compilation, prog_node: std.Progress.Node) SubU
 }
 
 fn workerUpdateFile(
-    tid: usize,
     comp: *Compilation,
     file: *Zcu.File,
     file_index: Zcu.File.Index,
     prog_node: std.Progress.Node,
-    wg: *WaitGroup,
+    group: *Io.Group,
 ) void {
+    const tid = Compilation.getTid();
+    const io = comp.io;
+
     const child_prog_node = prog_node.start(fs.path.basename(file.path.sub_path), 0);
     defer child_prog_node.end();
 
@@ -5556,8 +5543,8 @@ fn workerUpdateFile(
     pt.updateFile(file_index, file) catch |err| {
         pt.reportRetryableFileError(file_index, "unable to load '{s}': {s}", .{ fs.path.basename(file.path.sub_path), @errorName(err) }) catch |oom| switch (oom) {
             error.OutOfMemory => {
-                comp.mutex.lock();
-                defer comp.mutex.unlock();
+                comp.mutex.lockUncancelable(io);
+                defer comp.mutex.unlock(io);
                 comp.setAllocFailure();
             },
         };
@@ -5587,14 +5574,14 @@ fn workerUpdateFile(
         if (pt.discoverImport(file.path, import_path)) |res| switch (res) {
             .module, .existing_file => {},
             .new_file => |new| {
-                comp.thread_pool.spawnWgId(wg, workerUpdateFile, .{
-                    comp, new.file, new.index, prog_node, wg,
+                group.async(io, workerUpdateFile, .{
+                    comp, new.file, new.index, prog_node, group,
                 });
             },
         } else |err| switch (err) {
             error.OutOfMemory => {
-                comp.mutex.lock();
-                defer comp.mutex.unlock();
+                comp.mutex.lockUncancelable(io);
+                defer comp.mutex.unlock(io);
                 comp.setAllocFailure();
             },
         }
@@ -5610,17 +5597,20 @@ fn workerUpdateBuiltinFile(comp: *Compilation, file: *Zcu.File) void {
     );
 }
 
-fn workerUpdateEmbedFile(tid: usize, comp: *Compilation, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) void {
+fn workerUpdateEmbedFile(comp: *Compilation, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) void {
+    const tid = Compilation.getTid();
+    const io = comp.io;
     comp.detectEmbedFileUpdate(@enumFromInt(tid), ef_index, ef) catch |err| switch (err) {
         error.OutOfMemory => {
-            comp.mutex.lock();
-            defer comp.mutex.unlock();
+            comp.mutex.lockUncancelable(io);
+            defer comp.mutex.unlock(io);
             comp.setAllocFailure();
         },
     };
 }
 
 fn detectEmbedFileUpdate(comp: *Compilation, tid: Zcu.PerThread.Id, ef_index: Zcu.EmbedFile.Index, ef: *Zcu.EmbedFile) !void {
+    const io = comp.io;
     const zcu = comp.zcu.?;
     const pt: Zcu.PerThread = .activate(zcu, tid);
     defer pt.deactivate();
@@ -5633,8 +5623,8 @@ fn detectEmbedFileUpdate(comp: *Compilation, tid: Zcu.PerThread.Id, ef_index: Zc
     if (ef.val != .none and ef.val == old_val) return; // success, value unchanged
     if (ef.val == .none and old_val == .none and ef.err == old_err) return; // failure, error unchanged
 
-    comp.mutex.lock();
-    defer comp.mutex.unlock();
+    comp.mutex.lockUncancelable(io);
+    defer comp.mutex.unlock(io);
     try zcu.markDependeeOutdated(.not_marked_po, .{ .embed_file = ef_index });
 }
 
@@ -5777,8 +5767,8 @@ pub fn translateC(
 
     switch (comp.cache_use) {
         .whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| {
-            whole.cache_manifest_mutex.lock();
-            defer whole.cache_manifest_mutex.unlock();
+            try whole.cache_manifest_mutex.lock(io);
+            defer whole.cache_manifest_mutex.unlock(io);
             try whole_cache_manifest.addDepFilePost(cache_tmp_dir, dep_basename);
         },
         .incremental, .none => {},
@@ -5879,7 +5869,6 @@ fn workerUpdateCObject(
     c_object: *CObject,
     progress_node: std.Progress.Node,
 ) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     comp.updateCObject(c_object, progress_node) catch |err| switch (err) {
         error.AnalysisFail => return,
         else => {
@@ -5897,7 +5886,6 @@ fn workerUpdateWin32Resource(
     win32_resource: *Win32Resource,
     progress_node: std.Progress.Node,
 ) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     comp.updateWin32Resource(win32_resource, progress_node) catch |err| switch (err) {
         error.AnalysisFail => return,
         else => {
@@ -5915,21 +5903,6 @@ pub const RtOptions = struct {
     allow_lto: bool = true,
 };
 
-fn workerZcuCodegen(
-    tid: usize,
-    comp: *Compilation,
-    func_index: InternPool.Index,
-    orig_air: Air,
-    out: *link.ZcuTask.LinkFunc.SharedMir,
-) void {
-    var air = orig_air;
-    // We own `air` now, so we are responsible for freeing it.
-    defer air.deinit(comp.gpa);
-    const pt: Zcu.PerThread = .activate(comp.zcu.?, @enumFromInt(tid));
-    defer pt.deactivate();
-    pt.runCodegen(func_index, &air, out);
-}
-
 fn buildRt(
     comp: *Compilation,
     root_source_name: []const u8,
@@ -5941,7 +5914,6 @@ fn buildRt(
     options: RtOptions,
     out: *?CrtFile,
 ) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     comp.buildOutputFromZig(
         root_source_name,
         root_name,
@@ -5960,7 +5932,6 @@ fn buildRt(
 }
 
 fn buildMuslCrtFile(comp: *Compilation, crt_file: musl.CrtFile, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (musl.buildCrtFile(comp, crt_file, prog_node)) |_| {
         comp.queued_jobs.musl_crt_file[@intFromEnum(crt_file)] = false;
     } else |err| switch (err) {
@@ -5972,7 +5943,6 @@ fn buildMuslCrtFile(comp: *Compilation, crt_file: musl.CrtFile, prog_node: std.P
 }
 
 fn buildGlibcCrtFile(comp: *Compilation, crt_file: glibc.CrtFile, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (glibc.buildCrtFile(comp, crt_file, prog_node)) |_| {
         comp.queued_jobs.glibc_crt_file[@intFromEnum(crt_file)] = false;
     } else |err| switch (err) {
@@ -5984,7 +5954,6 @@ fn buildGlibcCrtFile(comp: *Compilation, crt_file: glibc.CrtFile, prog_node: std
 }
 
 fn buildGlibcSharedObjects(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (glibc.buildSharedObjects(comp, prog_node)) |_| {
         // The job should no longer be queued up since it succeeded.
         comp.queued_jobs.glibc_shared_objects = false;
     } else |err| switch (err) {
@@ -5995,7 +5964,6 @@ fn buildGlibcSharedObjects(comp: *Compilation, prog_node: std.Progress.Node) voi
 }
 
 fn buildFreeBSDCrtFile(comp: *Compilation, crt_file: freebsd.CrtFile, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (freebsd.buildCrtFile(comp, crt_file, prog_node)) |_| {
         comp.queued_jobs.freebsd_crt_file[@intFromEnum(crt_file)] = false;
     } else |err| switch (err) {
@@ -6007,7 +5975,6 @@ fn buildFreeBSDCrtFile(comp: *Compilation, crt_file: freebsd.CrtFile, prog_node:
 }
 
 fn buildFreeBSDSharedObjects(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (freebsd.buildSharedObjects(comp, prog_node)) |_| {
         // The job should no longer be queued up since it succeeded.
         comp.queued_jobs.freebsd_shared_objects = false;
     } else |err| switch (err) {
@@ -6020,7 +5987,6 @@ fn buildFreeBSDSharedObjects(comp: *Compilation, prog_node: std.Progress.Node) v
 }
 
 fn buildNetBSDCrtFile(comp: *Compilation, crt_file: netbsd.CrtFile, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (netbsd.buildCrtFile(comp, crt_file, prog_node)) |_| {
         comp.queued_jobs.netbsd_crt_file[@intFromEnum(crt_file)] = false;
     } else |err| switch (err) {
@@ -6032,7 +5998,6 @@ fn buildNetBSDCrtFile(comp: *Compilation, crt_file: netbsd.CrtFile, prog_node: s
 }
 
 fn buildNetBSDSharedObjects(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (netbsd.buildSharedObjects(comp, prog_node)) |_| {
         // The job should no longer be queued up since it succeeded.
         comp.queued_jobs.netbsd_shared_objects = false;
     } else |err| switch (err) {
@@ -6045,7 +6010,6 @@ fn buildNetBSDSharedObjects(comp: *Compilation, prog_node: std.Progress.Node) vo
 }
 
 fn buildMingwCrtFile(comp: *Compilation, crt_file: mingw.CrtFile, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (mingw.buildCrtFile(comp, crt_file, prog_node)) |_| {
         comp.queued_jobs.mingw_crt_file[@intFromEnum(crt_file)] = false;
     } else |err| switch (err) {
@@ -6057,7 +6021,6 @@ fn buildMingwCrtFile(comp: *Compilation, crt_file: mingw.CrtFile, prog_node: std
 }
 
 fn buildWasiLibcCrtFile(comp: *Compilation, crt_file: wasi_libc.CrtFile, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (wasi_libc.buildCrtFile(comp, crt_file, prog_node)) |_| {
         comp.queued_jobs.wasi_libc_crt_file[@intFromEnum(crt_file)] = false;
     } else |err| switch (err) {
@@ -6069,7 +6032,6 @@ fn buildWasiLibcCrtFile(comp: *Compilation, crt_file: wasi_libc.CrtFile, prog_no
 }
 
 fn buildLibUnwind(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (libunwind.buildStaticLib(comp, prog_node)) |_| {
         comp.queued_jobs.libunwind = false;
     } else |err| switch (err) {
@@ -6079,7 +6041,6 @@ fn buildLibUnwind(comp: *Compilation, prog_node: std.Progress.Node) void {
 }
 
 fn buildLibCxx(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (libcxx.buildLibCxx(comp, prog_node)) |_| {
         comp.queued_jobs.libcxx = false;
     } else |err| switch (err) {
@@ -6089,7 +6050,6 @@ fn buildLibCxx(comp: *Compilation, prog_node: std.Progress.Node) void {
 }
 
 fn buildLibCxxAbi(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (libcxx.buildLibCxxAbi(comp, prog_node)) |_| {
         comp.queued_jobs.libcxxabi = false;
     } else |err| switch (err) {
@@ -6099,7 +6059,6 @@ fn buildLibCxxAbi(comp: *Compilation, prog_node: std.Progress.Node) void {
 }
 
 fn buildLibTsan(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     if (libtsan.buildTsan(comp, prog_node)) |_| {
         comp.queued_jobs.libtsan = false;
     } else |err| switch (err) {
@@ -6109,7 +6068,6 @@ fn buildLibTsan(comp: *Compilation, prog_node: std.Progress.Node) void {
 }
 
 fn buildLibZigC(comp: *Compilation, prog_node: std.Progress.Node) void {
-    defer comp.link_task_queue.finishPrelinkItem(comp);
     comp.buildOutputFromZig(
         "c.zig",
         "zigc",
@@ -6139,6 +6097,8 @@ fn reportRetryableWin32ResourceError(
     win32_resource: *Win32Resource,
     err: anyerror,
 ) error{OutOfMemory}!void {
+    const io = comp.io;
+
     win32_resource.status = .failure_retryable;
 
     var bundle: ErrorBundle.Wip = undefined;
@@ -6160,8 +6120,8 @@ fn reportRetryableWin32ResourceError(
     });
     const finished_bundle = try bundle.toOwnedBundle("");
     {
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, finished_bundle);
     }
 }
@@ -6186,8 +6146,8 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_obj_prog_node: std.Pr
 
     if (c_object.clearStatus(gpa)) {
         // There was previous failure.
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         // If the failure was OOM, there will not be an entry here, so we do
         // not assert discard.
         _ = comp.failed_c_objects.swapRemove(c_object);
@@ -6457,8 +6417,8 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_obj_prog_node: std.Pr
             switch (comp.cache_use) {
                 .whole => |whole| {
                     if (whole.cache_manifest) |whole_cache_manifest| {
-                        whole.cache_manifest_mutex.lock();
-                        defer whole.cache_manifest_mutex.unlock();
+                        try whole.cache_manifest_mutex.lock(io);
+                        defer whole.cache_manifest_mutex.unlock(io);
                         try whole_cache_manifest.addDepFilePost(zig_cache_tmp_dir, dep_basename);
                     }
                 },
@@ -6503,7 +6463,7 @@ fn updateCObject(comp: *Compilation, c_object: *CObject, c_obj_prog_node: std.Pr
         },
     };
 
-    comp.queuePrelinkTasks(&.{.{ .load_object = c_object.status.success.object_path }});
+    try comp.queuePrelinkTasks(&.{.{ .load_object = c_object.status.success.object_path }});
 }
 
 fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32_resource_prog_node: std.Progress.Node) !void {
@@ -6517,6 +6477,8 @@ fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32
     const tracy_trace = trace(@src());
     defer tracy_trace.end();
 
+    const io = comp.io;
+
     const src_path = switch (win32_resource.src) {
         .rc => |rc_src| rc_src.src_path,
         .manifest => |src_path| src_path,
@@ -6531,8 +6493,8 @@ fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32
 
     if (win32_resource.clearStatus(comp.gpa)) {
         // There was previous failure.
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         // If the failure was OOM, there will not be an entry here, so we do
         // not assert discard.
         _ = comp.failed_win32_resources.swapRemove(win32_resource);
@@ -6706,8 +6668,8 @@ fn updateWin32Resource(comp: *Compilation, win32_resource: *Win32Resource, win32
     try man.addFilePost(dep_file_path);
     switch (comp.cache_use) {
         .whole => |whole| if (whole.cache_manifest) |whole_cache_manifest| {
-            whole.cache_manifest_mutex.lock();
-            defer whole.cache_manifest_mutex.unlock();
+            try whole.cache_manifest_mutex.lock(io);
+            defer whole.cache_manifest_mutex.unlock(io);
             try whole_cache_manifest.addFilePost(dep_file_path);
         },
         .incremental, .none => {},
@@ -7428,8 +7390,9 @@ fn failCObjWithOwnedDiagBundle(
     @branchHint(.cold);
     assert(diag_bundle.diags.len > 0);
     {
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        const io = comp.io;
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         {
             errdefer diag_bundle.destroy(comp.gpa);
             try comp.failed_c_objects.ensureUnusedCapacity(comp.gpa, 1);
@@ -7470,8 +7433,9 @@ fn failWin32ResourceWithOwnedBundle(
 ) error{ OutOfMemory, AnalysisFail } {
     @branchHint(.cold);
     {
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        const io = comp.io;
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         try comp.failed_win32_resources.putNoClobber(comp.gpa, win32_resource, err_bundle);
     }
     win32_resource.status = .failure;
@@ -7795,9 +7759,9 @@ pub fn lockAndSetMiscFailure(
     comptime format: []const u8,
     args: anytype,
 ) void {
-    comp.mutex.lock();
-    defer comp.mutex.unlock();
-
+    const io = comp.io;
+    comp.mutex.lockUncancelable(io);
+    defer comp.mutex.unlock(io);
     return setMiscFailure(comp, tag, format, args);
 }
 
@@ -7840,8 +7804,8 @@ pub fn updateSubCompilation(
     defer errors.deinit(gpa);
 
     if (errors.errorMessageCount() > 0) {
-        parent_comp.mutex.lock();
-        defer parent_comp.mutex.unlock();
+        parent_comp.mutex.lockUncancelable(parent_comp.io);
+        defer parent_comp.mutex.unlock(parent_comp.io);
         try parent_comp.misc_failures.ensureUnusedCapacity(gpa, 1);
         parent_comp.misc_failures.putAssumeCapacityNoClobber(misc_task, .{
             .msg = try std.fmt.allocPrint(gpa, "sub-compilation of {t} failed", .{misc_task}),
@@ -7942,6 +7906,7 @@ fn buildOutputFromZig(
 
     var sub_create_diag: CreateDiagnostic = undefined;
     const sub_compilation = Compilation.create(gpa, arena, io, &sub_create_diag, .{
+        .thread_limit = comp.thread_limit,
         .dirs = comp.dirs.withoutLocalCache(),
         .cache_mode = .whole,
         .parent_whole_cache = parent_whole_cache,
@@ -7949,7 +7914,6 @@ fn buildOutputFromZig(
         .config = config,
         .root_mod = root_mod,
         .root_name = root_name,
-        .thread_pool = comp.thread_pool,
         .libc_installation = comp.libc_installation,
         .emit_bin = .yes_cache,
         .function_sections = true,
@@ -7980,7 +7944,7 @@ fn buildOutputFromZig(
     assert(out.* == null);
     out.* = crt_file;
 
-    comp.queuePrelinkTaskMode(crt_file.full_object_path, &config);
+    try comp.queuePrelinkTaskMode(crt_file.full_object_path, &config);
 }
 
 pub const CrtFileOptions = struct {
@@ -8079,13 +8043,13 @@ pub fn build_crt_file(
 
     var sub_create_diag: CreateDiagnostic = undefined;
     const sub_compilation = Compilation.create(gpa, arena, io, &sub_create_diag, .{
+        .thread_limit = comp.thread_limit,
         .dirs = comp.dirs.withoutLocalCache(),
         .self_exe_path = comp.self_exe_path,
         .cache_mode = .whole,
         .config = config,
         .root_mod = root_mod,
         .root_name = root_name,
-        .thread_pool = comp.thread_pool,
         .libc_installation = comp.libc_installation,
         .emit_bin = .yes_cache,
         .function_sections = options.function_sections orelse false,
@@ -8114,18 +8078,18 @@ pub fn build_crt_file(
     try comp.updateSubCompilation(sub_compilation, misc_task_tag, prog_node);
 
     const crt_file = try sub_compilation.toCrtFile();
-    comp.queuePrelinkTaskMode(crt_file.full_object_path, &config);
+    try comp.queuePrelinkTaskMode(crt_file.full_object_path, &config);
 
     {
-        comp.mutex.lock();
-        defer comp.mutex.unlock();
+        comp.mutex.lockUncancelable(io);
+        defer comp.mutex.unlock(io);
         try comp.crt_files.ensureUnusedCapacity(gpa, 1);
         comp.crt_files.putAssumeCapacityNoClobber(basename, crt_file);
     }
 }
 
-pub fn queuePrelinkTaskMode(comp: *Compilation, path: Cache.Path, config: *const Compilation.Config) void {
-    comp.queuePrelinkTasks(switch (config.output_mode) {
+pub fn queuePrelinkTaskMode(comp: *Compilation, path: Cache.Path, config: *const Compilation.Config) Io.Cancelable!void {
+    try comp.queuePrelinkTasks(switch (config.output_mode) {
         .Exe => unreachable,
         .Obj => &.{.{ .load_object = path }},
         .Lib => &.{switch (config.link_mode) {
@@ -8135,33 +8099,10 @@ pub fn queuePrelinkTaskMode(comp: *Compilation, path: Cache.Path, config: *const
     });
 }
 
-/// Only valid to call during `update`. Automatically handles queuing up a
-/// linker worker task if there is not already one.
-pub fn queuePrelinkTasks(comp: *Compilation, tasks: []const link.PrelinkTask) void {
+/// Only valid to call during `update`.
+pub fn queuePrelinkTasks(comp: *Compilation, tasks: []const link.PrelinkTask) Io.Cancelable!void {
     comp.link_prog_node.increaseEstimatedTotalItems(tasks.len);
-    comp.link_task_queue.enqueuePrelink(comp, tasks) catch |err| switch (err) {
-        error.OutOfMemory => return comp.setAllocFailure(),
-    };
-}
-
-/// The reason for the double-queue here is that the first queue ensures any
-/// resolve_type_fully tasks are complete before this dispatch function is called.
-fn dispatchZcuLinkTask(comp: *Compilation, tid: usize, task: link.ZcuTask) void {
-    if (!comp.separateCodegenThreadOk()) {
-        assert(tid == 0);
-        if (task == .link_func) {
-            assert(task.link_func.mir.status.load(.monotonic) != .pending);
-        }
-        link.doZcuTask(comp, tid, task);
-        task.deinit(comp.zcu.?);
-        return;
-    }
-    comp.link_task_queue.enqueueZcu(comp, task) catch |err| switch (err) {
-        error.OutOfMemory => {
-            task.deinit(comp.zcu.?);
-            comp.setAllocFailure();
-        },
-    };
+    try comp.link_queue.enqueuePrelink(comp, tasks);
 }
 
 pub fn toCrtFile(comp: *Compilation) Allocator.Error!CrtFile {
@@ -8251,3 +8192,17 @@ pub fn compilerRtOptMode(comp: Compilation) std.builtin.OptimizeMode {
 pub fn compilerRtStrip(comp: Compilation) bool {
     return comp.root_mod.strip;
 }
+
+/// This is a temporary workaround put in place to migrate from `std.Thread.Pool`
+/// to `std.Io.Threaded` for asynchronous/concurrent work. The eventual solution
+/// will likely involve significant changes to the `InternPool` implementation.
+pub fn getTid() usize {
+    if (my_tid == null) my_tid = next_tid.fetchAdd(1, .monotonic);
+    return my_tid.?;
+}
+pub fn setMainThread() void {
+    my_tid = 0;
+}
+/// TID 0 is reserved for the main thread.
+var next_tid: std.atomic.Value(usize) = .init(1);
+threadlocal var my_tid: ?usize = null;
```
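
The `getTid` helper added at the bottom of the file is a self-contained pattern: an atomic counter hands out a stable id the first time each thread asks for one, with id 0 reserved for the main thread. Here is a standalone sketch of the same pattern; the `worker` harness is illustrative, not from the commit.

```zig
const std = @import("std");

var next_id: std.atomic.Value(usize) = .init(1); // 0 is reserved for the main thread
threadlocal var my_id: ?usize = null;

fn getId() usize {
    // The first call on a given thread claims the next id; later calls reuse it.
    if (my_id == null) my_id = next_id.fetchAdd(1, .monotonic);
    return my_id.?;
}

fn worker(out: *usize) void {
    out.* = getId();
}

pub fn main() !void {
    my_id = 0; // main thread, mirroring `setMainThread`
    var a: usize = 0;
    var b: usize = 0;
    const t1 = try std.Thread.spawn(.{}, worker, .{&a});
    const t2 = try std.Thread.spawn(.{}, worker, .{&b});
    t1.join();
    t2.join();
    std.debug.print("main=0 t1={d} t2={d}\n", .{ a, b });
}
```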
