Diffstat (limited to 'src/ThreadPool.zig')
-rw-r--r--  src/ThreadPool.zig  152
1 file changed, 0 insertions(+), 152 deletions(-)
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
deleted file mode 100644
index fde5ed27db..0000000000
--- a/src/ThreadPool.zig
+++ /dev/null
@@ -1,152 +0,0 @@
-const std = @import("std");
-const builtin = @import("builtin");
-const ThreadPool = @This();
-const WaitGroup = @import("WaitGroup.zig");
-
-mutex: std.Thread.Mutex = .{},
-cond: std.Thread.Condition = .{},
-run_queue: RunQueue = .{},
-is_running: bool = true,
-allocator: std.mem.Allocator,
-threads: []std.Thread,
-
-const RunQueue = std.SinglyLinkedList(Runnable);
-const Runnable = struct {
- runFn: RunProto,
-};
-
-const RunProto = *const fn (*Runnable) void;
-
-pub fn init(pool: *ThreadPool, allocator: std.mem.Allocator) !void {
- pool.* = .{
- .allocator = allocator,
- .threads = &[_]std.Thread{},
- };
-
- if (builtin.single_threaded) {
- return;
- }
-
- const thread_count = std.math.max(1, std.Thread.getCpuCount() catch 1);
- pool.threads = try allocator.alloc(std.Thread, thread_count);
- errdefer allocator.free(pool.threads);
-
- // kill and join any threads we spawned previously on error.
- var spawned: usize = 0;
- errdefer pool.join(spawned);
-
- for (pool.threads) |*thread| {
- thread.* = try std.Thread.spawn(.{}, worker, .{pool});
- spawned += 1;
- }
-}
-
-pub fn deinit(pool: *ThreadPool) void {
- pool.join(pool.threads.len); // kill and join all threads.
- pool.* = undefined;
-}
-
-fn join(pool: *ThreadPool, spawned: usize) void {
- if (builtin.single_threaded) {
- return;
- }
-
- {
- pool.mutex.lock();
- defer pool.mutex.unlock();
-
- // ensure future worker threads exit the dequeue loop
- pool.is_running = false;
- }
-
- // wake up any sleeping threads (this can be done outside the mutex)
- // then wait for all the threads we know are spawned to complete.
- pool.cond.broadcast();
- for (pool.threads[0..spawned]) |thread| {
- thread.join();
- }
-
- pool.allocator.free(pool.threads);
-}
-
-pub fn spawn(pool: *ThreadPool, comptime func: anytype, args: anytype) !void {
- if (builtin.single_threaded) {
- @call(.auto, func, args);
- return;
- }
-
- const Args = @TypeOf(args);
- const Closure = struct {
- arguments: Args,
- pool: *ThreadPool,
- run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
-
- fn runFn(runnable: *Runnable) void {
- const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
- const closure = @fieldParentPtr(@This(), "run_node", run_node);
- @call(.auto, func, closure.arguments);
-
- // The thread pool's allocator is protected by the mutex.
- const mutex = &closure.pool.mutex;
- mutex.lock();
- defer mutex.unlock();
-
- closure.pool.allocator.destroy(closure);
- }
- };
-
- {
- pool.mutex.lock();
- defer pool.mutex.unlock();
-
- const closure = try pool.allocator.create(Closure);
- closure.* = .{
- .arguments = args,
- .pool = pool,
- };
-
- pool.run_queue.prepend(&closure.run_node);
- }
-
- // Notify waiting threads outside the lock to try and keep the critical section small.
- pool.cond.signal();
-}
-
-fn worker(pool: *ThreadPool) void {
- pool.mutex.lock();
- defer pool.mutex.unlock();
-
- while (true) {
- while (pool.run_queue.popFirst()) |run_node| {
- // Temporarily unlock the mutex in order to execute the run_node
- pool.mutex.unlock();
- defer pool.mutex.lock();
-
- const runFn = run_node.data.runFn;
- runFn(&run_node.data);
- }
-
- // Stop executing instead of waiting if the thread pool is no longer running.
- if (pool.is_running) {
- pool.cond.wait(&pool.mutex);
- } else {
- break;
- }
- }
-}
-
-pub fn waitAndWork(pool: *ThreadPool, wait_group: *WaitGroup) void {
- while (!wait_group.isDone()) {
- if (blk: {
- pool.mutex.lock();
- defer pool.mutex.unlock();
- break :blk pool.run_queue.popFirst();
- }) |run_node| {
- run_node.data.runFn(&run_node.data);
- continue;
- }
-
- wait_group.wait();
- return;
- }
-}
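
For context, a minimal, hypothetical usage sketch of the API deleted above (init/spawn/waitAndWork/deinit) follows. It is not part of this diff: the task function, the WaitGroup start()/finish() calls, the default-initialized WaitGroup, and the allocator setup are assumptions based on the companion src/WaitGroup.zig of the same era of the tree.

// Hypothetical usage sketch of the ThreadPool removed above; not taken from this diff.
// Assumes the sibling src/WaitGroup.zig of the same era, default-initializable with .{}
// and exposing start()/finish()/wait(), plus a Zig std that still has
// std.heap.GeneralPurposeAllocator and the spawn/@call(.auto, ...) style used above.
const std = @import("std");
const ThreadPool = @import("ThreadPool.zig");
const WaitGroup = @import("WaitGroup.zig");

// Example task: marks its WaitGroup slot done when it finishes.
fn task(wait_group: *WaitGroup, index: usize) void {
    defer wait_group.finish();
    std.debug.print("ran task {d}\n", .{index});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var pool: ThreadPool = undefined;
    try pool.init(gpa.allocator());
    defer pool.deinit();

    var wait_group: WaitGroup = .{};
    var i: usize = 0;
    while (i < 8) : (i += 1) {
        wait_group.start();
        try pool.spawn(task, .{ &wait_group, i });
    }

    // The calling thread helps drain the run queue while waiting for all tasks.
    pool.waitAndWork(&wait_group);
}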