diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2025-10-22 13:21:59 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:51 -0700 |
| commit | 41070932f8a3a1dead7fb424f427b04824c19c72 (patch) | |
| tree | 3005b6e1f203c0ccdc39a7c8ef3e997c33bfe8dd /lib/std | |
| parent | df84dc18bc2fa18770591be4d1e0cabc8d4b28c2 (diff) | |
| download | zig-41070932f8a3a1dead7fb424f427b04824c19c72.tar.gz zig-41070932f8a3a1dead7fb424f427b04824c19c72.zip | |
revert adding asyncDetached
instead we will have Io.Group
Diffstat (limited to 'lib/std')
| -rw-r--r-- | lib/std/Io/IoUring.zig | 114 |
1 files changed, 10 insertions, 104 deletions
diff --git a/lib/std/Io/IoUring.zig b/lib/std/Io/IoUring.zig index 43ce8e1c0d..6d31f337c5 100644 --- a/lib/std/Io/IoUring.zig +++ b/lib/std/Io/IoUring.zig @@ -10,12 +10,9 @@ const IoUring = std.os.linux.IoUring; /// Must be a thread-safe allocator. gpa: Allocator, +mutex: std.Thread.Mutex, main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)), threads: Thread.List, -detached: struct { - mutex: std.Io.Mutex, - list: std.DoublyLinkedList, -}, /// Empirically saw >128KB being used by the self-hosted backend to panic. const idle_stack_size = 256 * 1024; @@ -142,7 +139,6 @@ pub fn io(el: *EventLoop) Io { .async = async, .concurrent = concurrent, .await = await, - .asyncDetached = asyncDetached, .select = select, .cancel = cancel, .cancelRequested = cancelRequested, @@ -172,16 +168,13 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { errdefer gpa.free(allocated_slice); el.* = .{ .gpa = gpa, + .mutex = .{}, .main_fiber_buffer = undefined, .threads = .{ .allocated = @ptrCast(allocated_slice[0..threads_size]), .reserved = 1, .active = 1, }, - .detached = .{ - .mutex = .init, - .list = .{}, - }, }; const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer); main_fiber.* = .{ @@ -223,22 +216,6 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { } pub fn deinit(el: *EventLoop) void { - while (true) cancel(el, detached_future: { - el.detached.mutex.lock(el.io()) catch |err| switch (err) { - error.Canceled => unreachable, // main fiber cannot be canceled - }; - defer el.detached.mutex.unlock(el.io()); - const detached: *DetachedClosure = @fieldParentPtr( - "detached_queue_node", - el.detached.list.pop() orelse break, - ); - // notify the detached fiber that it is no longer allowed to recycle itself - detached.detached_queue_node = .{ - .prev = &detached.detached_queue_node, - .next = &detached.detached_queue_node, - }; - break :detached_future @ptrCast(detached.fiber); - }, &.{}, .@"1"); const active_threads = @atomicLoad(u32, &el.threads.active, .acquire); for (el.threads.allocated[0..active_threads]) |*thread| { const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic); @@ -492,7 +469,7 @@ const SwitchMessage = struct { const PendingTask = union(enum) { nothing, reschedule, - recycle, + recycle: *Fiber, register_awaiter: *?*Fiber, register_select: []const *Io.AnyFuture, mutex_lock: struct { @@ -516,10 +493,8 @@ const SwitchMessage = struct { assert(prev_fiber.queue_next == null); el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); }, - .recycle => { - const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); - assert(prev_fiber.queue_next == null); - el.recycle(prev_fiber); + .recycle => |fiber| { + el.recycle(fiber); }, .register_awaiter => |awaiter| { const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); @@ -829,12 +804,9 @@ fn fiberEntry() callconv(.naked) void { switch (builtin.cpu.arch) { .x86_64 => asm volatile ( \\ leaq 8(%%rsp), %%rdi - \\ jmpq *(%%rsp) - ), - .aarch64 => asm volatile ( - \\ mov x0, sp - \\ ldr x2, [sp, #-8] - \\ br x2 + \\ jmp %[AsyncClosure_call:P] + : + : [AsyncClosure_call] "X" (&AsyncClosure.call), ), else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), } @@ -905,18 +877,16 @@ fn concurrent( std.log.debug("allocated {*}", .{fiber}); const closure: *AsyncClosure = .fromFiber(fiber); - const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure)); - (stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)}; fiber.* = .{ .required_align = {}, .context = switch (builtin.cpu.arch) { .x86_64 => .{ - .rsp = @intFromPtr(stack_end - 1), + .rsp = @intFromPtr(closure) - @sizeOf(usize), .rbp = 0, .rip = @intFromPtr(&fiberEntry), }, .aarch64 => .{ - .sp = @intFromPtr(stack_end), + .sp = @intFromPtr(closure) - @sizeOf(usize) - 1, .fp = 0, .pc = @intFromPtr(&fiberEntry), }, @@ -968,70 +938,6 @@ const DetachedClosure = struct { } }; -fn asyncDetached( - userdata: ?*anyopaque, - context: []const u8, - context_alignment: std.mem.Alignment, - start: *const fn (context: *const anyopaque) void, -) void { - assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO - assert(context.len <= Fiber.max_context_size); // TODO - - const event_loop: *EventLoop = @ptrCast(@alignCast(userdata)); - const fiber = Fiber.allocate(event_loop) catch { - start(context.ptr); - return; - }; - std.log.debug("allocated {*}", .{fiber}); - - const current_thread: *Thread = .current(); - const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward( - @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size, - ) - @sizeOf(DetachedClosure)); - const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure)); - (stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)}; - fiber.* = .{ - .required_align = {}, - .context = switch (builtin.cpu.arch) { - .x86_64 => .{ - .rsp = @intFromPtr(stack_end - 1), - .rbp = 0, - .rip = @intFromPtr(&fiberEntry), - }, - .aarch64 => .{ - .sp = @intFromPtr(stack_end), - .fp = 0, - .pc = @intFromPtr(&fiberEntry), - }, - else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), - }, - .awaiter = null, - .queue_next = null, - .cancel_thread = null, - .awaiting_completions = .initEmpty(), - }; - closure.* = .{ - .event_loop = event_loop, - .fiber = fiber, - .start = start, - .detached_queue_node = .{}, - }; - { - event_loop.detached.mutex.lock(event_loop.io()) catch |err| switch (err) { - error.Canceled => { - event_loop.recycle(fiber); - start(context.ptr); - return; - }, - }; - defer event_loop.detached.mutex.unlock(event_loop.io()); - event_loop.detached.list.append(&closure.detached_queue_node); - } - @memcpy(closure.contextPointer(), context); - - event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber }); -} - fn await( userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, |
