aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread/Pool.zig
diff options
context:
space:
mode:
Diffstat (limited to 'lib/std/Thread/Pool.zig')
-rw-r--r--lib/std/Thread/Pool.zig58
1 files changed, 46 insertions, 12 deletions
diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig
index f1d2a7338f..e19166993a 100644
--- a/lib/std/Thread/Pool.zig
+++ b/lib/std/Thread/Pool.zig
@@ -309,46 +309,80 @@ pub fn getIdCount(pool: *Pool) usize {
return @intCast(1 + pool.threads.len);
}
+pub fn io(pool: *Pool) std.Io {
+ return .{
+ .userdata = pool,
+ .vtable = &.{
+ .@"async" = @"async",
+ .@"await" = @"await",
+ },
+ };
+}
+
const AsyncClosure = struct {
- func: *const fn (context: ?*anyopaque, result: *anyopaque) void,
- context: ?*anyopaque,
+ func: *const fn (context: *anyopaque, result: *anyopaque) void,
run_node: std.Thread.Pool.RunQueue.Node = .{ .data = .{ .runFn = runFn } },
reset_event: std.Thread.ResetEvent,
+ context_offset: usize,
+ result_offset: usize,
fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void {
const run_node: *std.Thread.Pool.RunQueue.Node = @fieldParentPtr("data", runnable);
- const closure: *@This() = @alignCast(@fieldParentPtr("run_node", run_node));
- closure.func(closure.context, closure.resultPointer());
+ const closure: *AsyncClosure = @alignCast(@fieldParentPtr("run_node", run_node));
+ closure.func(closure.contextPointer(), closure.resultPointer());
closure.reset_event.set();
}
- fn resultPointer(closure: *@This()) [*]u8 {
+ fn contextOffset(context_alignment: std.mem.Alignment) usize {
+ return context_alignment.forward(@sizeOf(AsyncClosure));
+ }
+
+ fn resultOffset(
+ context_alignment: std.mem.Alignment,
+ context_len: usize,
+ result_alignment: std.mem.Alignment,
+ ) usize {
+ return result_alignment.forward(contextOffset(context_alignment) + context_len);
+ }
+
+ fn resultPointer(closure: *AsyncClosure) [*]u8 {
+ const base: [*]u8 = @ptrCast(closure);
+ return base + closure.result_offset;
+ }
+
+ fn contextPointer(closure: *AsyncClosure) [*]u8 {
const base: [*]u8 = @ptrCast(closure);
- return base + @sizeOf(@This());
+ return base + closure.context_offset;
}
};
pub fn @"async"(
userdata: ?*anyopaque,
- eager_result: []u8,
- context: ?*anyopaque,
- start: *const fn (context: ?*anyopaque, result: *anyopaque) void,
+ result: []u8,
+ result_alignment: std.mem.Alignment,
+ context: []const u8,
+ context_alignment: std.mem.Alignment,
+ start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
pool.mutex.lock();
const gpa = pool.allocator;
- const n = @sizeOf(AsyncClosure) + eager_result.len;
+ const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
+ const result_offset = result_alignment.forward(context_offset + context.len);
+ const n = result_offset + result.len;
const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, @alignOf(AsyncClosure), n) catch {
pool.mutex.unlock();
- start(context, eager_result.ptr);
+ start(context.ptr, result.ptr);
return null;
}));
closure.* = .{
.func = start,
- .context = context,
+ .context_offset = context_offset,
+ .result_offset = result_offset,
.reset_event = .{},
};
+ @memcpy(closure.contextPointer()[0..context.len], context);
pool.run_queue.prepend(&closure.run_node);
pool.mutex.unlock();