aboutsummaryrefslogtreecommitdiff
path: root/src/ThreadPool.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2020-12-23 16:57:18 -0800
committerAndrew Kelley <andrew@ziglang.org>2020-12-23 16:57:18 -0800
commit177377b6e356b34bbed40cadca596658d158af6b (patch)
treeed7e0a7fa146b8c15044e21f386ec8a8e2977695 /src/ThreadPool.zig
parent5377b7fb97311448daa3c29a8c8f100656d871ba (diff)
downloadzig-177377b6e356b34bbed40cadca596658d158af6b.tar.gz
zig-177377b6e356b34bbed40cadca596658d158af6b.zip
rework std.ResetEvent, improve std lib Darwin integration
* split std.ResetEvent into: - ResetEvent - requires init() at runtime and it can fail. Also requires deinit(). - StaticResetEvent - can be statically initialized and requires no deinitialization. Initialization cannot fail. * the POSIX sem_t implementation can in fact fail on initialization because it is allowed to be implemented as a file descriptor. * Completely define, clarify, and explain in detail the semantics of these APIs. Remove the `isSet` function. * `ResetEvent.timedWait` returns an enum instead of a possible error. * `ResetEvent.init` takes a pointer to the ResetEvent instead of returning a copy. * On Darwin, `ResetEvent` is implemented using Grand Central Dispatch, which is exposed by libSystem. stage2 changes: * ThreadPool: use a single, pre-initialized `ResetEvent` per worker. * WaitGroup: now requires init() and deinit() and init() can fail. - Add a `reset` function. - Compilation initializes one for the work queue in creation and re-uses it for every update. - Rename `stop` to `finish`. - Simplify the implementation based on the usage pattern.
Diffstat (limited to 'src/ThreadPool.zig')
-rw-r--r--src/ThreadPool.zig96
1 files changed, 57 insertions, 39 deletions
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index cf9c02fa59..1e91d3f731 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -9,8 +9,7 @@ const ThreadPool = @This();
lock: std.Mutex = .{},
is_running: bool = true,
allocator: *std.mem.Allocator,
-spawned: usize = 0,
-threads: []*std.Thread,
+workers: []Worker,
run_queue: RunQueue = .{},
idle_queue: IdleQueue = .{},
@@ -20,23 +19,69 @@ const Runnable = struct {
runFn: fn (*Runnable) void,
};
+const Worker = struct {
+ pool: *ThreadPool,
+ thread: *std.Thread,
+ /// The node is for this worker only and must have an already initialized event
+ /// when the thread is spawned.
+ idle_node: IdleQueue.Node,
+
+ fn run(worker: *Worker) void {
+ while (true) {
+ const held = worker.pool.lock.acquire();
+
+ if (worker.pool.run_queue.popFirst()) |run_node| {
+ held.release();
+ (run_node.data.runFn)(&run_node.data);
+ continue;
+ }
+
+ if (worker.pool.is_running) {
+ worker.idle_node.data.reset();
+
+ worker.pool.idle_queue.prepend(&worker.idle_node);
+ held.release();
+
+ worker.idle_node.data.wait();
+ continue;
+ }
+
+ held.release();
+ return;
+ }
+ }
+};
+
pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
self.* = .{
.allocator = allocator,
- .threads = &[_]*std.Thread{},
+ .workers = &[_]Worker{},
};
if (std.builtin.single_threaded)
return;
- errdefer self.deinit();
+ const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1);
+ self.workers = try allocator.alloc(Worker, worker_count);
+ errdefer allocator.free(self.workers);
+
+ var worker_index: usize = 0;
+ errdefer self.destroyWorkers(worker_index);
+ while (worker_index < worker_count) : (worker_index += 1) {
+ const worker = &self.workers[worker_index];
+ worker.pool = self;
- var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1);
- self.threads = try allocator.alloc(*std.Thread, num_threads);
+ // Each worker requires its ResetEvent to be pre-initialized.
+ try worker.idle_node.data.init();
+ errdefer worker.idle_node.data.deinit();
+
+ worker.thread = try std.Thread.spawn(worker, Worker.run);
+ }
+}
- while (num_threads > 0) : (num_threads -= 1) {
- const thread = try std.Thread.spawn(self, runWorker);
- self.threads[self.spawned] = thread;
- self.spawned += 1;
+fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
+ for (self.workers[0..spawned]) |*worker| {
+ worker.thread.wait();
+ worker.idle_node.data.deinit();
}
}
@@ -50,9 +95,8 @@ pub fn deinit(self: *ThreadPool) void {
idle_node.data.set();
}
- defer self.allocator.free(self.threads);
- for (self.threads[0..self.spawned]) |thread|
- thread.wait();
+ self.destroyWorkers(self.workers.len);
+ self.allocator.free(self.workers);
}
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
@@ -92,29 +136,3 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
if (self.idle_queue.popFirst()) |idle_node|
idle_node.data.set();
}
-
-fn runWorker(self: *ThreadPool) void {
- while (true) {
- const held = self.lock.acquire();
-
- if (self.run_queue.popFirst()) |run_node| {
- held.release();
- (run_node.data.runFn)(&run_node.data);
- continue;
- }
-
- if (self.is_running) {
- var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() };
-
- self.idle_queue.prepend(&idle_node);
- held.release();
-
- idle_node.data.wait();
- idle_node.data.deinit();
- continue;
- }
-
- held.release();
- return;
- }
-}