diff options
| author | Jacob Young <jacobly0@users.noreply.github.com> | 2025-04-01 03:45:31 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:48 -0700 |
| commit | 3eb7be5cf6cb7050991f84c11f786c61bc5599b4 (patch) | |
| tree | 96910072c22f8620b614d4777f4e85a455222005 /lib/std/Io/EventLoop.zig | |
| parent | 0f105a8a10eabdf55683c9629996fe033baf3fcf (diff) | |
| download | zig-3eb7be5cf6cb7050991f84c11f786c61bc5599b4.tar.gz zig-3eb7be5cf6cb7050991f84c11f786c61bc5599b4.zip | |
EventLoop: implement detached fibers
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
| -rw-r--r-- | lib/std/Io/EventLoop.zig | 124 |
1 files changed, 78 insertions, 46 deletions
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 067ed04c3d..83747bd008 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -9,9 +9,12 @@ const IoUring = std.os.linux.IoUring; /// Must be a thread-safe allocator. gpa: Allocator, -mutex: std.Thread.Mutex, main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)), threads: Thread.List, +detached: struct { + mutex: std.Io.Mutex, + list: std.DoublyLinkedList(void), +}, /// Empirically saw >128KB being used by the self-hosted backend to panic. const idle_stack_size = 256 * 1024; @@ -167,13 +170,16 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { errdefer gpa.free(allocated_slice); el.* = .{ .gpa = gpa, - .mutex = .{}, .main_fiber_buffer = undefined, .threads = .{ .allocated = @ptrCast(allocated_slice[0..threads_size]), .reserved = 1, .active = 1, }, + .detached = .{ + .mutex = .init, + .list = .{}, + }, }; const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer); main_fiber.* = .{ @@ -207,6 +213,23 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { } pub fn deinit(el: *EventLoop) void { + while (true) cancel(el, detached_future: { + el.detached.mutex.lock(el.io()) catch |err| switch (err) { + error.Canceled => unreachable, // main fiber cannot be canceled + }; + defer el.detached.mutex.unlock(el.io()); + const detached: *DetachedClosure = @fieldParentPtr( + "detached_queue_node", + el.detached.list.pop() orelse break, + ); + // notify the detached fiber that it is no longer allowed to recycle itself + detached.detached_queue_node = .{ + .prev = &detached.detached_queue_node, + .next = &detached.detached_queue_node, + .data = {}, + }; + break :detached_future @ptrCast(detached.fiber); + }, &.{}, .@"1"); const active_threads = @atomicLoad(u32, &el.threads.active, .acquire); for (el.threads.allocated[0..active_threads]) |*thread| { const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic); @@ -460,7 +483,7 @@ const SwitchMessage = struct { const PendingTask = union(enum) { nothing, reschedule, - recycle: *Fiber, + recycle, register_awaiter: *?*Fiber, mutex_lock: struct { prev_state: Io.Mutex.State, @@ -483,8 +506,10 @@ const SwitchMessage = struct { assert(prev_fiber.queue_next == null); el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); }, - .recycle => |fiber| { - el.recycle(fiber); + .recycle => { + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); + assert(prev_fiber.queue_next == null); + el.recycle(prev_fiber); }, .register_awaiter => |awaiter| { const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); @@ -609,21 +634,7 @@ fn fiberEntry() callconv(.naked) void { switch (builtin.cpu.arch) { .x86_64 => asm volatile ( \\ leaq 8(%%rsp), %%rdi - \\ jmp %[AsyncClosure_call:P] - : - : [AsyncClosure_call] "X" (&AsyncClosure.call), - ), - else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), - } -} - -fn fiberEntryDetached() callconv(.naked) void { - switch (builtin.cpu.arch) { - .x86_64 => asm volatile ( - \\ leaq 8(%%rsp), %%rdi - \\ jmp %[DetachedClosure_call:P] - : - : [DetachedClosure_call] "X" (&DetachedClosure.call), + \\ jmpq *(%%rsp) ), else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), } @@ -649,29 +660,6 @@ const AsyncClosure = struct { } }; -const DetachedClosure = struct { - event_loop: *EventLoop, - fiber: *Fiber, - start: *const fn (context: *const anyopaque) void, - - fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 { - return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure)); - } - - fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn { - message.handle(closure.event_loop); - std.log.debug("{*} performing async detached", .{closure.fiber}); - closure.start(closure.contextPointer()); - const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); - if (awaiter) |a| { - closure.event_loop.yield(a, .nothing); - } else { - closure.event_loop.yield(null, .{ .recycle = closure.fiber }); - } - unreachable; // switched to dead fiber - } -}; - fn @"async"( userdata: ?*anyopaque, result: []u8, @@ -695,11 +683,13 @@ fn @"async"( const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward( @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size, ) - @sizeOf(AsyncClosure)); + const stack_end: [*]usize = @alignCast(@ptrCast(closure)); + (stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)}; fiber.* = .{ .required_align = {}, .context = switch (builtin.cpu.arch) { .x86_64 => .{ - .rsp = @intFromPtr(closure) - @sizeOf(usize), + .rsp = @intFromPtr(stack_end - 1), .rbp = 0, .rip = @intFromPtr(&fiberEntry), }, @@ -722,6 +712,34 @@ fn @"async"( return @ptrCast(fiber); } +const DetachedClosure = struct { + event_loop: *EventLoop, + fiber: *Fiber, + start: *const fn (context: *const anyopaque) void, + detached_queue_node: std.DoublyLinkedList(void).Node, + + fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 { + return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure)); + } + + fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn { + message.handle(closure.event_loop); + std.log.debug("{*} performing async detached", .{closure.fiber}); + closure.start(closure.contextPointer()); + const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); + closure.event_loop.yield(awaiter, pending_task: { + closure.event_loop.detached.mutex.lock(closure.event_loop.io()) catch |err| switch (err) { + error.Canceled => break :pending_task .nothing, + }; + defer closure.event_loop.detached.mutex.unlock(closure.event_loop.io()); + if (closure.detached_queue_node.next == &closure.detached_queue_node) break :pending_task .nothing; + closure.event_loop.detached.list.remove(&closure.detached_queue_node); + break :pending_task .recycle; + }); + unreachable; // switched to dead fiber + } +}; + fn go( userdata: ?*anyopaque, context: []const u8, @@ -742,13 +760,15 @@ fn go( const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward( @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size, ) - @sizeOf(DetachedClosure)); + const stack_end: [*]usize = @alignCast(@ptrCast(closure)); + (stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)}; fiber.* = .{ .required_align = {}, .context = switch (builtin.cpu.arch) { .x86_64 => .{ - .rsp = @intFromPtr(closure) - @sizeOf(usize), + .rsp = @intFromPtr(stack_end - 1), .rbp = 0, - .rip = @intFromPtr(&fiberEntryDetached), + .rip = @intFromPtr(&fiberEntry), }, else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), }, @@ -761,7 +781,19 @@ fn go( .event_loop = event_loop, .fiber = fiber, .start = start, + .detached_queue_node = .{ .data = {} }, }; + { + event_loop.detached.mutex.lock(event_loop.io()) catch |err| switch (err) { + error.Canceled => { + event_loop.recycle(fiber); + start(context.ptr); + return; + }, + }; + defer event_loop.detached.mutex.unlock(event_loop.io()); + event_loop.detached.list.append(&closure.detached_queue_node); + } @memcpy(closure.contextPointer(), context); event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber }); |
