| field | value | date |
|---|---|---|
| author | protty <45520026+kprotty@users.noreply.github.com> | 2022-04-26 16:48:56 -0500 |
| committer | GitHub <noreply@github.com> | 2022-04-26 16:48:56 -0500 |
| commit | 18f30346291bd2471e07924af161de080935dd60 | |
| tree | 609cdd73aa40f15625f896e79b9420b3e320ddcd /src/ThreadPool.zig | |
| parent | 50f1856476038e57f5d2f47c751f608b0b360662 | |
std.Thread: ResetEvent improvements (#11523)
* std: start removing redundant ResetEvents
* src: fix other uses of std.Thread.ResetEvent
* src: add builtin.sanitize_thread for tsan detection
* atomic: add Atomic.fence for proper fencing with tsan
* Thread: remove the other ResetEvents and rewrite the current one
* Thread: ResetEvent docs
* zig fmt + WaitGroup.reset() fix
* src: fix build issues for ResetEvent + tsan
* Thread: ResetEvent tests
* Thread: ResetEvent module doc
* Atomic: replace llvm *p memory constraint with *m
* panicking: handle spurious wakeups in futex.wait() when waiting for abort()
* zig fmt
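For context, the rewritten `std.Thread.ResetEvent` this commit lands is a plain value type: default-initialized and futex-backed, with no `init()`/`deinit()` pair as the old implementation required. Below is a minimal usage sketch, not code from the commit; the `event` variable and `producer` function are illustrative names:

```zig
const std = @import("std");

// Illustrative global; any ResetEvent value works the same way.
// Default-initialized: no init()/deinit() needed after this commit.
var event: std.Thread.ResetEvent = .{};

fn producer() void {
    // ... produce a result, then release every thread blocked in wait():
    event.set();
}

pub fn main() !void {
    const thread = try std.Thread.spawn(.{}, producer, .{});
    event.wait(); // blocks until producer calls set()
    thread.join();
    event.reset(); // re-arm the event so it can be waited on again
}
```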
Diffstat (limited to 'src/ThreadPool.zig')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/ThreadPool.zig | 139 |
1 file changed, 64 insertions, 75 deletions
```diff
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 36d004cfc6..7d1c8420af 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -3,13 +3,12 @@ const builtin = @import("builtin");
 const ThreadPool = @This();
 
 mutex: std.Thread.Mutex = .{},
+cond: std.Thread.Condition = .{},
+run_queue: RunQueue = .{},
 is_running: bool = true,
 allocator: std.mem.Allocator,
-workers: []Worker,
-run_queue: RunQueue = .{},
-idle_queue: IdleQueue = .{},
+threads: []std.Thread,
 
-const IdleQueue = std.SinglyLinkedList(std.Thread.ResetEvent);
 const RunQueue = std.SinglyLinkedList(Runnable);
 const Runnable = struct {
     runFn: RunProto,
@@ -20,89 +19,52 @@ const RunProto = switch (builtin.zig_backend) {
     else => *const fn (*Runnable) void,
 };
 
-const Worker = struct {
-    pool: *ThreadPool,
-    thread: std.Thread,
-    /// The node is for this worker only and must have an already initialized event
-    /// when the thread is spawned.
-    idle_node: IdleQueue.Node,
-
-    fn run(worker: *Worker) void {
-        const pool = worker.pool;
-
-        while (true) {
-            pool.mutex.lock();
-
-            if (pool.run_queue.popFirst()) |run_node| {
-                pool.mutex.unlock();
-                (run_node.data.runFn)(&run_node.data);
-                continue;
-            }
-
-            if (pool.is_running) {
-                worker.idle_node.data.reset();
-
-                pool.idle_queue.prepend(&worker.idle_node);
-                pool.mutex.unlock();
-
-                worker.idle_node.data.wait();
-                continue;
-            }
-
-            pool.mutex.unlock();
-            return;
-        }
-    }
-};
-
 pub fn init(self: *ThreadPool, allocator: std.mem.Allocator) !void {
     self.* = .{
         .allocator = allocator,
-        .workers = &[_]Worker{},
+        .threads = &[_]std.Thread{},
     };
-    if (builtin.single_threaded)
-        return;
 
-    const worker_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
-    self.workers = try allocator.alloc(Worker, worker_count);
-    errdefer allocator.free(self.workers);
+    if (builtin.single_threaded) {
+        return;
+    }
 
-    var worker_index: usize = 0;
-    errdefer self.destroyWorkers(worker_index);
-    while (worker_index < worker_count) : (worker_index += 1) {
-        const worker = &self.workers[worker_index];
-        worker.pool = self;
+    const thread_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
+    self.threads = try allocator.alloc(std.Thread, thread_count);
+    errdefer allocator.free(self.threads);
 
-        // Each worker requires its ResetEvent to be pre-initialized.
-        try worker.idle_node.data.init();
-        errdefer worker.idle_node.data.deinit();
+    // kill and join any threads we spawned previously on error.
+    var spawned: usize = 0;
+    errdefer self.join(spawned);
 
-        worker.thread = try std.Thread.spawn(.{}, Worker.run, .{worker});
+    for (self.threads) |*thread| {
+        thread.* = try std.Thread.spawn(.{}, worker, .{self});
+        spawned += 1;
     }
 }
 
-fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
-    if (builtin.single_threaded)
-        return;
-
-    for (self.workers[0..spawned]) |*worker| {
-        worker.thread.join();
-        worker.idle_node.data.deinit();
-    }
+pub fn deinit(self: *ThreadPool) void {
+    self.join(self.threads.len); // kill and join all threads.
+    self.* = undefined;
 }
 
-pub fn deinit(self: *ThreadPool) void {
+fn join(self: *ThreadPool, spawned: usize) void {
     {
         self.mutex.lock();
         defer self.mutex.unlock();
 
+        // ensure future worker threads exit the dequeue loop
        self.is_running = false;
-        while (self.idle_queue.popFirst()) |idle_node|
-            idle_node.data.set();
     }
 
-    self.destroyWorkers(self.workers.len);
-    self.allocator.free(self.workers);
+    // wake up any sleeping threads (this can be done outside the mutex)
+    // then wait for all the threads we know are spawned to complete.
+    self.cond.broadcast();
+    for (self.threads[0..spawned]) |thread| {
+        thread.join();
+    }
+
+    self.allocator.free(self.threads);
 }
 
 pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
@@ -122,24 +84,51 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
             const closure = @fieldParentPtr(@This(), "run_node", run_node);
             @call(.{}, func, closure.arguments);
 
+            // The thread pool's allocator is protected by the mutex.
             const mutex = &closure.pool.mutex;
             mutex.lock();
             defer mutex.unlock();
+
             closure.pool.allocator.destroy(closure);
         }
     };
 
+    {
+        self.mutex.lock();
+        defer self.mutex.unlock();
+
+        const closure = try self.allocator.create(Closure);
+        closure.* = .{
+            .arguments = args,
+            .pool = self,
+        };
+
+        self.run_queue.prepend(&closure.run_node);
+    }
+
+    // Notify waiting threads outside the lock to try and keep the critical section small.
+    self.cond.signal();
+}
+
+fn worker(self: *ThreadPool) void {
    self.mutex.lock();
    defer self.mutex.unlock();
 
-    const closure = try self.allocator.create(Closure);
-    closure.* = .{
-        .arguments = args,
-        .pool = self,
-    };
+    while (true) {
+        while (self.run_queue.popFirst()) |run_node| {
+            // Temporarily unlock the mutex in order to execute the run_node
+            self.mutex.unlock();
+            defer self.mutex.lock();
 
-    self.run_queue.prepend(&closure.run_node);
+            const runFn = run_node.data.runFn;
+            runFn(&run_node.data);
+        }
 
-    if (self.idle_queue.popFirst()) |idle_node|
-        idle_node.data.set();
+        // Stop executing instead of waiting if the thread pool is no longer running.
+        if (self.is_running) {
+            self.cond.wait(&self.mutex);
+        } else {
+            break;
+        }
+    }
 }
```
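The shape of the change in this file: the per-worker `ResetEvent` parked in an `idle_queue` is replaced by a single `std.Thread.Condition` that every worker sleeps on under the pool mutex; `spawn` signals one sleeper per queued task, and `join` flips `is_running` and broadcasts shutdown. A sketch of driving the resulting API, assuming `ThreadPool.zig` is imported directly as a module (`printTask` is a hypothetical example task, not part of the commit):

```zig
const std = @import("std");
const ThreadPool = @import("ThreadPool.zig");

// Hypothetical task used for illustration.
fn printTask(id: usize) void {
    std.debug.print("task {d} running\n", .{id});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var pool: ThreadPool = undefined;
    try pool.init(gpa.allocator());
    // deinit() flips is_running under the mutex, broadcasts the
    // condition, then joins every spawned thread.
    defer pool.deinit();

    var i: usize = 0;
    while (i < 8) : (i += 1) {
        // Each spawn heap-allocates a closure, prepends it to the
        // run queue under the mutex, and signals one sleeping worker.
        try pool.spawn(printTask, .{i});
    }
}
```

Note that `deinit` does not drop queued work: each worker drains `run_queue` before re-checking `is_running`, so tasks spawned before `deinit` still run to completion.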
