diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2020-12-21 14:21:51 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2020-12-23 13:36:21 -0800 |
| commit | 829c00a77fd2d6b7576c6d2b724f69ba9cfe10f2 (patch) | |
| tree | 879be7bfbb6b021489f1d45d63af9732dba4593a /src/ThreadPool.zig | |
| parent | 4eb4d26fa14524652bed69325eb491f39701d995 (diff) | |
| download | zig-829c00a77fd2d6b7576c6d2b724f69ba9cfe10f2.tar.gz zig-829c00a77fd2d6b7576c6d2b724f69ba9cfe10f2.zip | |
kprotty ThreadPool and WaitGroup patch
Diffstat (limited to 'src/ThreadPool.zig')
| -rw-r--r-- | src/ThreadPool.zig | 76 |
1 files changed, 35 insertions, 41 deletions
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig index 00cb26772a..71c72fb8da 100644 --- a/src/ThreadPool.zig +++ b/src/ThreadPool.zig @@ -9,12 +9,12 @@ const ThreadPool = @This(); lock: std.Mutex = .{}, is_running: bool = true, allocator: *std.mem.Allocator, -running: usize = 0, +spawned: usize = 0, threads: []*std.Thread, run_queue: RunQueue = .{}, idle_queue: IdleQueue = .{}, -const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const IdleQueue = std.SinglyLinkedList(std.ResetEvent); const RunQueue = std.SinglyLinkedList(Runnable); const Runnable = struct { runFn: fn (*Runnable) void, @@ -30,49 +30,37 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { errdefer self.deinit(); - var num_threads = std.Thread.cpuCount() catch 1; - if (num_threads > 0) - self.threads = try allocator.alloc(*std.Thread, num_threads); + var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); + self.threads = try allocator.alloc(*std.Thread, num_threads); while (num_threads > 0) : (num_threads -= 1) { const thread = try std.Thread.spawn(self, runWorker); - self.threads[self.running] = thread; - self.running += 1; + self.threads[self.spawned] = thread; + self.spawned += 1; } } pub fn deinit(self: *ThreadPool) void { - self.shutdown(); - - std.debug.assert(!self.is_running); - for (self.threads[0..self.running]) |thread| - thread.wait(); - - defer self.threads = &[_]*std.Thread{}; - if (self.running > 0) - self.allocator.free(self.threads); -} - -pub fn shutdown(self: *ThreadPool) void { - const held = self.lock.acquire(); - - if (!self.is_running) - return held.release(); + { + const held = self.lock.acquire(); + defer held.release(); - var idle_queue = self.idle_queue; - self.idle_queue = .{}; - self.is_running = false; - held.release(); + self.is_running = false; + while (self.idle_queue.popFirst()) |idle_node| + idle_node.data.set(); + } - while (idle_queue.popFirst()) |idle_node| - idle_node.data.set(); + defer self.allocator.free(self.threads); + for (self.threads[0..self.spawned]) |thread| + thread.wait(); } pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { if (std.builtin.single_threaded) { - @call(.{}, func, args); + const result = @call(.{}, func, args); return; } + const Args = @TypeOf(args); const Closure = struct { arguments: Args, @@ -83,24 +71,26 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); const closure = @fieldParentPtr(@This(), "run_node", run_node); const result = @call(.{}, func, closure.arguments); + + const held = closure.pool.lock.acquire(); + defer held.release(); closure.pool.allocator.destroy(closure); } }; + const held = self.lock.acquire(); + defer held.release(); + const closure = try self.allocator.create(Closure); closure.* = .{ .arguments = args, .pool = self, }; - const held = self.lock.acquire(); self.run_queue.prepend(&closure.run_node); - const idle_node = self.idle_queue.popFirst(); - held.release(); - - if (idle_node) |node| - node.data.set(); + if (self.idle_queue.popFirst()) |idle_node| + idle_node.data.set(); } fn runWorker(self: *ThreadPool) void { @@ -113,14 +103,18 @@ fn runWorker(self: *ThreadPool) void { continue; } - if (!self.is_running) { + if (self.is_running) { + var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() }; + defer idle_node.data.deinit(); + + self.idle_queue.prepend(&idle_node); held.release(); - return; + + idle_node.data.wait(); + continue; } - var idle_node = IdleQueue.Node{ .data = .{} }; - self.idle_queue.prepend(&idle_node); held.release(); - idle_node.data.wait(); + return; } } |
