aboutsummaryrefslogtreecommitdiff
path: root/src/ThreadPool.zig
diff options
context:
space:
mode:
authorprotty <45520026+kprotty@users.noreply.github.com>2022-04-26 16:48:56 -0500
committerGitHub <noreply@github.com>2022-04-26 16:48:56 -0500
commit18f30346291bd2471e07924af161de080935dd60 (patch)
tree609cdd73aa40f15625f896e79b9420b3e320ddcd /src/ThreadPool.zig
parent50f1856476038e57f5d2f47c751f608b0b360662 (diff)
downloadzig-18f30346291bd2471e07924af161de080935dd60.tar.gz
zig-18f30346291bd2471e07924af161de080935dd60.zip
std.Thread: ResetEvent improvements (#11523)
* std: start removing redundant ResetEvents * src: fix other uses of std.Thread.ResetEvent * src: add builtin.sanitize_thread for tsan detection * atomic: add Atomic.fence for proper fencing with tsan * Thread: remove the other ResetEvent's and rewrite the current one * Thread: ResetEvent docs * zig fmt + WaitGroup.reset() fix * src: fix build issues for ResetEvent + tsan * Thread: ResetEvent tests * Thread: ResetEvent module doc * Atomic: replace llvm *p memory constraint with *m * panicking: handle spurious wakeups in futex.wait() when waiting for abort() * zig fmt
Diffstat (limited to 'src/ThreadPool.zig')
-rw-r--r--src/ThreadPool.zig139
1 files changed, 64 insertions, 75 deletions
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 36d004cfc6..7d1c8420af 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -3,13 +3,12 @@ const builtin = @import("builtin");
const ThreadPool = @This();
mutex: std.Thread.Mutex = .{},
+cond: std.Thread.Condition = .{},
+run_queue: RunQueue = .{},
is_running: bool = true,
allocator: std.mem.Allocator,
-workers: []Worker,
-run_queue: RunQueue = .{},
-idle_queue: IdleQueue = .{},
+threads: []std.Thread,
-const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent);
const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct {
runFn: RunProto,
@@ -20,89 +19,52 @@ const RunProto = switch (builtin.zig_backend) {
else => *const fn (*Runnable) void,
};
-const Worker = struct {
- pool: *ThreadPool,
- thread: std.Thread,
- /// The node is for this worker only and must have an already initialized event
- /// when the thread is spawned.
- idle_node: IdleQueue.Node,
-
- fn run(worker: *Worker) void {
- const pool = worker.pool;
-
- while (true) {
- pool.mutex.lock();
-
- if (pool.run_queue.popFirst()) |run_node| {
- pool.mutex.unlock();
- (run_node.data.runFn)(&run_node.data);
- continue;
- }
-
- if (pool.is_running) {
- worker.idle_node.data.reset();
-
- pool.idle_queue.prepend(&worker.idle_node);
- pool.mutex.unlock();
-
- worker.idle_node.data.wait();
- continue;
- }
-
- pool.mutex.unlock();
- return;
- }
- }
-};
-
pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void {
self.* = .{
.allocator = allocator,
- .workers = &[_]Worker{},
+ .threads = &[_]std.Thread{},
};
- if (builtin.single_threaded)
- return;
- const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
- self.workers = try allocator.alloc(Worker, worker_count);
- errdefer allocator.free(self.workers);
+ if (builtin.single_threaded) {
+ return;
+ }
- var worker_index: usize = 0;
- errdefer self.destroyWorkers(worker_index);
- while (worker_index < worker_count) : (worker_index += 1) {
- const worker = &self.workers[worker_index];
- worker.pool = self;
+ const thread_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
+ self.threads = try allocator.alloc(std.Thread, thread_count);
+ errdefer allocator.free(self.threads);
- // Each worker requires its ResetEvent to be pre-initialized.
- try worker.idle_node.data.init();
- errdefer worker.idle_node.data.deinit();
+ // kill and join any threads we spawned previously on error.
+ var spawned: usize = 0;
+ errdefer self.join(spawned);
- worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker});
+ for (self.threads) |*thread| {
+ thread.* = try std.Thread.spawn(.{}, worker, .{self});
+ spawned += 1;
}
}
-fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
- if (builtin.single_threaded)
- return;
-
- for (self.workers[0..spawned]) |*worker| {
- worker.thread.join();
- worker.idle_node.data.deinit();
- }
+pub fn deinit(self: *ThreadPool) void {
+ self.join(self.threads.len); // kill and join all threads.
+ self.* = undefined;
}
-pub fn deinit(self: *ThreadPool) void {
+fn join(self: *ThreadPool, spawned: usize) void {
{
self.mutex.lock();
defer self.mutex.unlock();
+ // ensure future worker threads exit the dequeue loop
self.is_running = false;
- while (self.idle_queue.popFirst()) |idle_node|
- idle_node.data.set();
}
- self.destroyWorkers(self.workers.len);
- self.allocator.free(self.workers);
+ // wake up any sleeping threads (this can be done outside the mutex)
+ // then wait for all the threads we know are spawned to complete.
+ self.cond.broadcast();
+ for (self.threads[0..spawned]) |thread| {
+ thread.join();
+ }
+
+ self.allocator.free(self.threads);
}
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
@@ -122,24 +84,51 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
const closure = @fieldParentPtr(@This(), "run_node", run_node);
@call(.{}, func, closure.arguments);
+ // The thread pool's allocator is protected by the mutex.
const mutex = &closure.pool.mutex;
mutex.lock();
defer mutex.unlock();
+
closure.pool.allocator.destroy(closure);
}
};
+ {
+ self.mutex.lock();
+ defer self.mutex.unlock();
+
+ const closure = try self.allocator.create(Closure);
+ closure.* = .{
+ .arguments = args,
+ .pool = self,
+ };
+
+ self.run_queue.prepend(&closure.run_node);
+ }
+
+ // Notify waiting threads outside the lock to try and keep the critical section small.
+ self.cond.signal();
+}
+
+fn worker(self: *ThreadPool) void {
self.mutex.lock();
defer self.mutex.unlock();
- const closure = try self.allocator.create(Closure);
- closure.* = .{
- .arguments = args,
- .pool = self,
- };
+ while (true) {
+ while (self.run_queue.popFirst()) |run_node| {
+ // Temporarily unlock the mutex in order to execute the run_node
+ self.mutex.unlock();
+ defer self.mutex.lock();
- self.run_queue.prepend(&closure.run_node);
+ const runFn = run_node.data.runFn;
+ runFn(&run_node.data);
+ }
- if (self.idle_queue.popFirst()) |idle_node|
- idle_node.data.set();
+ // Stop executing instead of waiting if the thread pool is no longer running.
+ if (self.is_running) {
+ self.cond.wait(&self.mutex);
+ } else {
+ break;
+ }
+ }
}