diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2020-12-15 18:30:29 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2020-12-20 15:08:59 -0700 |
| commit | 0d1cd0d4822628c104890af4c31cdf38c6f96d35 (patch) | |
| tree | f61949f7abc888284fcd33c4696fcb71701f5e16 /src | |
| parent | 01d33855c736b04160d9616f138442aa4e41a738 (diff) | |
| download | zig-0d1cd0d4822628c104890af4c31cdf38c6f96d35.tar.gz zig-0d1cd0d4822628c104890af4c31cdf38c6f96d35.zip | |
use kprotty's ThreadPool implementation (v5)
Diffstat (limited to 'src')
| -rw-r--r-- | src/Compilation.zig | 62 | ||||
| -rw-r--r-- | src/ThreadPool.zig | 116 | ||||
| -rw-r--r-- | src/WaitGroup.zig | 22 | ||||
| -rw-r--r-- | src/glibc.zig | 1 | ||||
| -rw-r--r-- | src/libcxx.zig | 2 | ||||
| -rw-r--r-- | src/libunwind.zig | 1 | ||||
| -rw-r--r-- | src/main.zig | 10 | ||||
| -rw-r--r-- | src/musl.zig | 1 | ||||
| -rw-r--r-- | src/test.zig | 15 |
9 files changed, 212 insertions, 18 deletions
diff --git a/src/Compilation.zig b/src/Compilation.zig index a7fd75fe56..ef07de7b17 100644 --- a/src/Compilation.zig +++ b/src/Compilation.zig @@ -26,6 +26,8 @@ const Module = @import("Module.zig"); const Cache = @import("Cache.zig"); const stage1 = @import("stage1.zig"); const translate_c = @import("translate_c.zig"); +const ThreadPool = @import("ThreadPool.zig"); +const WaitGroup = @import("WaitGroup.zig"); /// General-purpose allocator. Used for both temporary and long-term storage. gpa: *Allocator, @@ -79,6 +81,7 @@ zig_lib_directory: Directory, local_cache_directory: Directory, global_cache_directory: Directory, libc_include_dir_list: []const []const u8, +thread_pool: *ThreadPool, /// Populated when we build the libc++ static library. A Job to build this is placed in the queue /// and resolved before calling linker.flush(). @@ -335,6 +338,7 @@ pub const InitOptions = struct { root_name: []const u8, root_pkg: ?*Package, output_mode: std.builtin.OutputMode, + thread_pool: *ThreadPool, dynamic_linker: ?[]const u8 = null, /// `null` means to not emit a binary file. emit_bin: ?EmitLoc, @@ -985,6 +989,7 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation { .self_exe_path = options.self_exe_path, .libc_include_dir_list = libc_dirs.libc_include_dir_list, .sanitize_c = sanitize_c, + .thread_pool = options.thread_pool, .clang_passthrough_mode = options.clang_passthrough_mode, .clang_preprocessor_mode = options.clang_preprocessor_mode, .verbose_cc = options.verbose_cc, @@ -1380,24 +1385,14 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor var c_comp_progress_node = main_progress_node.start("Compile C Objects", self.c_source_files.len); defer c_comp_progress_node.end(); + var wg = WaitGroup{}; + defer wg.wait(); + while (self.c_object_work_queue.readItem()) |c_object| { - self.updateCObject(c_object, &c_comp_progress_node) catch |err| switch (err) { - error.AnalysisFail => continue, - else => { - { - var lock = self.mutex.acquire(); - defer lock.release(); - try self.failed_c_objects.ensureCapacity(self.gpa, self.failed_c_objects.items().len + 1); - self.failed_c_objects.putAssumeCapacityNoClobber(c_object, try ErrorMsg.create( - self.gpa, - 0, - "unable to build C object: {s}", - .{@errorName(err)}, - )); - } - c_object.status = .{ .failure = {} }; - }, - }; + wg.start(); + try self.thread_pool.spawn(workerUpdateCObject, .{ + self, c_object, &c_comp_progress_node, &wg, + }); } while (self.work_queue.readItem()) |work_item| switch (work_item) { @@ -1721,6 +1716,37 @@ pub fn cImport(comp: *Compilation, c_src: []const u8) !CImportResult { }; } +fn workerUpdateCObject( + comp: *Compilation, + c_object: *CObject, + progress_node: *std.Progress.Node, + wg: *WaitGroup, +) void { + defer wg.stop(); + + comp.updateCObject(c_object, progress_node) catch |err| switch (err) { + error.AnalysisFail => return, + else => { + { + var lock = comp.mutex.acquire(); + defer lock.release(); + comp.failed_c_objects.ensureCapacity(comp.gpa, comp.failed_c_objects.items().len + 1) catch { + fatal("TODO handle this by setting c_object.status = oom failure", .{}); + }; + comp.failed_c_objects.putAssumeCapacityNoClobber(c_object, ErrorMsg.create( + comp.gpa, + 0, + "unable to build C object: {s}", + .{@errorName(err)}, + ) catch { + fatal("TODO handle this by setting c_object.status = oom failure", .{}); + }); + } + c_object.status = .{ .failure = {} }; + }, + }; +} + fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: *std.Progress.Node) !void { if (!build_options.have_llvm) { return comp.failCObj(c_object, "clang not available: compiler built without LLVM extensions", .{}); @@ -2800,6 +2826,7 @@ fn buildOutputFromZig( .root_name = root_name, .root_pkg = &root_pkg, .output_mode = fixed_output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = optimize_mode, @@ -3173,6 +3200,7 @@ pub fn build_crt_file( .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig new file mode 100644 index 0000000000..a5f59a30e7 --- /dev/null +++ b/src/ThreadPool.zig @@ -0,0 +1,116 @@ +const std = @import("std"); +const ThreadPool = @This(); + +lock: std.Mutex = .{}, +is_running: bool = true, +allocator: *std.mem.Allocator, +running: usize = 0, +threads: []*std.Thread, +run_queue: RunQueue = .{}, +idle_queue: IdleQueue = .{}, + +const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const RunQueue = std.SinglyLinkedList(Runnable); +const Runnable = struct { + runFn: fn (*Runnable) void, +}; + +pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { + self.* = .{ + .allocator = allocator, + .threads = &[_]*std.Thread{}, + }; + + errdefer self.deinit(); + + var num_threads = std.Thread.cpuCount() catch 1; + if (num_threads > 0) + self.threads = try allocator.alloc(*std.Thread, num_threads); + + while (num_threads > 0) : (num_threads -= 1) { + const thread = try std.Thread.spawn(self, runWorker); + self.threads[self.running] = thread; + self.running += 1; + } +} + +pub fn deinit(self: *ThreadPool) void { + self.shutdown(); + + std.debug.assert(!self.is_running); + for (self.threads[0..self.running]) |thread| + thread.wait(); + + defer self.threads = &[_]*std.Thread{}; + if (self.running > 0) + self.allocator.free(self.threads); +} + +pub fn shutdown(self: *ThreadPool) void { + const held = self.lock.acquire(); + + if (!self.is_running) + return held.release(); + + var idle_queue = self.idle_queue; + self.idle_queue = .{}; + self.is_running = false; + held.release(); + + while (idle_queue.popFirst()) |idle_node| + idle_node.data.set(); +} + +pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { + const Args = @TypeOf(args); + const Closure = struct { + arguments: Args, + pool: *ThreadPool, + run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + + fn runFn(runnable: *Runnable) void { + const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); + const closure = @fieldParentPtr(@This(), "run_node", run_node); + const result = @call(.{}, func, closure.arguments); + closure.pool.allocator.destroy(closure); + } + }; + + const closure = try self.allocator.create(Closure); + errdefer self.allocator.destroy(closure); + closure.* = .{ + .arguments = args, + .pool = self, + }; + + const held = self.lock.acquire(); + self.run_queue.prepend(&closure.run_node); + + const idle_node = self.idle_queue.popFirst(); + held.release(); + + if (idle_node) |node| + node.data.set(); +} + +fn runWorker(self: *ThreadPool) void { + while (true) { + const held = self.lock.acquire(); + + if (self.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (!self.is_running) { + held.release(); + return; + } + + var idle_node = IdleQueue.Node{ .data = .{} }; + self.idle_queue.prepend(&idle_node); + held.release(); + idle_node.data.wait(); + } +} diff --git a/src/WaitGroup.zig b/src/WaitGroup.zig new file mode 100644 index 0000000000..295dfd39dc --- /dev/null +++ b/src/WaitGroup.zig @@ -0,0 +1,22 @@ +const std = @import("std"); +const WaitGroup = @This(); + +counter: usize = 0, +event: ?*std.AutoResetEvent = null, + +pub fn start(self: *WaitGroup) void { + _ = @atomicRmw(usize, &self.counter, .Add, 1, .SeqCst); +} + +pub fn stop(self: *WaitGroup) void { + if (@atomicRmw(usize, &self.counter, .Sub, 1, .SeqCst) == 1) + if (@atomicRmw(?*std.AutoResetEvent, &self.event, .Xchg, null, .SeqCst)) |event| + event.set(); +} + +pub fn wait(self: *WaitGroup) void { + var event = std.AutoResetEvent{}; + @atomicStore(?*std.AutoResetEvent, &self.event, &event, .SeqCst); + if (@atomicLoad(usize, &self.counter, .SeqCst) != 0) + event.wait(); +} diff --git a/src/glibc.zig b/src/glibc.zig index 15c9d743f9..413e38637b 100644 --- a/src/glibc.zig +++ b/src/glibc.zig @@ -936,6 +936,7 @@ fn buildSharedLib( .root_pkg = null, .output_mode = .Lib, .link_mode = .Dynamic, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/libcxx.zig b/src/libcxx.zig index 8de45a49b2..142d014b2f 100644 --- a/src/libcxx.zig +++ b/src/libcxx.zig @@ -162,6 +162,7 @@ pub fn buildLibCXX(comp: *Compilation) !void { .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, @@ -280,6 +281,7 @@ pub fn buildLibCXXABI(comp: *Compilation) !void { .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/libunwind.zig b/src/libunwind.zig index 9822016ae1..13a4fdf7c7 100644 --- a/src/libunwind.zig +++ b/src/libunwind.zig @@ -95,6 +95,7 @@ pub fn buildStaticLib(comp: *Compilation) !void { .root_name = root_name, .root_pkg = null, .output_mode = output_mode, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = emit_bin, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/main.zig b/src/main.zig index c54cc6515b..c25bc20d85 100644 --- a/src/main.zig +++ b/src/main.zig @@ -19,6 +19,7 @@ const LibCInstallation = @import("libc_installation.zig").LibCInstallation; const translate_c = @import("translate_c.zig"); const Cache = @import("Cache.zig"); const target_util = @import("target.zig"); +const ThreadPool = @import("ThreadPool.zig"); pub fn fatal(comptime format: []const u8, args: anytype) noreturn { std.log.emerg(format, args); @@ -1632,6 +1633,10 @@ fn buildOutputType( }; defer zig_lib_directory.handle.close(); + var thread_pool: ThreadPool = undefined; + try thread_pool.init(gpa); + defer thread_pool.deinit(); + var libc_installation: ?LibCInstallation = null; defer if (libc_installation) |*l| l.deinit(gpa); @@ -1747,6 +1752,7 @@ fn buildOutputType( .single_threaded = single_threaded, .function_sections = function_sections, .self_exe_path = self_exe_path, + .thread_pool = &thread_pool, .clang_passthrough_mode = arg_mode != .build, .clang_preprocessor_mode = clang_preprocessor_mode, .version = optional_version, @@ -2412,6 +2418,9 @@ pub fn cmdBuild(gpa: *Allocator, arena: *Allocator, args: []const []const u8) !v .directory = null, // Use the local zig-cache. .basename = exe_basename, }; + var thread_pool: ThreadPool = undefined; + try thread_pool.init(gpa); + defer thread_pool.deinit(); const comp = Compilation.create(gpa, .{ .zig_lib_directory = zig_lib_directory, .local_cache_directory = local_cache_directory, @@ -2427,6 +2436,7 @@ pub fn cmdBuild(gpa: *Allocator, arena: *Allocator, args: []const []const u8) !v .emit_h = null, .optimize_mode = .Debug, .self_exe_path = self_exe_path, + .thread_pool = &thread_pool, }) catch |err| { fatal("unable to create compilation: {}", .{@errorName(err)}); }; diff --git a/src/musl.zig b/src/musl.zig index 1a30a6e2b9..80943ab1f7 100644 --- a/src/musl.zig +++ b/src/musl.zig @@ -200,6 +200,7 @@ pub fn buildCRTFile(comp: *Compilation, crt_file: CRTFile) !void { .root_pkg = null, .output_mode = .Lib, .link_mode = .Dynamic, + .thread_pool = comp.thread_pool, .libc_installation = comp.bin_file.options.libc_installation, .emit_bin = Compilation.EmitLoc{ .directory = null, .basename = "libc.so" }, .optimize_mode = comp.bin_file.options.optimize_mode, diff --git a/src/test.zig b/src/test.zig index e0497a8cd5..795d4f0d65 100644 --- a/src/test.zig +++ b/src/test.zig @@ -10,6 +10,7 @@ const enable_qemu: bool = build_options.enable_qemu; const enable_wine: bool = build_options.enable_wine; const enable_wasmtime: bool = build_options.enable_wasmtime; const glibc_multi_install_dir: ?[]const u8 = build_options.glibc_multi_install_dir; +const ThreadPool = @import("ThreadPool.zig"); const cheader = @embedFile("link/cbe.h"); @@ -467,6 +468,10 @@ pub const TestContext = struct { defer zig_lib_directory.handle.close(); defer std.testing.allocator.free(zig_lib_directory.path.?); + var thread_pool: ThreadPool = undefined; + try thread_pool.init(std.testing.allocator); + defer thread_pool.deinit(); + for (self.cases.items) |case| { if (build_options.skip_non_native and case.target.getCpuArch() != std.Target.current.cpu.arch) continue; @@ -480,7 +485,13 @@ pub const TestContext = struct { progress.initial_delay_ns = 0; progress.refresh_rate_ns = 0; - try self.runOneCase(std.testing.allocator, &prg_node, case, zig_lib_directory); + try self.runOneCase( + std.testing.allocator, + &prg_node, + case, + zig_lib_directory, + &thread_pool, + ); } } @@ -490,6 +501,7 @@ pub const TestContext = struct { root_node: *std.Progress.Node, case: Case, zig_lib_directory: Compilation.Directory, + thread_pool: *ThreadPool, ) !void { const target_info = try std.zig.system.NativeTargetInfo.detect(allocator, case.target); const target = target_info.target; @@ -539,6 +551,7 @@ pub const TestContext = struct { .local_cache_directory = zig_cache_directory, .global_cache_directory = zig_cache_directory, .zig_lib_directory = zig_lib_directory, + .thread_pool = thread_pool, .root_name = "test_case", .target = target, // TODO: support tests for object file building, and library builds |
