diff options
Diffstat (limited to 'lib/std/Thread/Pool.zig')
| -rw-r--r-- | lib/std/Thread/Pool.zig | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index 37018f2ab7..f46c6f6802 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -332,6 +332,7 @@ pub fn io(pool: *Pool) Io { .vtable = &.{ .@"async" = @"async", .@"await" = @"await", + .go = go, .cancel = cancel, .cancelRequested = cancelRequested, .mutexLock = mutexLock, @@ -472,6 +473,75 @@ fn @"async"( return @ptrCast(closure); } +const DetachedClosure = struct { + pool: *Pool, + func: *const fn (context: *anyopaque) void, + run_node: std.Thread.Pool.RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + context_alignment: std.mem.Alignment, + context_len: usize, + + fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void { + const run_node: *std.Thread.Pool.RunQueue.Node = @fieldParentPtr("data", runnable); + const closure: *DetachedClosure = @alignCast(@fieldParentPtr("run_node", run_node)); + closure.func(closure.contextPointer()); + const gpa = closure.pool.allocator; + const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure); + gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]); + } + + fn contextOffset(context_alignment: std.mem.Alignment) usize { + return context_alignment.forward(@sizeOf(DetachedClosure)); + } + + fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize { + return contextOffset(context_alignment) + context_len; + } + + fn contextPointer(closure: *DetachedClosure) [*]u8 { + const base: [*]u8 = @ptrCast(closure); + return base + contextOffset(closure.context_alignment); + } +}; + +fn go( + userdata: ?*anyopaque, + context: []const u8, + context_alignment: std.mem.Alignment, + start: *const fn (context: *const anyopaque) void, +) void { + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + pool.mutex.lock(); + + const gpa = pool.allocator; + const n = DetachedClosure.contextEnd(context_alignment, context.len); + const closure: *DetachedClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, @alignOf(DetachedClosure), n) catch { + pool.mutex.unlock(); + start(context.ptr); + return; + })); + closure.* = .{ + .pool = pool, + .func = start, + .context_alignment = context_alignment, + .context_len = context.len, + }; + @memcpy(closure.contextPointer()[0..context.len], context); + pool.run_queue.prepend(&closure.run_node); + + if (pool.threads.items.len < pool.threads.capacity) { + pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{ + .stack_size = pool.stack_size, + .allocator = gpa, + }, worker, .{pool}) catch t: { + pool.threads.items.len -= 1; + break :t undefined; + }; + } + + pool.mutex.unlock(); + pool.cond.signal(); +} + fn @"await"( userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, |
