From 0d1cd0d4822628c104890af4c31cdf38c6f96d35 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 15 Dec 2020 18:30:29 -0700 Subject: use kprotty's ThreadPool implementation (v5) --- src/ThreadPool.zig | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 src/ThreadPool.zig (limited to 'src/ThreadPool.zig') diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig new file mode 100644 index 0000000000..a5f59a30e7 --- /dev/null +++ b/src/ThreadPool.zig @@ -0,0 +1,116 @@ +const std = @import("std"); +const ThreadPool = @This(); + +lock: std.Mutex = .{}, +is_running: bool = true, +allocator: *std.mem.Allocator, +running: usize = 0, +threads: []*std.Thread, +run_queue: RunQueue = .{}, +idle_queue: IdleQueue = .{}, + +const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent); +const RunQueue = std.SinglyLinkedList(Runnable); +const Runnable = struct { + runFn: fn (*Runnable) void, +}; + +pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { + self.* = .{ + .allocator = allocator, + .threads = &[_]*std.Thread{}, + }; + + errdefer self.deinit(); + + var num_threads = std.Thread.cpuCount() catch 1; + if (num_threads > 0) + self.threads = try allocator.alloc(*std.Thread, num_threads); + + while (num_threads > 0) : (num_threads -= 1) { + const thread = try std.Thread.spawn(self, runWorker); + self.threads[self.running] = thread; + self.running += 1; + } +} + +pub fn deinit(self: *ThreadPool) void { + self.shutdown(); + + std.debug.assert(!self.is_running); + for (self.threads[0..self.running]) |thread| + thread.wait(); + + defer self.threads = &[_]*std.Thread{}; + if (self.running > 0) + self.allocator.free(self.threads); +} + +pub fn shutdown(self: *ThreadPool) void { + const held = self.lock.acquire(); + + if (!self.is_running) + return held.release(); + + var idle_queue = self.idle_queue; + self.idle_queue = .{}; + self.is_running = false; + held.release(); + + while (idle_queue.popFirst()) |idle_node| + idle_node.data.set(); +} + +pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { + const Args = @TypeOf(args); + const Closure = struct { + arguments: Args, + pool: *ThreadPool, + run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + + fn runFn(runnable: *Runnable) void { + const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable); + const closure = @fieldParentPtr(@This(), "run_node", run_node); + const result = @call(.{}, func, closure.arguments); + closure.pool.allocator.destroy(closure); + } + }; + + const closure = try self.allocator.create(Closure); + errdefer self.allocator.destroy(closure); + closure.* = .{ + .arguments = args, + .pool = self, + }; + + const held = self.lock.acquire(); + self.run_queue.prepend(&closure.run_node); + + const idle_node = self.idle_queue.popFirst(); + held.release(); + + if (idle_node) |node| + node.data.set(); +} + +fn runWorker(self: *ThreadPool) void { + while (true) { + const held = self.lock.acquire(); + + if (self.run_queue.popFirst()) |run_node| { + held.release(); + (run_node.data.runFn)(&run_node.data); + continue; + } + + if (!self.is_running) { + held.release(); + return; + } + + var idle_node = IdleQueue.Node{ .data = .{} }; + self.idle_queue.prepend(&idle_node); + held.release(); + idle_node.data.wait(); + } +} -- cgit v1.2.3