diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2020-12-20 21:19:05 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-12-20 21:19:05 -0500 |
| commit | 4918605176c2e48c178847ea281b790334207733 (patch) | |
| tree | 4adb4cbf5e6764df65747fb3bae2c1640060eb89 /src/ThreadPool.zig | |
| parent | 4964bb3282bf13de03a79fad1fb9bca104dc1930 (diff) | |
| parent | 1d94a6893689ad1fb8e06308ae51603a6c8708a8 (diff) | |
| download | zig-4918605176c2e48c178847ea281b790334207733.tar.gz zig-4918605176c2e48c178847ea281b790334207733.zip | |
Merge pull request #7462 from ziglang/parallel-c-objects
Introduce a ThreadPool and parallel execution of some of the compilation work items
Diffstat (limited to 'src/ThreadPool.zig')
| -rw-r--r-- | src/ThreadPool.zig | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig new file mode 100644 index 0000000000..00cb26772a --- /dev/null +++ b/src/ThreadPool.zig @@ -0,0 +1,126 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. +const std = @import("std"); +const ThreadPool = @This(); + +lock: std.Mutex = .{}, +is_running: bool = true, +allocator: *std.mem.Allocator, +running: usize = 0, +threads: []*std.Thread, +run_queue: RunQueue = .{}, +idle_queue: IdleQueue = .{}, + +const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const RunQueue = std.SinglyLinkedList(Runnable); +const Runnable = struct { + runFn: fn (*Runnable) void, +}; + +pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { + self.* = .{ + .allocator = allocator, + .threads = &[_]*std.Thread{}, + }; + if (std.builtin.single_threaded) + return; + + errdefer self.deinit(); + + var num_threads = std.Thread.cpuCount() catch 1; + if (num_threads > 0) + self.threads = try allocator.alloc(*std.Thread, num_threads); + + while (num_threads > 0) : (num_threads -= 1) { + const thread = try std.Thread.spawn(self, runWorker); + self.threads[self.running] = thread; + self.running += 1; + } +} + +pub fn deinit(self: *ThreadPool) void { + self.shutdown(); + + std.debug.assert(!self.is_running); + for (self.threads[0..self.running]) |thread| + thread.wait(); + + defer self.threads = &[_]*std.Thread{}; + if (self.running > 0) + self.allocator.free(self.threads); +} + +pub fn shutdown(self: *ThreadPool) void { + const held = self.lock.acquire(); + + if (!self.is_running) + return held.release(); + + var idle_queue = self.idle_queue; + self.idle_queue = .{}; + self.is_running = false; + held.release(); + + while (idle_queue.popFirst()) |idle_node| + idle_node.data.set(); +} + +pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { + if (std.builtin.single_threaded) { + @call(.{}, func, args); + return; + } + const Args = @TypeOf(args); + const Closure = struct { + arguments: Args, + pool: *ThreadPool, + run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + + fn runFn(runnable: *Runnable) void { + const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); + const closure = @fieldParentPtr(@This(), "run_node", run_node); + const result = @call(.{}, func, closure.arguments); + closure.pool.allocator.destroy(closure); + } + }; + + const closure = try self.allocator.create(Closure); + closure.* = .{ + .arguments = args, + .pool = self, + }; + + const held = self.lock.acquire(); + self.run_queue.prepend(&closure.run_node); + + const idle_node = self.idle_queue.popFirst(); + held.release(); + + if (idle_node) |node| + node.data.set(); +} + +fn runWorker(self: *ThreadPool) void { + while (true) { + const held = self.lock.acquire(); + + if (self.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (!self.is_running) { + held.release(); + return; + } + + var idle_node = IdleQueue.Node{ .data = .{} }; + self.idle_queue.prepend(&idle_node); + held.release(); + idle_node.data.wait(); + } +} |
