diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2025-06-12 20:46:36 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-12 20:46:36 -0400 |
| commit | dcdb4422b801f2d184107fdd7b9493f7840a0244 (patch) | |
| tree | ca7a37c544382c10e45fbad68ea7701a05d0543c /src/link/Queue.zig | |
| parent | 5e3c0b7af7cd866f5464c244b9775e488b93ae48 (diff) | |
| parent | 43d01ff69f6c6c46bef81dd4de2c78fb0a942b65 (diff) | |
| download | zig-dcdb4422b801f2d184107fdd7b9493f7840a0244.tar.gz zig-dcdb4422b801f2d184107fdd7b9493f7840a0244.zip | |
Merge pull request #24124 from mlugg/better-backend-pipeline-2
compiler: threaded codegen (and more goodies)
Diffstat (limited to 'src/link/Queue.zig')
| -rw-r--r-- | src/link/Queue.zig | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/src/link/Queue.zig b/src/link/Queue.zig new file mode 100644 index 0000000000..d197edab02 --- /dev/null +++ b/src/link/Queue.zig @@ -0,0 +1,279 @@ +//! Stores and manages the queue of link tasks. Each task is either a `PrelinkTask` or a `ZcuTask`. +//! +//! There must be at most one link thread (the thread processing these tasks) active at a time. If +//! `!comp.separateCodegenThreadOk()`, then ZCU tasks will be run on the main thread, bypassing this +//! queue entirely. +//! +//! All prelink tasks must be processed before any ZCU tasks are processed. After all prelink tasks +//! are run, but before any ZCU tasks are run, `prelink` must be called on the `link.File`. +//! +//! There will sometimes be a `ZcuTask` in the queue which is not yet ready because it depends on +//! MIR which has not yet been generated by any codegen thread. In this case, we must pause +//! processing of linker tasks until the MIR is ready. It would be incorrect to run any other link +//! tasks first, since this would make builds unreproducible. + +mutex: std.Thread.Mutex, +/// Validates that only one `flushTaskQueue` thread is running at a time. +flush_safety: std.debug.SafetyLock, + +/// This is the number of prelink tasks which are expected but have not yet been enqueued. +/// Guarded by `mutex`. +pending_prelink_tasks: u32, + +/// Prelink tasks which have been enqueued and are not yet owned by the worker thread. +/// Allocated into `gpa`, guarded by `mutex`. +queued_prelink: std.ArrayListUnmanaged(PrelinkTask), +/// The worker thread moves items from `queued_prelink` into this array in order to process them. +/// Allocated into `gpa`, accessed only by the worker thread. +wip_prelink: std.ArrayListUnmanaged(PrelinkTask), + +/// Like `queued_prelink`, but for ZCU tasks. +/// Allocated into `gpa`, guarded by `mutex`. +queued_zcu: std.ArrayListUnmanaged(ZcuTask), +/// Like `wip_prelink`, but for ZCU tasks. +/// Allocated into `gpa`, accessed only by the worker thread. +wip_zcu: std.ArrayListUnmanaged(ZcuTask), + +/// When processing ZCU link tasks, we might have to block due to unpopulated MIR. When this +/// happens, some tasks in `wip_zcu` have been run, and some are still pending. This is the +/// index into `wip_zcu` which we have reached. +wip_zcu_idx: usize, + +/// The sum of all `air_bytes` for all currently-queued `ZcuTask.link_func` tasks. Because +/// MIR bytes are approximately proportional to AIR bytes, this acts to limit the amount of +/// AIR and MIR which is queued for codegen and link respectively, to prevent excessive +/// memory usage if analysis produces AIR faster than it can be processed by codegen/link. +/// The cap is `max_air_bytes_in_flight`. +/// Guarded by `mutex`. +air_bytes_in_flight: u32, +/// If nonzero, then a call to `enqueueZcu` is blocked waiting to add a `link_func` task, but +/// cannot until `air_bytes_in_flight` is no greater than this value. +/// Guarded by `mutex`. +air_bytes_waiting: u32, +/// After setting `air_bytes_waiting`, `enqueueZcu` will wait on this condition (with `mutex`). +/// When `air_bytes_waiting` many bytes can be queued, this condition should be signaled. +air_bytes_cond: std.Thread.Condition, + +/// Guarded by `mutex`. +state: union(enum) { + /// The link thread is currently running or queued to run. + running, + /// The link thread is not running or queued, because it has exhausted all immediately available + /// tasks. It should be spawned when more tasks are enqueued. If `pending_prelink_tasks` is not + /// zero, we are specifically waiting for prelink tasks. + finished, + /// The link thread is not running or queued, because it is waiting for this MIR to be populated. + /// Once codegen completes, it must call `mirReady` which will restart the link thread. + wait_for_mir: *ZcuTask.LinkFunc.SharedMir, +}, + +/// In the worst observed case, MIR is around 50 times as large as AIR. More typically, the ratio is +/// around 20. Going by that 50x multiplier, and assuming we want to consume no more than 500 MiB of +/// memory on AIR/MIR, we see a limit of around 10 MiB of AIR in-flight. +const max_air_bytes_in_flight = 10 * 1024 * 1024; + +/// The initial `Queue` state, containing no tasks, expecting no prelink tasks, and with no running worker thread. +/// The `pending_prelink_tasks` and `queued_prelink` fields may be modified as needed before calling `start`. +pub const empty: Queue = .{ + .mutex = .{}, + .flush_safety = .{}, + .pending_prelink_tasks = 0, + .queued_prelink = .empty, + .wip_prelink = .empty, + .queued_zcu = .empty, + .wip_zcu = .empty, + .wip_zcu_idx = 0, + .state = .finished, + .air_bytes_in_flight = 0, + .air_bytes_waiting = 0, + .air_bytes_cond = .{}, +}; +/// `lf` is needed to correctly deinit any pending `ZcuTask`s. +pub fn deinit(q: *Queue, comp: *Compilation) void { + const gpa = comp.gpa; + for (q.queued_zcu.items) |t| t.deinit(comp.zcu.?); + for (q.wip_zcu.items[q.wip_zcu_idx..]) |t| t.deinit(comp.zcu.?); + q.queued_prelink.deinit(gpa); + q.wip_prelink.deinit(gpa); + q.queued_zcu.deinit(gpa); + q.wip_zcu.deinit(gpa); +} + +/// This is expected to be called exactly once, after which the caller must not directly access +/// `queued_prelink` or `pending_prelink_tasks` any longer. This will spawn the link thread if +/// necessary. +pub fn start(q: *Queue, comp: *Compilation) void { + assert(q.state == .finished); + assert(q.queued_zcu.items.len == 0); + if (q.queued_prelink.items.len != 0) { + q.state = .running; + comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); + } +} + +/// Called by codegen workers after they have populated a `ZcuTask.LinkFunc.SharedMir`. If the link +/// thread was waiting for this MIR, it can resume. +pub fn mirReady(q: *Queue, comp: *Compilation, mir: *ZcuTask.LinkFunc.SharedMir) void { + // We would like to assert that `mir` is not pending, but that would race with a worker thread + // potentially freeing it. + { + q.mutex.lock(); + defer q.mutex.unlock(); + switch (q.state) { + .finished, .running => return, + .wait_for_mir => |wait_for| if (wait_for != mir) return, + } + // We were waiting for `mir`, so we will restart the linker thread. + q.state = .running; + } + assert(mir.status.load(.monotonic) != .pending); + comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); +} + +/// Enqueues all prelink tasks in `tasks`. Asserts that they were expected, i.e. that `tasks.len` is +/// less than or equal to `q.pending_prelink_tasks`. Also asserts that `tasks.len` is not 0. +pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) Allocator.Error!void { + { + q.mutex.lock(); + defer q.mutex.unlock(); + try q.queued_prelink.appendSlice(comp.gpa, tasks); + q.pending_prelink_tasks -= @intCast(tasks.len); + switch (q.state) { + .wait_for_mir => unreachable, // we've not started zcu tasks yet + .running => return, + .finished => {}, + } + // Restart the linker thread, because it was waiting for a task + q.state = .running; + } + comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); +} + +pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!void { + assert(comp.separateCodegenThreadOk()); + { + q.mutex.lock(); + defer q.mutex.unlock(); + // If this is a `link_func` task, we might need to wait for `air_bytes_in_flight` to fall. + if (task == .link_func) { + const max_in_flight = max_air_bytes_in_flight -| task.link_func.air_bytes; + while (q.air_bytes_in_flight > max_in_flight) { + q.air_bytes_waiting = task.link_func.air_bytes; + q.air_bytes_cond.wait(&q.mutex); + q.air_bytes_waiting = 0; + } + q.air_bytes_in_flight += task.link_func.air_bytes; + } + try q.queued_zcu.append(comp.gpa, task); + switch (q.state) { + .running, .wait_for_mir => return, + .finished => if (q.pending_prelink_tasks != 0) return, + } + // Restart the linker thread, unless it would immediately be blocked + if (task == .link_func and task.link_func.mir.status.load(.monotonic) == .pending) { + q.state = .{ .wait_for_mir = task.link_func.mir }; + return; + } + q.state = .running; + } + comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp }); +} + +fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void { + q.flush_safety.lock(); // every `return` site should unlock this before unlocking `q.mutex` + + if (std.debug.runtime_safety) { + q.mutex.lock(); + defer q.mutex.unlock(); + assert(q.state == .running); + } + prelink: while (true) { + assert(q.wip_prelink.items.len == 0); + { + q.mutex.lock(); + defer q.mutex.unlock(); + std.mem.swap(std.ArrayListUnmanaged(PrelinkTask), &q.queued_prelink, &q.wip_prelink); + if (q.wip_prelink.items.len == 0) { + if (q.pending_prelink_tasks == 0) { + break :prelink; // prelink is done + } else { + // We're expecting more prelink tasks so can't move on to ZCU tasks. + q.state = .finished; + q.flush_safety.unlock(); + return; + } + } + } + for (q.wip_prelink.items) |task| { + link.doPrelinkTask(comp, task); + } + q.wip_prelink.clearRetainingCapacity(); + } + + // We've finished the prelink tasks, so run prelink if necessary. + if (comp.bin_file) |lf| { + if (!lf.post_prelink) { + if (lf.prelink()) |_| { + lf.post_prelink = true; + } else |err| switch (err) { + error.OutOfMemory => comp.link_diags.setAllocFailure(), + error.LinkFailure => {}, + } + } + } + + // Now we can run ZCU tasks. + while (true) { + if (q.wip_zcu.items.len == q.wip_zcu_idx) { + q.wip_zcu.clearRetainingCapacity(); + q.wip_zcu_idx = 0; + q.mutex.lock(); + defer q.mutex.unlock(); + std.mem.swap(std.ArrayListUnmanaged(ZcuTask), &q.queued_zcu, &q.wip_zcu); + if (q.wip_zcu.items.len == 0) { + // We've exhausted all available tasks. + q.state = .finished; + q.flush_safety.unlock(); + return; + } + } + const task = q.wip_zcu.items[q.wip_zcu_idx]; + // If the task is a `link_func`, we might have to stop until its MIR is populated. + pending: { + if (task != .link_func) break :pending; + const status_ptr = &task.link_func.mir.status; + // First check without the mutex to optimize for the common case where MIR is ready. + if (status_ptr.load(.monotonic) != .pending) break :pending; + q.mutex.lock(); + defer q.mutex.unlock(); + if (status_ptr.load(.monotonic) != .pending) break :pending; + // We will stop for now, and get restarted once this MIR is ready. + q.state = .{ .wait_for_mir = task.link_func.mir }; + q.flush_safety.unlock(); + return; + } + link.doZcuTask(comp, tid, task); + task.deinit(comp.zcu.?); + if (task == .link_func) { + // Decrease `air_bytes_in_flight`, since we've finished processing this MIR. + q.mutex.lock(); + defer q.mutex.unlock(); + q.air_bytes_in_flight -= task.link_func.air_bytes; + if (q.air_bytes_waiting != 0 and + q.air_bytes_in_flight <= max_air_bytes_in_flight -| q.air_bytes_waiting) + { + q.air_bytes_cond.signal(); + } + } + q.wip_zcu_idx += 1; + } +} + +const std = @import("std"); +const assert = std.debug.assert; +const Allocator = std.mem.Allocator; +const Compilation = @import("../Compilation.zig"); +const link = @import("../link.zig"); +const PrelinkTask = link.PrelinkTask; +const ZcuTask = link.ZcuTask; +const Queue = @This(); |
