1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
//! Stores and manages the queue of link tasks. Each task is either a `PrelinkTask` or a `ZcuTask`.
//!
//! There must be at most one link thread (the thread processing these tasks) active at a time. If
//! `!comp.separateCodegenThreadOk()`, then ZCU tasks will be run on the main thread, bypassing this
//! queue entirely.
//!
//! All prelink tasks must be processed before any ZCU tasks are processed. After all prelink tasks
//! are run, but before any ZCU tasks are run, `prelink` must be called on the `link.File`.
//!
//! There will sometimes be a `ZcuTask` in the queue which is not yet ready because it depends on
//! MIR which has not yet been generated by any codegen thread. In this case, we must pause
//! processing of linker tasks until the MIR is ready. It would be incorrect to run any other link
//! tasks first, since this would make builds unreproducible.
/// Protects every field of the queue which is documented as "Guarded by `mutex`".
mutex: std.Thread.Mutex,
/// Validates that only one `flushTaskQueue` thread is running at a time.
flush_safety: std.debug.SafetyLock,
/// This is the number of prelink tasks which are expected but have not yet been enqueued.
/// Guarded by `mutex`.
pending_prelink_tasks: u32,
/// Prelink tasks which have been enqueued and are not yet owned by the worker thread.
/// Allocated into `gpa`, guarded by `mutex`.
queued_prelink: std.ArrayListUnmanaged(PrelinkTask),
/// The worker thread moves items from `queued_prelink` into this array in order to process them.
/// Allocated into `gpa`, accessed only by the worker thread.
wip_prelink: std.ArrayListUnmanaged(PrelinkTask),
/// Like `queued_prelink`, but for ZCU tasks.
/// Allocated into `gpa`, guarded by `mutex`.
queued_zcu: std.ArrayListUnmanaged(ZcuTask),
/// Like `wip_prelink`, but for ZCU tasks.
/// Allocated into `gpa`, accessed only by the worker thread.
wip_zcu: std.ArrayListUnmanaged(ZcuTask),
/// When processing ZCU link tasks, we might have to block due to unpopulated MIR. When this
/// happens, some tasks in `wip_zcu` have been run, and some are still pending. This is the
/// index into `wip_zcu` which we have reached: tasks before it have been run, tasks at or after
/// it have not.
wip_zcu_idx: usize,
/// The sum of `air_bytes` over all `ZcuTask.link_func` tasks which have been enqueued but not
/// yet fully processed by the link thread. Because MIR bytes are approximately proportional to
/// AIR bytes, this acts to limit the amount of AIR and MIR which is queued for codegen and link
/// respectively, to prevent excessive memory usage if analysis produces AIR faster than it can
/// be processed by codegen/link.
/// The cap is `max_air_bytes_in_flight`.
/// Guarded by `mutex`.
air_bytes_in_flight: u32,
/// If nonzero, then a call to `enqueueZcu` is blocked waiting to add a `link_func` task whose
/// `air_bytes` equals this value. That call cannot proceed until `air_bytes_in_flight` is no
/// greater than `max_air_bytes_in_flight -| air_bytes_waiting`, i.e. until this many bytes can
/// be queued without exceeding the cap.
/// Guarded by `mutex`.
air_bytes_waiting: u32,
/// After setting `air_bytes_waiting`, `enqueueZcu` will wait on this condition (with `mutex`).
/// When `air_bytes_waiting` many bytes can be queued, this condition should be signaled.
air_bytes_cond: std.Thread.Condition,
/// Records whether the link thread is running (or queued to run), idle, or parked waiting for
/// MIR. The enqueue functions and `mirReady` consult this to decide whether they must spawn
/// the link thread.
/// Guarded by `mutex`.
state: union(enum) {
/// The link thread is currently running or queued to run.
running,
/// The link thread is not running or queued, because it has exhausted all immediately available
/// tasks. It should be spawned when more tasks are enqueued. If `pending_prelink_tasks` is not
/// zero, we are specifically waiting for prelink tasks.
finished,
/// The link thread is not running or queued, because it is waiting for this MIR to be populated.
/// Once codegen completes, it must call `mirReady` which will restart the link thread.
/// The payload is the function whose MIR is awaited.
wait_for_mir: InternPool.Index,
},
/// Upper bound on `air_bytes_in_flight`: 10 MiB of AIR.
///
/// Rationale: MIR has been observed to be up to roughly 50 times the size of the AIR it was
/// generated from (about 20 times is more typical). Budgeting for the 50x worst case and aiming to
/// spend no more than about 500 MiB on AIR/MIR gives a limit of about 10 MiB of AIR in flight.
const max_air_bytes_in_flight = 10 * (1 << 20);
/// A `Queue` with nothing queued, no prelink tasks expected, and no link thread running.
/// Until `start` is called, `pending_prelink_tasks` and `queued_prelink` may be adjusted directly.
pub const empty: Queue = .{
    // Synchronization primitives.
    .mutex = .{},
    .flush_safety = .{},
    .air_bytes_cond = .{},

    // Prelink tasks.
    .pending_prelink_tasks = 0,
    .queued_prelink = .empty,
    .wip_prelink = .empty,

    // ZCU tasks.
    .queued_zcu = .empty,
    .wip_zcu = .empty,
    .wip_zcu_idx = 0,

    // AIR/MIR memory accounting.
    .air_bytes_in_flight = 0,
    .air_bytes_waiting = 0,

    // No link thread is running or queued yet.
    .state = .finished,
};
/// Frees all memory owned by the queue, allocated with `comp.gpa`.
///
/// Every `ZcuTask` which has not been run yet is deinitialized first: everything still in
/// `queued_zcu`, plus the part of `wip_zcu` at or after `wip_zcu_idx`. `comp.zcu` is needed for
/// that, and is only unwrapped if such a task actually exists.
pub fn deinit(q: *Queue, comp: *Compilation) void {
    for (q.queued_zcu.items) |task| {
        task.deinit(comp.zcu.?);
    }
    const unprocessed_wip = q.wip_zcu.items[q.wip_zcu_idx..];
    for (unprocessed_wip) |task| {
        task.deinit(comp.zcu.?);
    }

    const gpa = comp.gpa;
    q.wip_zcu.deinit(gpa);
    q.queued_zcu.deinit(gpa);
    q.wip_prelink.deinit(gpa);
    q.queued_prelink.deinit(gpa);
}
/// Hands the queue over to the link thread. Must be called exactly once; afterwards the caller
/// must no longer touch `queued_prelink` or `pending_prelink_tasks` directly.
///
/// Asserts that no link thread is running or queued and that no ZCU tasks have been enqueued yet.
/// The link thread is spawned only if some prelink tasks are already queued.
pub fn start(q: *Queue, comp: *Compilation) void {
    assert(q.state == .finished);
    assert(q.queued_zcu.items.len == 0);

    // Nothing to process yet, so no link thread is spawned here.
    if (q.queued_prelink.items.len == 0) return;

    q.state = .running;
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
/// Notifies the queue that codegen has populated the `ZcuTask.LinkFunc.SharedMir` for
/// `func_index`. Codegen workers call this after populating `mir`. If the link thread had stopped
/// to wait for exactly this function's MIR, it is restarted; otherwise this does nothing.
pub fn mirReady(q: *Queue, comp: *Compilation, func_index: InternPool.Index, mir: *ZcuTask.LinkFunc.SharedMir) void {
    {
        q.mutex.lock();
        defer q.mutex.unlock();

        const awaited_func = switch (q.state) {
            .wait_for_mir => |func| func,
            .running, .finished => return,
        };
        if (awaited_func != func_index) return;

        // The link thread was parked on this very MIR, so it is our job to restart it.
        q.state = .running;
    }
    // Asserting this any earlier would race with a worker thread potentially freeing `mir`, so
    // it is checked only once we know the link thread was waiting on it.
    assert(mir.status.load(.acquire) != .pending);
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
/// Appends every task in `tasks` to the prelink queue, deducts them from
/// `pending_prelink_tasks`, and restarts the link thread if it had gone idle.
///
/// The tasks must have been expected, i.e. `tasks.len <= q.pending_prelink_tasks`; violating this
/// overflows the safety-checked subtraction below. Callers are also expected to pass a non-empty
/// `tasks`; note that this function does not itself check that.
///
/// Returns `error.OutOfMemory` if the queue could not grow, in which case
/// `pending_prelink_tasks` and `state` are left untouched.
pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) Allocator.Error!void {
    {
        q.mutex.lock();
        defer q.mutex.unlock();

        try q.queued_prelink.appendSlice(comp.gpa, tasks);
        q.pending_prelink_tasks -= @intCast(tasks.len);

        const link_thread_idle = switch (q.state) {
            .finished => true,
            .running => false,
            .wait_for_mir => unreachable, // we've not started zcu tasks yet
        };
        // A running link thread will pick the new tasks up by itself.
        if (!link_thread_idle) return;

        // It had stopped because it ran out of tasks, so it must be restarted.
        q.state = .running;
    }
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
/// Enqueues a single ZCU link task, and spawns the link thread if it is idle and would be able
/// to make progress.
///
/// Asserts that `comp.separateCodegenThreadOk()`.
///
/// If `task` is a `link_func`, this call may block until enough previously enqueued `link_func`
/// tasks have been processed that this task's `air_bytes` fit within `max_air_bytes_in_flight`.
/// A task whose `air_bytes` alone exceed the cap waits until nothing else is in flight.
///
/// Returns `error.OutOfMemory` if the queue could not grow. In that case the task has not been
/// queued and `air_bytes_in_flight` and `state` are unchanged.
pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!void {
    assert(comp.separateCodegenThreadOk());
    {
        q.mutex.lock();
        defer q.mutex.unlock();

        // If this is a `link_func` task, we might need to wait for `air_bytes_in_flight` to fall.
        if (task == .link_func) {
            const max_in_flight = max_air_bytes_in_flight -| task.link_func.air_bytes;
            while (q.air_bytes_in_flight > max_in_flight) {
                q.air_bytes_waiting = task.link_func.air_bytes;
                q.air_bytes_cond.wait(&q.mutex);
                q.air_bytes_waiting = 0;
            }
        }

        // Reserve the slot before touching the accounting. If this fails, `air_bytes_in_flight`
        // must not include a task which was never queued: nothing would ever subtract it again,
        // and later callers could wait forever on `air_bytes_cond`. This must happen after the
        // wait above, because waiting releases `mutex` and `queued_zcu` may change meanwhile.
        try q.queued_zcu.ensureUnusedCapacity(comp.gpa, 1);
        if (task == .link_func) {
            q.air_bytes_in_flight += task.link_func.air_bytes;
        }
        q.queued_zcu.appendAssumeCapacity(task);

        switch (q.state) {
            // The link thread is active, or will be restarted by `mirReady`.
            .running, .wait_for_mir => return,
            // The link thread is idle. If it is still waiting for prelink tasks, it will be
            // restarted by `enqueuePrelink` and must not run ZCU tasks before then.
            .finished => if (q.pending_prelink_tasks != 0) return,
        }
        // Restart the linker thread, unless it would immediately be blocked
        if (task == .link_func and task.link_func.mir.status.load(.acquire) == .pending) {
            q.state = .{ .wait_for_mir = task.link_func.func };
            return;
        }
        q.state = .running;
    }
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
/// Entry point of the link thread; spawned via `comp.thread_pool.spawnWgId` by `start`,
/// `mirReady`, `enqueuePrelink` and `enqueueZcu`, all of which set `state` to `.running` first.
///
/// Runs in three phases:
/// 1. Runs queued prelink tasks until none are queued and none are pending.
/// 2. Calls `prelink` on `comp.bin_file`, if there is one and `post_prelink` is not yet set.
/// 3. Runs ZCU tasks in queue order.
///
/// Returns, after updating `state` under `mutex`, when one of these happens:
/// * no prelink task is queued but more are expected (`state = .finished`),
/// * no ZCU task is queued (`state = .finished`),
/// * the next ZCU task is a `link_func` whose MIR is still pending (`state = .wait_for_mir`).
///
/// `tid` is passed through to `link.doZcuTask`.
fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void {
q.flush_safety.lock(); // every `return` site should unlock this before unlocking `q.mutex`
if (std.debug.runtime_safety) {
q.mutex.lock();
defer q.mutex.unlock();
assert(q.state == .running);
}
prelink: while (true) {
assert(q.wip_prelink.items.len == 0);
{
q.mutex.lock();
defer q.mutex.unlock();
// Take ownership of everything queued so far. `wip_prelink` is empty at this point (asserted
// above), so after the swap `queued_prelink` is empty and ready for new tasks.
std.mem.swap(std.ArrayListUnmanaged(PrelinkTask), &q.queued_prelink, &q.wip_prelink);
if (q.wip_prelink.items.len == 0) {
if (q.pending_prelink_tasks == 0) {
break :prelink; // prelink is done
} else {
// We're expecting more prelink tasks so can't move on to ZCU tasks.
q.state = .finished;
q.flush_safety.unlock();
return;
}
}
}
// The tasks are run without holding `mutex`, so more tasks can be enqueued meanwhile.
for (q.wip_prelink.items) |task| {
link.doPrelinkTask(comp, task);
}
q.wip_prelink.clearRetainingCapacity();
}
// We've finished the prelink tasks, so run prelink if necessary.
if (comp.bin_file) |lf| {
if (!lf.post_prelink) {
if (lf.prelink()) |_| {
lf.post_prelink = true;
} else |err| switch (err) {
error.OutOfMemory => comp.link_diags.setAllocFailure(),
// Nothing further is done for this error here; `post_prelink` stays false.
error.LinkFailure => {},
}
}
}
// Now we can run ZCU tasks.
while (true) {
// `wip_zcu` may still hold unprocessed tasks from a previous run of this function which
// stopped to wait for MIR; only refill it once everything in it has been run.
if (q.wip_zcu.items.len == q.wip_zcu_idx) {
q.wip_zcu.clearRetainingCapacity();
q.wip_zcu_idx = 0;
q.mutex.lock();
defer q.mutex.unlock();
std.mem.swap(std.ArrayListUnmanaged(ZcuTask), &q.queued_zcu, &q.wip_zcu);
if (q.wip_zcu.items.len == 0) {
// We've exhausted all available tasks.
q.state = .finished;
q.flush_safety.unlock();
return;
}
}
const task = q.wip_zcu.items[q.wip_zcu_idx];
// If the task is a `link_func`, we might have to stop until its MIR is populated.
pending: {
if (task != .link_func) break :pending;
const status_ptr = &task.link_func.mir.status;
// First check without the mutex to optimize for the common case where MIR is ready.
if (status_ptr.load(.acquire) != .pending) break :pending;
q.mutex.lock();
defer q.mutex.unlock();
// Check again now that `mutex` is held. `mirReady` inspects `state` under the same mutex, so
// either we observe the populated MIR here, or `mirReady` observes `.wait_for_mir` below and
// restarts us. NOTE(review): this relies on codegen populating the MIR before calling
// `mirReady`, as that function's documentation requires.
if (status_ptr.load(.acquire) != .pending) break :pending;
// We will stop for now, and get restarted once this MIR is ready.
q.state = .{ .wait_for_mir = task.link_func.func };
q.flush_safety.unlock();
return;
}
link.doZcuTask(comp, tid, task);
task.deinit(comp.zcu.?);
if (task == .link_func) {
// Decrease `air_bytes_in_flight`, since we've finished processing this MIR.
q.mutex.lock();
defer q.mutex.unlock();
q.air_bytes_in_flight -= task.link_func.air_bytes;
// Wake a blocked `enqueueZcu` call, but only once its task fits under the cap.
if (q.air_bytes_waiting != 0 and
q.air_bytes_in_flight <= max_air_bytes_in_flight -| q.air_bytes_waiting)
{
q.air_bytes_cond.signal();
}
}
// Only now is the task counted as run; `deinit` relies on this index to know which tasks in
// `wip_zcu` are still pending.
q.wip_zcu_idx += 1;
}
}
const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Compilation = @import("../Compilation.zig");
const InternPool = @import("../InternPool.zig");
const link = @import("../link.zig");
const PrelinkTask = link.PrelinkTask;
const ZcuTask = link.ZcuTask;
const Queue = @This();
|