path: root/src/link/Queue.zig
author:    Andrew Kelley <andrew@ziglang.org>  2025-06-12 20:46:36 -0400
committer: GitHub <noreply@github.com>  2025-06-12 20:46:36 -0400
commit:    dcdb4422b801f2d184107fdd7b9493f7840a0244 (patch)
tree:      ca7a37c544382c10e45fbad68ea7701a05d0543c /src/link/Queue.zig
parent:    5e3c0b7af7cd866f5464c244b9775e488b93ae48 (diff)
parent:    43d01ff69f6c6c46bef81dd4de2c78fb0a942b65 (diff)
download:  zig-dcdb4422b801f2d184107fdd7b9493f7840a0244.tar.gz
           zig-dcdb4422b801f2d184107fdd7b9493f7840a0244.zip
Merge pull request #24124 from mlugg/better-backend-pipeline-2
compiler: threaded codegen (and more goodies)
Diffstat (limited to 'src/link/Queue.zig')
-rw-r--r--  src/link/Queue.zig  279
1 file changed, 279 insertions, 0 deletions
diff --git a/src/link/Queue.zig b/src/link/Queue.zig
new file mode 100644
index 0000000000..d197edab02
--- /dev/null
+++ b/src/link/Queue.zig
@@ -0,0 +1,279 @@
+//! Stores and manages the queue of link tasks. Each task is either a `PrelinkTask` or a `ZcuTask`.
+//!
+//! There must be at most one link thread (the thread processing these tasks) active at a time. If
+//! `!comp.separateCodegenThreadOk()`, then ZCU tasks will be run on the main thread, bypassing this
+//! queue entirely.
+//!
+//! All prelink tasks must be processed before any ZCU tasks are processed. After all prelink tasks
+//! are run, but before any ZCU tasks are run, `prelink` must be called on the `link.File`.
+//!
+//! There will sometimes be a `ZcuTask` in the queue which is not yet ready because it depends on
+//! MIR which has not yet been generated by any codegen thread. In this case, we must pause
+//! processing of linker tasks until the MIR is ready. It would be incorrect to run any other link
+//! tasks first, since this would make builds unreproducible.
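+//!
+//! Typical lifecycle: initialize to `empty`, set `pending_prelink_tasks` (and optionally
+//! `queued_prelink`), then call `start`. After that, tasks are added with `enqueuePrelink` and
+//! `enqueueZcu`, and codegen workers call `mirReady` as each MIR is populated.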
+
+mutex: std.Thread.Mutex,
+/// Validates that only one `flushTaskQueue` thread is running at a time.
+flush_safety: std.debug.SafetyLock,
+
+/// This is the number of prelink tasks which are expected but have not yet been enqueued.
+/// Guarded by `mutex`.
+pending_prelink_tasks: u32,
+
+/// Prelink tasks which have been enqueued and are not yet owned by the worker thread.
+/// Allocated into `gpa`, guarded by `mutex`.
+queued_prelink: std.ArrayListUnmanaged(PrelinkTask),
+/// The worker thread moves items from `queued_prelink` into this array in order to process them.
+/// Allocated into `gpa`, accessed only by the worker thread.
+wip_prelink: std.ArrayListUnmanaged(PrelinkTask),
+
+/// Like `queued_prelink`, but for ZCU tasks.
+/// Allocated into `gpa`, guarded by `mutex`.
+queued_zcu: std.ArrayListUnmanaged(ZcuTask),
+/// Like `wip_prelink`, but for ZCU tasks.
+/// Allocated into `gpa`, accessed only by the worker thread.
+wip_zcu: std.ArrayListUnmanaged(ZcuTask),
+
+/// When processing ZCU link tasks, we might have to block due to unpopulated MIR. When this
+/// happens, some tasks in `wip_zcu` have been run, and some are still pending. This is the
+/// index into `wip_zcu` which we have reached.
+wip_zcu_idx: usize,
+
+/// The sum of all `air_bytes` for all currently-queued `ZcuTask.link_func` tasks. Because
+/// MIR bytes are approximately proportional to AIR bytes, this acts to limit the amount of
+/// AIR and MIR which is queued for codegen and link respectively, to prevent excessive
+/// memory usage if analysis produces AIR faster than it can be processed by codegen/link.
+/// The cap is `max_air_bytes_in_flight`.
+/// Guarded by `mutex`.
+air_bytes_in_flight: u32,
+/// If nonzero, then a call to `enqueueZcu` is blocked waiting to add a `link_func` task with this
+/// many AIR bytes, and cannot proceed until `air_bytes_in_flight` falls far enough for those bytes
+/// to fit under `max_air_bytes_in_flight`.
+/// Guarded by `mutex`.
+air_bytes_waiting: u32,
+/// After setting `air_bytes_waiting`, `enqueueZcu` will wait on this condition (with `mutex`).
+/// When `air_bytes_waiting` many bytes can be queued, this condition should be signaled.
+air_bytes_cond: std.Thread.Condition,
+
+/// Guarded by `mutex`.
+state: union(enum) {
+    /// The link thread is currently running or queued to run.
+    running,
+    /// The link thread is not running or queued, because it has exhausted all immediately available
+    /// tasks. It should be spawned when more tasks are enqueued. If `pending_prelink_tasks` is not
+    /// zero, we are specifically waiting for prelink tasks.
+    finished,
+    /// The link thread is not running or queued, because it is waiting for this MIR to be populated.
+    /// Once codegen completes, it must call `mirReady` which will restart the link thread.
+    wait_for_mir: *ZcuTask.LinkFunc.SharedMir,
+},
+
+/// In the worst observed case, MIR is around 50 times as large as AIR. More typically, the ratio is
+/// around 20. Going by that 50x multiplier, and assuming we want to consume no more than 500 MiB of
+/// memory on AIR/MIR, we see a limit of around 10 MiB of AIR in-flight.
+const max_air_bytes_in_flight = 10 * 1024 * 1024;
+
+/// The initial `Queue` state, containing no tasks, expecting no prelink tasks, and with no running worker thread.
+/// The `pending_prelink_tasks` and `queued_prelink` fields may be modified as needed before calling `start`.
+pub const empty: Queue = .{
+    .mutex = .{},
+    .flush_safety = .{},
+    .pending_prelink_tasks = 0,
+    .queued_prelink = .empty,
+    .wip_prelink = .empty,
+    .queued_zcu = .empty,
+    .wip_zcu = .empty,
+    .wip_zcu_idx = 0,
+    .state = .finished,
+    .air_bytes_in_flight = 0,
+    .air_bytes_waiting = 0,
+    .air_bytes_cond = .{},
+};
+/// `comp` is needed to correctly deinit any pending `ZcuTask`s.
+pub fn deinit(q: *Queue, comp: *Compilation) void {
+    const gpa = comp.gpa;
+    for (q.queued_zcu.items) |t| t.deinit(comp.zcu.?);
+    for (q.wip_zcu.items[q.wip_zcu_idx..]) |t| t.deinit(comp.zcu.?);
+    q.queued_prelink.deinit(gpa);
+    q.wip_prelink.deinit(gpa);
+    q.queued_zcu.deinit(gpa);
+    q.wip_zcu.deinit(gpa);
+}
+
+/// This is expected to be called exactly once, after which the caller must not directly access
+/// `queued_prelink` or `pending_prelink_tasks` any longer. This will spawn the link thread if
+/// necessary.
+pub fn start(q: *Queue, comp: *Compilation) void {
+    assert(q.state == .finished);
+    assert(q.queued_zcu.items.len == 0);
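+    // If prelink tasks are already queued, spawn the link thread now; otherwise it will be spawned
+    // by `enqueuePrelink` (or by `enqueueZcu` once no prelink tasks are pending).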
+    if (q.queued_prelink.items.len != 0) {
+        q.state = .running;
+        comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+    }
+}
+
+/// Called by codegen workers after they have populated a `ZcuTask.LinkFunc.SharedMir`. If the link
+/// thread was waiting for this MIR, it can resume.
+pub fn mirReady(q: *Queue, comp: *Compilation, mir: *ZcuTask.LinkFunc.SharedMir) void {
+    // We would like to assert that `mir` is not pending, but that would race with a worker thread
+    // potentially freeing it.
+    {
+        q.mutex.lock();
+        defer q.mutex.unlock();
+        switch (q.state) {
+            .finished, .running => return,
+            .wait_for_mir => |wait_for| if (wait_for != mir) return,
+        }
+        // We were waiting for `mir`, so we will restart the linker thread.
+        q.state = .running;
+    }
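+    // The assert is safe here: the link thread was blocked on `mir` and has not been restarted yet,
+    // so nothing can have freed it, and our caller has already populated it.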
+    assert(mir.status.load(.monotonic) != .pending);
+    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+}
+
+/// Enqueues all prelink tasks in `tasks`. Asserts that they were expected, i.e. that `tasks.len` is
+/// less than or equal to `q.pending_prelink_tasks`. Also asserts that `tasks.len` is not 0.
+pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) Allocator.Error!void {
+    {
+        q.mutex.lock();
+        defer q.mutex.unlock();
+        try q.queued_prelink.appendSlice(comp.gpa, tasks);
+        q.pending_prelink_tasks -= @intCast(tasks.len);
+        switch (q.state) {
+            .wait_for_mir => unreachable, // we've not started zcu tasks yet
+            .running => return,
+            .finished => {},
+        }
+        // Restart the linker thread, because it was waiting for a task
+        q.state = .running;
+    }
+    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+}
+
+pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!void {
+    assert(comp.separateCodegenThreadOk());
+    {
+        q.mutex.lock();
+        defer q.mutex.unlock();
+        // If this is a `link_func` task, we might need to wait for `air_bytes_in_flight` to fall.
+        if (task == .link_func) {
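+            // The subtraction saturates so that a task bigger than `max_air_bytes_in_flight` can
+            // still be queued once nothing else is in flight.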
+            const max_in_flight = max_air_bytes_in_flight -| task.link_func.air_bytes;
+            while (q.air_bytes_in_flight > max_in_flight) {
+                q.air_bytes_waiting = task.link_func.air_bytes;
+                q.air_bytes_cond.wait(&q.mutex);
+                q.air_bytes_waiting = 0;
+            }
+            q.air_bytes_in_flight += task.link_func.air_bytes;
+        }
+        try q.queued_zcu.append(comp.gpa, task);
+        switch (q.state) {
+            .running, .wait_for_mir => return,
+            .finished => if (q.pending_prelink_tasks != 0) return,
+        }
+        // Restart the linker thread, unless it would immediately be blocked
+        if (task == .link_func and task.link_func.mir.status.load(.monotonic) == .pending) {
+            q.state = .{ .wait_for_mir = task.link_func.mir };
+            return;
+        }
+        q.state = .running;
+    }
+    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
+}
+
+fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void {
+    q.flush_safety.lock(); // every `return` site should unlock this before unlocking `q.mutex`
+
+    if (std.debug.runtime_safety) {
+        q.mutex.lock();
+        defer q.mutex.unlock();
+        assert(q.state == .running);
+    }
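+    // Prelink tasks are double-buffered: under `mutex`, `queued_prelink` is swapped into
+    // `wip_prelink`, and the moved tasks are then run without holding the lock so that producers
+    // can keep enqueueing.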
+    prelink: while (true) {
+        assert(q.wip_prelink.items.len == 0);
+        {
+            q.mutex.lock();
+            defer q.mutex.unlock();
+            std.mem.swap(std.ArrayListUnmanaged(PrelinkTask), &q.queued_prelink, &q.wip_prelink);
+            if (q.wip_prelink.items.len == 0) {
+                if (q.pending_prelink_tasks == 0) {
+                    break :prelink; // prelink is done
+                } else {
+                    // We're expecting more prelink tasks so can't move on to ZCU tasks.
+                    q.state = .finished;
+                    q.flush_safety.unlock();
+                    return;
+                }
+            }
+        }
+        for (q.wip_prelink.items) |task| {
+            link.doPrelinkTask(comp, task);
+        }
+        q.wip_prelink.clearRetainingCapacity();
+    }
+
+    // We've finished the prelink tasks, so run prelink if necessary.
+    if (comp.bin_file) |lf| {
+        if (!lf.post_prelink) {
+            if (lf.prelink()) |_| {
+                lf.post_prelink = true;
+            } else |err| switch (err) {
+                error.OutOfMemory => comp.link_diags.setAllocFailure(),
+                error.LinkFailure => {},
+            }
+        }
+    }
+
+    // Now we can run ZCU tasks.
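+    // Note that after a `wait_for_mir` restart, `wip_zcu` may still hold tasks from a previous run
+    // of this function; `wip_zcu_idx` marks how far through them we got.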
+    while (true) {
+        if (q.wip_zcu.items.len == q.wip_zcu_idx) {
+            q.wip_zcu.clearRetainingCapacity();
+            q.wip_zcu_idx = 0;
+            q.mutex.lock();
+            defer q.mutex.unlock();
+            std.mem.swap(std.ArrayListUnmanaged(ZcuTask), &q.queued_zcu, &q.wip_zcu);
+            if (q.wip_zcu.items.len == 0) {
+                // We've exhausted all available tasks.
+                q.state = .finished;
+                q.flush_safety.unlock();
+                return;
+            }
+        }
+        const task = q.wip_zcu.items[q.wip_zcu_idx];
+        // If the task is a `link_func`, we might have to stop until its MIR is populated.
+        pending: {
+            if (task != .link_func) break :pending;
+            const status_ptr = &task.link_func.mir.status;
+            // First check without the mutex to optimize for the common case where MIR is ready.
+            if (status_ptr.load(.monotonic) != .pending) break :pending;
+            q.mutex.lock();
+            defer q.mutex.unlock();
+            if (status_ptr.load(.monotonic) != .pending) break :pending;
+            // We will stop for now, and get restarted once this MIR is ready.
+            q.state = .{ .wait_for_mir = task.link_func.mir };
+            q.flush_safety.unlock();
+            return;
+        }
+        link.doZcuTask(comp, tid, task);
+        task.deinit(comp.zcu.?);
+        if (task == .link_func) {
+            // Decrease `air_bytes_in_flight`, since we've finished processing this MIR.
+            q.mutex.lock();
+            defer q.mutex.unlock();
+            q.air_bytes_in_flight -= task.link_func.air_bytes;
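+            // Wake a blocked `enqueueZcu`, but only once enough bytes have been retired for its
+            // waiting task to fit under the cap.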
+            if (q.air_bytes_waiting != 0 and
+                q.air_bytes_in_flight <= max_air_bytes_in_flight -| q.air_bytes_waiting)
+            {
+                q.air_bytes_cond.signal();
+            }
+        }
+        q.wip_zcu_idx += 1;
+    }
+}
+
+const std = @import("std");
+const assert = std.debug.assert;
+const Allocator = std.mem.Allocator;
+const Compilation = @import("../Compilation.zig");
+const link = @import("../link.zig");
+const PrelinkTask = link.PrelinkTask;
+const ZcuTask = link.ZcuTask;
+const Queue = @This();