aboutsummaryrefslogtreecommitdiff
path: root/src/ThreadPool.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/ThreadPool.zig')
-rw-r--r--src/ThreadPool.zig76
1 files changed, 35 insertions, 41 deletions
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 00cb26772a..71c72fb8da 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -9,12 +9,12 @@ const ThreadPool = @This();
lock: std.Mutex = .{},
is_running: bool = true,
allocator: *std.mem.Allocator,
-running: usize = 0,
+spawned: usize = 0,
threads: []*std.Thread,
run_queue: RunQueue = .{},
idle_queue: IdleQueue = .{},
-const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent);
+const IdleQueue = std.SinglyLinkedList(std.ResetEvent);
const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct {
runFn: fn (*Runnable) void,
@@ -30,49 +30,37 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
errdefer self.deinit();
- var num_threads = std.Thread.cpuCount() catch 1;
- if (num_threads > 0)
- self.threads = try allocator.alloc(*std.Thread, num_threads);
+ var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1);
+ self.threads = try allocator.alloc(*std.Thread, num_threads);
while (num_threads > 0) : (num_threads -= 1) {
const thread = try std.Thread.spawn(self, runWorker);
- self.threads[self.running] = thread;
- self.running += 1;
+ self.threads[self.spawned] = thread;
+ self.spawned += 1;
}
}
pub fn deinit(self: *ThreadPool) void {
- self.shutdown();
-
- std.debug.assert(!self.is_running);
- for (self.threads[0..self.running]) |thread|
- thread.wait();
-
- defer self.threads = &[_]*std.Thread{};
- if (self.running > 0)
- self.allocator.free(self.threads);
-}
-
-pub fn shutdown(self: *ThreadPool) void {
- const held = self.lock.acquire();
-
- if (!self.is_running)
- return held.release();
+ {
+ const held = self.lock.acquire();
+ defer held.release();
- var idle_queue = self.idle_queue;
- self.idle_queue = .{};
- self.is_running = false;
- held.release();
+ self.is_running = false;
+ while (self.idle_queue.popFirst()) |idle_node|
+ idle_node.data.set();
+ }
- while (idle_queue.popFirst()) |idle_node|
- idle_node.data.set();
+ defer self.allocator.free(self.threads);
+ for (self.threads[0..self.spawned]) |thread|
+ thread.wait();
}
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
if (std.builtin.single_threaded) {
- @call(.{}, func, args);
+ const result = @call(.{}, func, args);
return;
}
+
const Args = @TypeOf(args);
const Closure = struct {
arguments: Args,
@@ -83,24 +71,26 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
const closure = @fieldParentPtr(@This(), "run_node", run_node);
const result = @call(.{}, func, closure.arguments);
+
+ const held = closure.pool.lock.acquire();
+ defer held.release();
closure.pool.allocator.destroy(closure);
}
};
+ const held = self.lock.acquire();
+ defer held.release();
+
const closure = try self.allocator.create(Closure);
closure.* = .{
.arguments = args,
.pool = self,
};
- const held = self.lock.acquire();
self.run_queue.prepend(&closure.run_node);
- const idle_node = self.idle_queue.popFirst();
- held.release();
-
- if (idle_node) |node|
- node.data.set();
+ if (self.idle_queue.popFirst()) |idle_node|
+ idle_node.data.set();
}
fn runWorker(self: *ThreadPool) void {
@@ -113,14 +103,18 @@ fn runWorker(self: *ThreadPool) void {
continue;
}
- if (!self.is_running) {
+ if (self.is_running) {
+ var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() };
+ defer idle_node.data.deinit();
+
+ self.idle_queue.prepend(&idle_node);
held.release();
- return;
+
+ idle_node.data.wait();
+ continue;
}
- var idle_node = IdleQueue.Node{ .data = .{} };
- self.idle_queue.prepend(&idle_node);
held.release();
- idle_node.data.wait();
+ return;
}
}