| author | Andrew Kelley <andrew@ziglang.org> | 2020-12-23 16:57:18 -0800 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2020-12-23 16:57:18 -0800 |
| commit | 177377b6e356b34bbed40cadca596658d158af6b (patch) | |
| tree | ed7e0a7fa146b8c15044e21f386ec8a8e2977695 /src/ThreadPool.zig | |
| parent | 5377b7fb97311448daa3c29a8c8f100656d871ba (diff) | |
rework std.ResetEvent, improve std lib Darwin integration
* split std.ResetEvent into:
- ResetEvent - requires init() at runtime, which can fail, and also
  requires deinit(). (Both variants are sketched below.)
- StaticResetEvent - can be statically initialized and requires no
deinitialization. Initialization cannot fail.
* the POSIX sem_t implementation can in fact fail on initialization
because it is allowed to be implemented as a file descriptor.
* Completely define, clarify, and explain in detail the semantics of
these APIs. Remove the `isSet` function.
* `ResetEvent.timedWait` returns an enum instead of a possible error.
* `ResetEvent.init` takes a pointer to the ResetEvent instead of
returning a copy.
* On Darwin, `ResetEvent` is implemented using Grand Central Dispatch,
which is exposed by libSystem.
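
As a rough illustration, here is a minimal sketch of the two variants after the split, assuming they are exposed as `std.ResetEvent` and `std.StaticResetEvent` (names taken from the message above, not verified against the tree):

```zig
const std = @import("std");

fn resetEventSketch() !void {
    // Runtime variant: init() can fail (e.g. the POSIX sem_t backend may
    // be implemented as a file descriptor), and deinit() is required.
    var event: std.ResetEvent = undefined;
    try event.init();
    defer event.deinit();

    event.set(); // wake any waiters; a subsequent wait() returns immediately
    event.wait();

    // timedWait() now reports timeout via an enum result instead of an
    // error; the tag names are not given in this message, so they are
    // omitted here.
}

// Static variant: statically initializable, never fails, needs no deinit().
// The default initializer is an assumption based on "can be statically
// initialized" above.
var static_event: std.StaticResetEvent = .{};
```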
stage2 changes:
* ThreadPool: use a single, pre-initialized `ResetEvent` per worker.
* WaitGroup: now requires init() and deinit(), and init() can fail
  (see the sketch after this list).
- Add a `reset` function.
- Compilation initializes one for the work queue on creation and
  reuses it for every update.
- Rename `stop` to `finish`.
- Simplify the implementation based on the usage pattern.
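
A hedged sketch of the WaitGroup lifecycle this describes: `init`, `deinit`, `reset`, and `finish` come from this message, while `start` and `wait` are assumed to carry over from the existing API:

```zig
const WaitGroup = @import("WaitGroup.zig"); // assumed: stage2's src/WaitGroup.zig

fn waitGroupSketch() !void {
    var wait_group: WaitGroup = undefined;
    try wait_group.init(); // may now fail, since the underlying event init can fail
    defer wait_group.deinit();

    wait_group.start();
    // ... a worker thread calls wait_group.finish() (formerly `stop`)
    // when its job completes ...
    wait_group.wait();

    // Compilation keeps one WaitGroup for the work queue and rearms it
    // between updates instead of constructing a new one:
    wait_group.reset();
}
```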
Diffstat (limited to 'src/ThreadPool.zig')
| -rw-r--r-- | src/ThreadPool.zig | 96 |
1 file changed, 57 insertions, 39 deletions
```diff
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index cf9c02fa59..1e91d3f731 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -9,8 +9,7 @@ const ThreadPool = @This();
 lock: std.Mutex = .{},
 is_running: bool = true,
 allocator: *std.mem.Allocator,
-spawned: usize = 0,
-threads: []*std.Thread,
+workers: []Worker,
 run_queue: RunQueue = .{},
 idle_queue: IdleQueue = .{},
@@ -20,23 +19,69 @@ const Runnable = struct {
     runFn: fn (*Runnable) void,
 };
 
+const Worker = struct {
+    pool: *ThreadPool,
+    thread: *std.Thread,
+    /// The node is for this worker only and must have an already initialized event
+    /// when the thread is spawned.
+    idle_node: IdleQueue.Node,
+
+    fn run(worker: *Worker) void {
+        while (true) {
+            const held = worker.pool.lock.acquire();
+
+            if (worker.pool.run_queue.popFirst()) |run_node| {
+                held.release();
+                (run_node.data.runFn)(&run_node.data);
+                continue;
+            }
+
+            if (worker.pool.is_running) {
+                worker.idle_node.data.reset();
+
+                worker.pool.idle_queue.prepend(&worker.idle_node);
+                held.release();
+
+                worker.idle_node.data.wait();
+                continue;
+            }
+
+            held.release();
+            return;
+        }
+    }
+};
+
 pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
     self.* = .{
         .allocator = allocator,
-        .threads = &[_]*std.Thread{},
+        .workers = &[_]Worker{},
     };
     if (std.builtin.single_threaded) return;
 
-    errdefer self.deinit();
+    const worker_count = std.math.max(1, std.Thread.cpuCount() catch 1);
+    self.workers = try allocator.alloc(Worker, worker_count);
+    errdefer allocator.free(self.workers);
+
+    var worker_index: usize = 0;
+    errdefer self.destroyWorkers(worker_index);
+    while (worker_index < worker_count) : (worker_index += 1) {
+        const worker = &self.workers[worker_index];
+        worker.pool = self;
 
-    var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1);
-    self.threads = try allocator.alloc(*std.Thread, num_threads);
+        // Each worker requires its ResetEvent to be pre-initialized.
+        try worker.idle_node.data.init();
+        errdefer worker.idle_node.data.deinit();
+
+        worker.thread = try std.Thread.spawn(worker, Worker.run);
+    }
+}
 
-    while (num_threads > 0) : (num_threads -= 1) {
-        const thread = try std.Thread.spawn(self, runWorker);
-        self.threads[self.spawned] = thread;
-        self.spawned += 1;
+fn destroyWorkers(self: *ThreadPool, spawned: usize) void {
+    for (self.workers[0..spawned]) |*worker| {
+        worker.thread.wait();
+        worker.idle_node.data.deinit();
     }
 }
@@ -50,9 +95,8 @@ pub fn deinit(self: *ThreadPool) void {
         idle_node.data.set();
     }
 
-    defer self.allocator.free(self.threads);
-    for (self.threads[0..self.spawned]) |thread|
-        thread.wait();
+    self.destroyWorkers(self.workers.len);
+    self.allocator.free(self.workers);
 }
@@ -92,29 +136,3 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
     if (self.idle_queue.popFirst()) |idle_node|
         idle_node.data.set();
 }
-
-fn runWorker(self: *ThreadPool) void {
-    while (true) {
-        const held = self.lock.acquire();
-
-        if (self.run_queue.popFirst()) |run_node| {
-            held.release();
-            (run_node.data.runFn)(&run_node.data);
-            continue;
-        }
-
-        if (self.is_running) {
-            var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() };
-
-            self.idle_queue.prepend(&idle_node);
-            held.release();
-
-            idle_node.data.wait();
-            idle_node.data.deinit();
-            continue;
-        }
-
-        held.release();
-        return;
-    }
-}
```
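For context, here is a hypothetical caller of the reworked pool, matching the `init`/`spawn`/`deinit` signatures visible in the diff above; the task function and allocator setup are illustrative, not from the commit:

```zig
const std = @import("std");
const ThreadPool = @import("ThreadPool.zig");

fn printTask(n: usize) void {
    std.debug.print("task {}\n", .{n});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    // init() takes a pointer to an undefined ThreadPool and may fail while
    // allocating the worker slice or initializing a per-worker ResetEvent.
    var pool: ThreadPool = undefined;
    try pool.init(&gpa.allocator);
    defer pool.deinit();

    try pool.spawn(printTask, .{@as(usize, 1)});
}
```

Note the design change this enables: each worker owns a pre-initialized ResetEvent in its `idle_node`, so going idle no longer constructs (or fails to construct) an event on every pass through the run loop, which is why `Worker.run` only needs `reset()` before re-queueing itself.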
