Diffstat (limited to 'src/ThreadPool.zig')
-rw-r--r--  src/ThreadPool.zig  174
1 file changed, 174 insertions(+), 0 deletions(-)
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
new file mode 100644
index 0000000000..ec580210e9
--- /dev/null
+++ b/src/ThreadPool.zig
@@ -0,0 +1,174 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2020 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
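+//
+// A minimal usage sketch (illustrative only; `allocator` and `job` are
+// assumed to be supplied by the caller, e.g. `fn job(x: u32) void`):
+//
+//     var pool: ThreadPool = undefined;
+//     try pool.init(allocator);
+//     defer pool.deinit();
+//     try pool.spawn(job, .{@as(u32, 42)});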
+const std = @import("std");
+const ThreadPool = @This();
+
+lock: std.Thread.Mutex = .{},
+is_running: bool = true,
+allocator: *std.mem.Allocator,
+workers: []Worker,
+run_queue: RunQueue = .{},
+idle_queue: IdleQueue = .{},
+
+const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent);
+const RunQueue = std.SinglyLinkedList(Runnable);
+const Runnable = struct {
+ runFn: fn (*Runnable) void,
+};
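+// Runnable is an intrusive job header: it is embedded in a RunQueue.Node,
+// and runFn recovers the enclosing closure via @fieldParentPtr (see spawn).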
+
+const Worker = struct {
+ pool: *ThreadPool,
+ thread: *std.Thread,
+    /// This node belongs to this worker alone; its event must already be
+    /// initialized by the time the thread is spawned.
+ idle_node: IdleQueue.Node,
+
+ fn run(worker: *Worker) void {
+ while (true) {
+ const held = worker.pool.lock.acquire();
+
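+            // Dequeue under the lock, but run the task with the lock
+            // released so other workers and spawn() can make progress.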
+ if (worker.pool.run_queue.popFirst()) |run_node| {
+ held.release();
+ (run_node.data.runFn)(&run_node.data);
+ continue;
+ }
+
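+            // No work available: reset this worker's event *before*
+            // publishing its node on the idle queue, then sleep until
+            // spawn() or deinit() sets the event.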
+ if (worker.pool.is_running) {
+ worker.idle_node.data.reset();
+
+ worker.pool.idle_queue.prepend(&worker.idle_node);
+ held.release();
+
+ worker.idle_node.data.wait();
+ continue;
+ }
+
+ held.release();
+ return;
+ }
+ }
+};
+
+pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
+ self.* = .{
+ .allocator = allocator,
+ .workers = &[_]Worker{},
+ };
+ if (std.builtin.single_threaded)
+ return;
+
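+    // One worker per CPU, but always at least one even when the count
+    // cannot be determined.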
+ const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1);
+ self.workers = try allocator.alloc(Worker, worker_count);
+ errdefer allocator.free(self.workers);
+
+ var worker_index: usize = 0;
+    // If spawning a later thread fails, wake any workers that are already
+    // parked on their idle events; otherwise destroyWorkers() would join
+    // threads that never exit.
+    errdefer {
+        const held = self.lock.acquire();
+        self.is_running = false;
+        while (self.idle_queue.popFirst()) |idle_node|
+            idle_node.data.set();
+        held.release();
+        self.destroyWorkers(worker_index);
+    }
+ while (worker_index < worker_count) : (worker_index += 1) {
+ const worker = &self.workers[worker_index];
+ worker.pool = self;
+
+ // Each worker requires its ResetEvent to be pre-initialized.
+ try worker.idle_node.data.init();
+ errdefer worker.idle_node.data.deinit();
+
+ worker.thread = try std.Thread.spawn(worker, Worker.run);
+ }
+}
+
+fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
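+    // Join each spawned thread, then tear down its idle event.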
+ for (self.workers[0..spawned]) |*worker| {
+ worker.thread.wait();
+ worker.idle_node.data.deinit();
+ }
+}
+
+pub fn deinit(self: *ThreadPool) void {
+ {
+ const held = self.lock.acquire();
+ defer held.release();
+
+ self.is_running = false;
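+        // Wake every parked worker; each observes is_running == false and exits.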
+ while (self.idle_queue.popFirst()) |idle_node|
+ idle_node.data.set();
+ }
+
+ self.destroyWorkers(self.workers.len);
+ self.allocator.free(self.workers);
+}
+
+pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
+ if (std.builtin.single_threaded) {
+        // Run the task inline; its result is intentionally discarded.
+        _ = @call(.{}, func, args);
+ return;
+ }
+
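+    // The task closure is heap-allocated because it must outlive this call;
+    // it destroys itself from within runFn once the task has finished.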
+ const Args = @TypeOf(args);
+ const Closure = struct {
+ arguments: Args,
+ pool: *ThreadPool,
+ run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
+
+ fn runFn(runnable: *Runnable) void {
+ const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
+ const closure = @fieldParentPtr(@This(), "run_node", run_node);
+            // Run the task; its result is discarded. The closure (and the
+            // arguments it holds) stays alive until the call returns.
+            _ = @call(.{}, func, closure.arguments);
+
+ const held = closure.pool.lock.acquire();
+ defer held.release();
+ closure.pool.allocator.destroy(closure);
+ }
+ };
+
+ const held = self.lock.acquire();
+ defer held.release();
+
+ const closure = try self.allocator.create(Closure);
+ closure.* = .{
+ .arguments = args,
+ .pool = self,
+ };
+
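+    // Publish the task (prepend pairs with popFirst, so the queue is LIFO)
+    // and wake one idle worker, if any.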
+ self.run_queue.prepend(&closure.run_node);
+
+ if (self.idle_queue.popFirst()) |idle_node|
+ idle_node.data.set();
+}