diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2025-10-23 03:38:34 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:51 -0700 |
| commit | 92b8378814697880ac3b5942abb47db4e5eeb958 (patch) | |
| tree | 919da25a6a27eed15187bfc0ca244289101a7daf /lib/std | |
| parent | dd945bf1f8963452f5acf448dd26c73d2d7b29f6 (diff) | |
| download | zig-92b8378814697880ac3b5942abb47db4e5eeb958.tar.gz zig-92b8378814697880ac3b5942abb47db4e5eeb958.zip | |
concurrent and await
Diffstat (limited to 'lib/std')
| -rw-r--r-- | lib/std/Io/Kqueue.zig | 81 |
1 files changed, 58 insertions, 23 deletions
diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig index 45274aba93..181ecd4ebf 100644 --- a/lib/std/Io/Kqueue.zig +++ b/lib/std/Io/Kqueue.zig @@ -172,9 +172,6 @@ pub fn init(k: *Kqueue, gpa: Allocator) !void { .sp = @intFromPtr(idle_stack_end), .fp = 0, .pc = @intFromPtr(&mainIdleEntry), - .x18 = asm ("" - : [x18] "={x18}" (-> u64), - ), }, .x86_64 => .{ .rsp = @intFromPtr(idle_stack_end - 1), @@ -548,7 +545,6 @@ const Context = switch (builtin.cpu.arch) { sp: u64, fp: u64, pc: u64, - x18: u64, }, .x86_64 => extern struct { rsp: u64, @@ -562,12 +558,12 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { return @fieldParentPtr("contexts", switch (builtin.cpu.arch) { .aarch64 => asm volatile ( \\ ldp x0, x2, [x1] - \\ ldp x3, x18, [x2, #16] + \\ ldr x3, [x2, #16] \\ mov x4, sp \\ stp x4, fp, [x0] \\ adr x5, 0f \\ ldp x4, fp, [x2] - \\ stp x5, x18, [x0, #16] + \\ str x5, [x0, #16] \\ mov sp, x4 \\ br x3 \\0: @@ -761,12 +757,18 @@ fn fiberEntry() callconv(.naked) void { : : [AsyncClosure_call] "X" (&AsyncClosure.call), ), + .aarch64 => asm volatile ( + \\ mov x0, sp + \\ b %[AsyncClosure_call] + : + : [AsyncClosure_call] "X" (&AsyncClosure.call), + ), else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), } } const AsyncClosure = struct { - event_loop: *Kqueue, + kqueue: *Kqueue, fiber: *Fiber, start: *const fn (context: *const anyopaque, result: *anyopaque) void, result_align: Alignment, @@ -777,7 +779,7 @@ const AsyncClosure = struct { } fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn { - message.handle(closure.event_loop); + message.handle(closure.kqueue); const fiber = closure.fiber; std.log.debug("{*} performing async", .{fiber}); closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align)); @@ -787,7 +789,7 @@ const AsyncClosure = struct { if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null; break :r a; }; - closure.event_loop.yield(ready_awaiter, .nothing); + closure.kqueue.yield(ready_awaiter, .nothing); unreachable; // switched to dead fiber } @@ -881,19 +883,52 @@ fn async( fn concurrent( userdata: ?*anyopaque, result_len: usize, - result_alignment: std.mem.Alignment, + result_alignment: Alignment, context: []const u8, - context_alignment: std.mem.Alignment, + context_alignment: Alignment, start: *const fn (context: *const anyopaque, result: *anyopaque) void, ) error{OutOfMemory}!*Io.AnyFuture { const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = result_len; - _ = result_alignment; - _ = context; - _ = context_alignment; - _ = start; - @panic("TODO"); + assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO + assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO + assert(result_len <= Fiber.max_result_size); // TODO + assert(context.len <= Fiber.max_context_size); // TODO + + const fiber = try Fiber.allocate(k); + std.log.debug("allocated {*}", .{fiber}); + + const closure: *AsyncClosure = .fromFiber(fiber); + fiber.* = .{ + .required_align = {}, + .context = switch (builtin.cpu.arch) { + .x86_64 => .{ + .rsp = @intFromPtr(closure) - @sizeOf(usize), + .rbp = 0, + .rip = @intFromPtr(&fiberEntry), + }, + .aarch64 => .{ + .sp = @intFromPtr(closure), + .fp = 0, + .pc = @intFromPtr(&fiberEntry), + }, + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), + }, + .awaiter = null, + .queue_next = null, + .cancel_thread = null, + .awaiting_completions = .initEmpty(), + }; + closure.* = .{ + .kqueue = k, + .fiber = fiber, + .start = start, + .result_align = result_alignment, + .already_awaited = false, + }; + @memcpy(closure.contextPointer(), context); + + k.schedule(.current(), .{ .head = fiber, .tail = fiber }); + return @ptrCast(fiber); } fn await( @@ -903,11 +938,11 @@ fn await( result_alignment: std.mem.Alignment, ) void { const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = any_future; - _ = result; - _ = result_alignment; - @panic("TODO"); + const future_fiber: *Fiber = @ptrCast(@alignCast(any_future)); + if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) + k.yield(null, .{ .register_awaiter = &future_fiber.awaiter }); + @memcpy(result, future_fiber.resultBytes(result_alignment)); + k.recycle(future_fiber); } fn cancel( |
