Diffstat (limited to 'lib/std/Thread')
-rw-r--r--  lib/std/Thread/Pool.zig | 70
1 file changed, 70 insertions(+), 0 deletions(-)
diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig
index 37018f2ab7..f46c6f6802 100644
--- a/lib/std/Thread/Pool.zig
+++ b/lib/std/Thread/Pool.zig
@@ -332,6 +332,7 @@ pub fn io(pool: *Pool) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
+            .go = go,
             .cancel = cancel,
             .cancelRequested = cancelRequested,
             .mutexLock = mutexLock,
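
This first hunk only registers `go` in the `std.Io` vtable returned by `Pool.io()`, alongside the existing `async`/`await` entries; the implementation arrives in the second hunk, and an end-to-end usage sketch follows the diff.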
@@ -472,6 +473,75 @@ fn @"async"(
     return @ptrCast(closure);
 }
 
+const DetachedClosure = struct {
+    pool: *Pool,
+    func: *const fn (context: *anyopaque) void,
+    run_node: std.Thread.Pool.RunQueue.Node = .{ .data = .{ .runFn = runFn } },
+    context_alignment: std.mem.Alignment,
+    context_len: usize,
+
+    fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void {
+        const run_node: *std.Thread.Pool.RunQueue.Node = @fieldParentPtr("data", runnable);
+        const closure: *DetachedClosure = @alignCast(@fieldParentPtr("run_node", run_node));
+        closure.func(closure.contextPointer());
+        const gpa = closure.pool.allocator;
+        const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure);
+        gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]);
+    }
+
+    fn contextOffset(context_alignment: std.mem.Alignment) usize {
+        return context_alignment.forward(@sizeOf(DetachedClosure));
+    }
+
+    fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
+        return contextOffset(context_alignment) + context_len;
+    }
+
+    fn contextPointer(closure: *DetachedClosure) [*]u8 {
+        const base: [*]u8 = @ptrCast(closure);
+        return base + contextOffset(closure.context_alignment);
+    }
+};
+
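
`DetachedClosure` packs its own header and the caller's context bytes into a single heap allocation: the context is copied at an offset rounded up to its own alignment, and `runFn` recovers the closure from the intrusive `run_node` via `@fieldParentPtr`, invokes the user function, then frees the whole region in one call. A minimal sketch of the offset arithmetic, with illustrative numbers (the real header size is `@sizeOf(DetachedClosure)` and varies by target):

    const std = @import("std");

    test "context bytes follow the closure header at an aligned offset" {
        const header_size: usize = 48; // stand-in for @sizeOf(DetachedClosure)
        const ctx_align = std.mem.Alignment.fromByteUnits(32);
        // contextOffset: round the header size up to the context alignment.
        const offset = ctx_align.forward(header_size);
        try std.testing.expectEqual(@as(usize, 64), offset);
        // contextEnd: total bytes to allocate for header plus context.
        const total = offset + 100; // e.g. a 100-byte context
        try std.testing.expectEqual(@as(usize, 164), total);
    }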
+fn go(
+    userdata: ?*anyopaque,
+    context: []const u8,
+    context_alignment: std.mem.Alignment,
+    start: *const fn (context: *const anyopaque) void,
+) void {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    pool.mutex.lock();
+
+    const gpa = pool.allocator;
+    const n = DetachedClosure.contextEnd(context_alignment, context.len);
+    const closure: *DetachedClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, @alignOf(DetachedClosure), n) catch {
+        pool.mutex.unlock();
+        start(context.ptr);
+        return;
+    }));
+    closure.* = .{
+        .pool = pool,
+        .func = start,
+        .context_alignment = context_alignment,
+        .context_len = context.len,
+    };
+    @memcpy(closure.contextPointer()[0..context.len], context);
+    pool.run_queue.prepend(&closure.run_node);
+
+    if (pool.threads.items.len < pool.threads.capacity) {
+        pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
+            .stack_size = pool.stack_size,
+            .allocator = gpa,
+        }, worker, .{pool}) catch t: {
+            pool.threads.items.len -= 1;
+            break :t undefined;
+        };
+    }
+
+    pool.mutex.unlock();
+    pool.cond.signal();
+}
+
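
`go` is the fire-and-forget counterpart of `@"async"`: under the pool mutex it makes one allocation sized by `contextEnd`, copies the caller's argument bytes next to the header, prepends the intrusive node to the run queue, and spawns an extra worker while the pool is still below capacity. If the allocation fails, it degrades by running `start` synchronously on the calling thread instead of dropping the task. Any function submitted through this entry point has to match the type-erased `start` signature; a hypothetical example (`Args` and the function body are illustrative, not part of the patch):

    const std = @import("std");

    const Args = struct { x: u32, y: u32 };

    // Matches `start: *const fn (context: *const anyopaque) void` above;
    // the context pointer is the caller's argument bytes, reinterpreted.
    fn startFn(context: *const anyopaque) void {
        const args: *const Args = @alignCast(@ptrCast(context));
        std.debug.print("sum = {d}\n", .{args.x + args.y});
    }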
fn @"await"(
userdata: ?*anyopaque,
any_future: *std.Io.AnyFuture,
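
End to end, detached submission through the pool could look like the sketch below. `Pool.io()` and the vtable entry are what this patch adds; the `io.go(...)` convenience wrapper and its exact signature are assumptions here, since only the vtable side appears in the diff:

    const std = @import("std");

    pub fn main() !void {
        var gpa_state: std.heap.GeneralPurposeAllocator(.{}) = .{};
        defer _ = gpa_state.deinit();
        const gpa = gpa_state.allocator();

        var pool: std.Thread.Pool = undefined;
        try pool.init(.{ .allocator = gpa });
        defer pool.deinit(); // joins workers, draining queued tasks

        const io = pool.io();
        // Assumed wrapper: copies the argument bytes and dispatches
        // through vtable.go; the task runs detached.
        io.go(work, .{ 1, 2 });
    }

    fn work(x: u32, y: u32) void {
        std.debug.print("{d}\n", .{x + y});
    }

Note that `go` returns `void`: unlike `@"async"` there is no future to `@"await"` or cancel, which is why the closure frees itself at the end of `runFn`.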