| author | Andrew Kelley <andrew@ziglang.org> | 2025-10-23 03:21:32 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:51 -0700 |
| commit | dd945bf1f8963452f5acf448dd26c73d2d7b29f6 | |
| tree | 60992ddb4b6b2d82c86c3660312f805f8f271bab /lib/std | |
| parent | 41070932f8a3a1dead7fb424f427b04824c19c72 | |
| download | zig-dd945bf1f8963452f5acf448dd26c73d2d7b29f6.tar.gz, zig-dd945bf1f8963452f5acf448dd26c73d2d7b29f6.zip | |
one kqueue per thread
Diffstat (limited to 'lib/std')
| -rw-r--r-- | lib/std/Io/IoUring.zig | 34 |
| -rw-r--r-- | lib/std/Io/Kqueue.zig | 825 |
2 files changed, 816 insertions, 43 deletions
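The core of the change: every worker thread owns its own kqueue descriptor and blocks on it in `kevent`; other threads wake it by posting an `EVFILT.USER` event with `NOTE.TRIGGER`, which is how `schedule` nudges an idle sibling and how the `.exit` message broadcasts shutdown in the diff below. Here is a minimal, self-contained sketch of that wakeup pattern, assuming a kqueue platform (macOS or a BSD) and the same `std.posix`/`std.c` names the patch itself uses; the `worker` function and the `udata` payload are illustrative, not part of the patch.

```zig
const std = @import("std");
const posix = std.posix;

// Each worker owns a private kqueue and blocks on it; another thread wakes it
// by posting an EVFILT.USER event to that descriptor, the same pattern the
// patch uses in `schedule` and for the `.exit` broadcast.
fn worker(kq_fd: posix.fd_t) void {
    var events: [1]posix.Kevent = undefined;
    const n = posix.kevent(kq_fd, &.{}, &events, null) catch |err|
        std.debug.panic("kevent failed: {s}", .{@errorName(err)});
    std.debug.assert(n == 1);
    std.debug.print("worker woken, udata={d}\n", .{events[0].udata});
}

pub fn main() !void {
    const kq_fd = try posix.kqueue(); // one queue per thread
    defer posix.close(kq_fd);

    const t = try std.Thread.spawn(.{}, worker, .{kq_fd});

    // ADD + ONESHOT + TRIGGER in a single change list: one kevent call both
    // arms and fires the user event, as the patch does.
    const changes = [_]posix.Kevent{.{
        .ident = 0,
        .filter = std.c.EVFILT.USER,
        .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
        .fflags = std.c.NOTE.TRIGGER,
        .data = 0,
        .udata = 42, // illustrative payload only
    }};
    _ = try posix.kevent(kq_fd, &changes, &.{}, null);

    t.join();
}
```

Because the event is registered with `EV.ONESHOT` and triggered in the same change list, the cross-thread wakeup stays a single syscall on the sender's side.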
diff --git a/lib/std/Io/IoUring.zig b/lib/std/Io/IoUring.zig index 6d31f337c5..9ec1dafb31 100644 --- a/lib/std/Io/IoUring.zig +++ b/lib/std/Io/IoUring.zig @@ -67,8 +67,8 @@ const Fiber = struct { const min_stack_size = 4 * 1024 * 1024; const max_context_align: Alignment = .@"16"; const max_context_size = max_context_align.forward(1024); - const max_closure_size: usize = @max(@sizeOf(AsyncClosure), @sizeOf(DetachedClosure)); - const max_closure_align: Alignment = .max(.of(AsyncClosure), .of(DetachedClosure)); + const max_closure_size: usize = @sizeOf(AsyncClosure); + const max_closure_align: Alignment = .of(AsyncClosure); const allocation_size = std.mem.alignForward( usize, max_closure_align.max(max_context_align).forward( @@ -886,7 +886,7 @@ fn concurrent( .rip = @intFromPtr(&fiberEntry), }, .aarch64 => .{ - .sp = @intFromPtr(closure) - @sizeOf(usize) - 1, + .sp = @intFromPtr(closure), .fp = 0, .pc = @intFromPtr(&fiberEntry), }, @@ -910,34 +910,6 @@ fn concurrent( return @ptrCast(fiber); } -const DetachedClosure = struct { - event_loop: *EventLoop, - fiber: *Fiber, - start: *const fn (context: *const anyopaque) void, - detached_queue_node: std.DoublyLinkedList.Node, - - fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 { - return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure)); - } - - fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn { - message.handle(closure.event_loop); - std.log.debug("{*} performing async detached", .{closure.fiber}); - closure.start(closure.contextPointer()); - const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); - closure.event_loop.yield(awaiter, pending_task: { - closure.event_loop.detached.mutex.lock(closure.event_loop.io()) catch |err| switch (err) { - error.Canceled => break :pending_task .nothing, - }; - defer closure.event_loop.detached.mutex.unlock(closure.event_loop.io()); - if (closure.detached_queue_node.next == &closure.detached_queue_node) break :pending_task .nothing; - closure.event_loop.detached.list.remove(&closure.detached_queue_node); - break :pending_task .recycle; - }); - unreachable; // switched to dead fiber - } -}; - fn await( userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig index eef959155a..45274aba93 100644 --- a/lib/std/Io/Kqueue.zig +++ b/lib/std/Io/Kqueue.zig @@ -9,23 +9,795 @@ const net = std.Io.net; const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Alignment = std.mem.Alignment; -const posix = std.posix; const IpAddress = std.Io.net.IpAddress; const errnoBug = std.Io.Threaded.errnoBug; +const posix = std.posix; /// Must be a thread-safe allocator. gpa: Allocator, +mutex: std.Thread.Mutex, +main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)), +threads: Thread.List, -pub fn init(gpa: Allocator) Kqueue { - return .{ +/// Empirically saw >128KB being used by the self-hosted backend to panic. 
+const idle_stack_size = 256 * 1024; + +const max_idle_search = 4; +const max_steal_ready_search = 4; + +const changes_buffer_len = 64; + +const Thread = struct { + thread: std.Thread, + idle_context: Context, + current_context: *Context, + ready_queue: ?*Fiber, + kq_fd: posix.fd_t, + idle_search_index: u32, + steal_ready_search_index: u32, + + const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread)); + + threadlocal var self: *Thread = undefined; + + fn current() *Thread { + return self; + } + + fn currentFiber(thread: *Thread) *Fiber { + return @fieldParentPtr("context", thread.current_context); + } + + const List = struct { + allocated: []Thread, + reserved: u32, + active: u32, + }; +}; + +const Fiber = struct { + required_align: void align(4), + context: Context, + awaiter: ?*Fiber, + queue_next: ?*Fiber, + cancel_thread: ?*Thread, + awaiting_completions: std.StaticBitSet(3), + + const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread)); + + const max_result_align: Alignment = .@"16"; + const max_result_size = max_result_align.forward(64); + /// This includes any stack realignments that need to happen, and also the + /// initial frame return address slot and argument frame, depending on target. + const min_stack_size = 4 * 1024 * 1024; + const max_context_align: Alignment = .@"16"; + const max_context_size = max_context_align.forward(1024); + const max_closure_size: usize = @sizeOf(AsyncClosure); + const max_closure_align: Alignment = .of(AsyncClosure); + const allocation_size = std.mem.alignForward( + usize, + max_closure_align.max(max_context_align).forward( + max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size, + ) + max_closure_size + max_context_size, + std.heap.page_size_max, + ); + + fn allocate(k: *Kqueue) error{OutOfMemory}!*Fiber { + return @ptrCast(try k.gpa.alignedAlloc(u8, .of(Fiber), allocation_size)); + } + + fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 { + return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size]; + } + + fn allocatedEnd(f: *Fiber) [*]u8 { + const allocated_slice = f.allocatedSlice(); + return allocated_slice[allocated_slice.len..].ptr; + } + + fn resultPointer(f: *Fiber, comptime Result: type) *Result { + return @ptrCast(@alignCast(f.resultBytes(.of(Result)))); + } + + fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 { + return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber))); + } + + fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void { + if (@cmpxchgStrong( + ?*Thread, + &fiber.cancel_thread, + null, + thread, + .acq_rel, + .acquire, + )) |cancel_thread| { + assert(cancel_thread == Thread.canceling); + return error.Canceled; + } + } + + fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void { + if (@cmpxchgStrong( + ?*Thread, + &fiber.cancel_thread, + thread, + null, + .acq_rel, + .acquire, + )) |cancel_thread| assert(cancel_thread == Thread.canceling); + } + + const Queue = struct { head: *Fiber, tail: *Fiber }; +}; + +fn recycle(k: *Kqueue, fiber: *Fiber) void { + std.log.debug("recyling {*}", .{fiber}); + assert(fiber.queue_next == null); + k.gpa.free(fiber.allocatedSlice()); +} + +pub fn init(k: *Kqueue, gpa: Allocator) !void { + const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread); + const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max); + const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset); + errdefer gpa.free(allocated_slice); + 
k.* = .{ .gpa = gpa, + .mutex = .{}, + .main_fiber_buffer = undefined, + .threads = .{ + .allocated = @ptrCast(allocated_slice[0..threads_size]), + .reserved = 1, + .active = 1, + }, }; + const main_fiber: *Fiber = @ptrCast(&k.main_fiber_buffer); + main_fiber.* = .{ + .required_align = {}, + .context = undefined, + .awaiter = null, + .queue_next = null, + .cancel_thread = null, + .awaiting_completions = .initEmpty(), + }; + const main_thread = &k.threads.allocated[0]; + Thread.self = main_thread; + const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr)); + (idle_stack_end - 1)[0..1].* = .{@intFromPtr(k)}; + main_thread.* = .{ + .thread = undefined, + .idle_context = switch (builtin.cpu.arch) { + .aarch64 => .{ + .sp = @intFromPtr(idle_stack_end), + .fp = 0, + .pc = @intFromPtr(&mainIdleEntry), + .x18 = asm ("" + : [x18] "={x18}" (-> u64), + ), + }, + .x86_64 => .{ + .rsp = @intFromPtr(idle_stack_end - 1), + .rbp = 0, + .rip = @intFromPtr(&mainIdleEntry), + }, + else => @compileError("unimplemented architecture"), + }, + .current_context = &main_fiber.context, + .ready_queue = null, + .kq_fd = try posix.kqueue(), + .idle_search_index = 1, + .steal_ready_search_index = 1, + }; + errdefer std.posix.close(main_thread.kq_fd); + std.log.debug("created main idle {*}", .{&main_thread.idle_context}); + std.log.debug("created main {*}", .{main_fiber}); } pub fn deinit(k: *Kqueue) void { + const active_threads = @atomicLoad(u32, &k.threads.active, .acquire); + for (k.threads.allocated[0..active_threads]) |*thread| { + const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic); + assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async + } + k.yield(null, .exit); + const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(k.threads.allocated.ptr)); + const idle_stack_end_offset = std.mem.alignForward(usize, k.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max); + for (k.threads.allocated[1..active_threads]) |*thread| thread.thread.join(); + k.gpa.free(allocated_ptr[0..idle_stack_end_offset]); k.* = undefined; } +fn findReadyFiber(k: *Kqueue, thread: *Thread) ?*Fiber { + if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| { + @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release); + ready_fiber.queue_next = null; + return ready_fiber; + } + const active_threads = @atomicLoad(u32, &k.threads.active, .acquire); + for (0..@min(max_steal_ready_search, active_threads)) |_| { + defer thread.steal_ready_search_index += 1; + if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0; + const steal_ready_search_thread = &k.threads.allocated[0..active_threads][thread.steal_ready_search_index]; + if (steal_ready_search_thread == thread) continue; + const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue; + if (ready_fiber == Fiber.finished) continue; + if (@cmpxchgWeak( + ?*Fiber, + &steal_ready_search_thread.ready_queue, + ready_fiber, + null, + .acquire, + .monotonic, + )) |_| continue; + @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release); + ready_fiber.queue_next = null; + return ready_fiber; + } + // couldn't find anything to do, so we are now open for business + @atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic); + return null; +} + +fn yield(k: *Kqueue, maybe_ready_fiber: ?*Fiber, pending_task: 
SwitchMessage.PendingTask) void { + const thread: *Thread = .current(); + const ready_context = if (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber| + &ready_fiber.context + else + &thread.idle_context; + const message: SwitchMessage = .{ + .contexts = .{ + .prev = thread.current_context, + .ready = ready_context, + }, + .pending_task = pending_task, + }; + std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready }); + contextSwitch(&message).handle(k); +} + +fn schedule(k: *Kqueue, thread: *Thread, ready_queue: Fiber.Queue) void { + { + var fiber = ready_queue.head; + while (true) { + std.log.debug("scheduling {*}", .{fiber}); + fiber = fiber.queue_next orelse break; + } + assert(fiber == ready_queue.tail); + } + // shared fields of previous `Thread` must be initialized before later ones are marked as active + const new_thread_index = @atomicLoad(u32, &k.threads.active, .acquire); + for (0..@min(max_idle_search, new_thread_index)) |_| { + defer thread.idle_search_index += 1; + if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0; + const idle_search_thread = &k.threads.allocated[0..new_thread_index][thread.idle_search_index]; + if (idle_search_thread == thread) continue; + if (@cmpxchgWeak( + ?*Fiber, + &idle_search_thread.ready_queue, + null, + ready_queue.head, + .release, + .monotonic, + )) |_| continue; + const changes = [_]posix.Kevent{ + .{ + .ident = 0, + .filter = std.c.EVFILT.USER, + .flags = std.c.EV.ADD | std.c.EV.ONESHOT, + .fflags = std.c.NOTE.TRIGGER, + .data = 0, + .udata = @intFromEnum(Completion.UserData.wakeup), + }, + }; + // If an error occurs it only pessimises scheduling. + _ = posix.kevent(idle_search_thread.kq_fd, &changes, &.{}, null) catch {}; + return; + } + spawn_thread: { + // previous failed reservations must have completed before retrying + if (new_thread_index == k.threads.allocated.len or @cmpxchgWeak( + u32, + &k.threads.reserved, + new_thread_index, + new_thread_index + 1, + .acquire, + .monotonic, + ) != null) break :spawn_thread; + const new_thread = &k.threads.allocated[new_thread_index]; + const next_thread_index = new_thread_index + 1; + new_thread.* = .{ + .thread = undefined, + .idle_context = undefined, + .current_context = &new_thread.idle_context, + .ready_queue = ready_queue.head, + .kq_fd = posix.kqueue() catch |err| { + @atomicStore(u32, &k.threads.reserved, new_thread_index, .release); + // no more access to `thread` after giving up reservation + std.log.warn("unable to create worker thread due to kqueue init failure: {t}", .{err}); + break :spawn_thread; + }, + .idle_search_index = 0, + .steal_ready_search_index = 0, + }; + new_thread.thread = std.Thread.spawn(.{ + .stack_size = idle_stack_size, + .allocator = k.gpa, + }, threadEntry, .{ k, new_thread_index }) catch |err| { + posix.close(new_thread.kq_fd); + @atomicStore(u32, &k.threads.reserved, new_thread_index, .release); + // no more access to `thread` after giving up reservation + std.log.warn("unable to create worker thread due spawn failure: {s}", .{@errorName(err)}); + break :spawn_thread; + }; + // shared fields of `Thread` must be initialized before being marked active + @atomicStore(u32, &k.threads.active, next_thread_index, .release); + return; + } + // nobody wanted it, so just queue it on ourselves + while (@cmpxchgWeak( + ?*Fiber, + &thread.ready_queue, + ready_queue.tail.queue_next, + ready_queue.head, + .acq_rel, + .acquire, + )) |old_head| ready_queue.tail.queue_next = old_head; +} + +fn 
mainIdle(k: *Kqueue, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn { + message.handle(k); + k.idle(&k.threads.allocated[0]); + k.yield(@ptrCast(&k.main_fiber_buffer), .nothing); + unreachable; // switched to dead fiber +} + +fn threadEntry(k: *Kqueue, index: u32) void { + const thread: *Thread = &k.threads.allocated[index]; + Thread.self = thread; + std.log.debug("created thread idle {*}", .{&thread.idle_context}); + k.idle(thread); +} + +const Completion = struct { + const UserData = enum(usize) { + unused, + wakeup, + cleanup, + exit, + /// *Fiber + _, + }; + /// Corresponds to Kevent field. + flags: u16, + /// Corresponds to Kevent field. + fflags: u32, + /// Corresponds to Kevent field. + data: isize, +}; + +fn idle(k: *Kqueue, thread: *Thread) void { + var events_buffer: [changes_buffer_len]posix.Kevent = undefined; + var maybe_ready_fiber: ?*Fiber = null; + while (true) { + while (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber| { + k.yield(ready_fiber, .nothing); + maybe_ready_fiber = null; + } + const n = posix.kevent(thread.kq_fd, &.{}, &events_buffer, null) catch |err| { + // TODO handle EINTR for cancellation purposes + @panic(@errorName(err)); + }; + var maybe_ready_queue: ?Fiber.Queue = null; + for (events_buffer[0..n]) |event| switch (@as(Completion.UserData, @enumFromInt(event.udata))) { + .unused => unreachable, // bad submission queued? + .wakeup => {}, + .cleanup => @panic("failed to notify other threads that we are exiting"), + .exit => { + assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async + return; + }, + _ => { + const fiber: *Fiber = @ptrFromInt(event.udata); + assert(fiber.queue_next == null); + fiber.resultPointer(Completion).* = .{ + .flags = event.flags, + .fflags = event.fflags, + .data = event.data, + }; + if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| { + ready_queue.tail.queue_next = fiber; + ready_queue.tail = fiber; + } else maybe_ready_queue = .{ .head = fiber, .tail = fiber }; + }, + }; + if (maybe_ready_queue) |ready_queue| k.schedule(thread, ready_queue); + } +} + +const SwitchMessage = struct { + contexts: extern struct { + prev: *Context, + ready: *Context, + }, + pending_task: PendingTask, + + const PendingTask = union(enum) { + nothing, + reschedule, + recycle: *Fiber, + register_awaiter: *?*Fiber, + register_select: []const *Io.AnyFuture, + mutex_lock: struct { + prev_state: Io.Mutex.State, + mutex: *Io.Mutex, + }, + condition_wait: struct { + cond: *Io.Condition, + mutex: *Io.Mutex, + }, + exit, + }; + + fn handle(message: *const SwitchMessage, k: *Kqueue) void { + const thread: *Thread = .current(); + thread.current_context = message.contexts.ready; + switch (message.pending_task) { + .nothing => {}, + .reschedule => if (message.contexts.prev != &thread.idle_context) { + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); + assert(prev_fiber.queue_next == null); + k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); + }, + .recycle => |fiber| { + k.recycle(fiber); + }, + .register_awaiter => |awaiter| { + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); + assert(prev_fiber.queue_next == null); + if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) + k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); + }, + .register_select => |futures| { + const 
prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); + assert(prev_fiber.queue_next == null); + for (futures) |any_future| { + const future_fiber: *Fiber = @ptrCast(@alignCast(any_future)); + if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) { + const closure: *AsyncClosure = .fromFiber(future_fiber); + if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) { + k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); + } + } + } + }, + .mutex_lock => |mutex_lock| { + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); + assert(prev_fiber.queue_next == null); + var prev_state = mutex_lock.prev_state; + while (switch (prev_state) { + else => next_state: { + prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state)); + break :next_state @cmpxchgWeak( + Io.Mutex.State, + &mutex_lock.mutex.state, + prev_state, + @enumFromInt(@intFromPtr(prev_fiber)), + .release, + .acquire, + ); + }, + .unlocked => @cmpxchgWeak( + Io.Mutex.State, + &mutex_lock.mutex.state, + .unlocked, + .locked_once, + .acquire, + .acquire, + ) orelse { + prev_fiber.queue_next = null; + k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); + return; + }, + }) |next_state| prev_state = next_state; + }, + .condition_wait => |condition_wait| { + const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); + assert(prev_fiber.queue_next == null); + const cond_impl = prev_fiber.resultPointer(Condition); + cond_impl.* = .{ + .tail = prev_fiber, + .event = .queued, + }; + if (@cmpxchgStrong( + ?*Fiber, + @as(*?*Fiber, @ptrCast(&condition_wait.cond.state)), + null, + prev_fiber, + .release, + .acquire, + )) |waiting_fiber| { + const waiting_cond_impl = waiting_fiber.?.resultPointer(Condition); + assert(waiting_cond_impl.tail.queue_next == null); + waiting_cond_impl.tail.queue_next = prev_fiber; + waiting_cond_impl.tail = prev_fiber; + } + condition_wait.mutex.unlock(k.io()); + }, + .exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| { + const changes = [_]posix.Kevent{ + .{ + .ident = 0, + .filter = std.c.EVFILT.USER, + .flags = std.c.EV.ADD | std.c.EV.ONESHOT, + .fflags = std.c.NOTE.TRIGGER, + .data = 0, + .udata = @intFromEnum(Completion.UserData.exit), + }, + }; + _ = posix.kevent(each_thread.kq_fd, &changes, &.{}, null) catch |err| { + @panic(@errorName(err)); + }; + }, + } + } +}; + +const Context = switch (builtin.cpu.arch) { + .aarch64 => extern struct { + sp: u64, + fp: u64, + pc: u64, + x18: u64, + }, + .x86_64 => extern struct { + rsp: u64, + rbp: u64, + rip: u64, + }, + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), +}; + +inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage { + return @fieldParentPtr("contexts", switch (builtin.cpu.arch) { + .aarch64 => asm volatile ( + \\ ldp x0, x2, [x1] + \\ ldp x3, x18, [x2, #16] + \\ mov x4, sp + \\ stp x4, fp, [x0] + \\ adr x5, 0f + \\ ldp x4, fp, [x2] + \\ stp x5, x18, [x0, #16] + \\ mov sp, x4 + \\ br x3 + \\0: + : [received_message] "={x1}" (-> *const @FieldType(SwitchMessage, "contexts")), + : [message_to_send] "{x1}" (&message.contexts), + : .{ + .x0 = true, + .x1 = true, + .x2 = true, + .x3 = true, + .x4 = true, + .x5 = true, + .x6 = true, + .x7 = true, + .x8 = true, + .x9 = true, + .x10 = true, + .x11 = true, + .x12 = true, + .x13 = true, + .x14 = true, + .x15 = true, + .x16 = true, + .x17 = 
true, + .x19 = true, + .x20 = true, + .x21 = true, + .x22 = true, + .x23 = true, + .x24 = true, + .x25 = true, + .x26 = true, + .x27 = true, + .x28 = true, + .x30 = true, + .z0 = true, + .z1 = true, + .z2 = true, + .z3 = true, + .z4 = true, + .z5 = true, + .z6 = true, + .z7 = true, + .z8 = true, + .z9 = true, + .z10 = true, + .z11 = true, + .z12 = true, + .z13 = true, + .z14 = true, + .z15 = true, + .z16 = true, + .z17 = true, + .z18 = true, + .z19 = true, + .z20 = true, + .z21 = true, + .z22 = true, + .z23 = true, + .z24 = true, + .z25 = true, + .z26 = true, + .z27 = true, + .z28 = true, + .z29 = true, + .z30 = true, + .z31 = true, + .p0 = true, + .p1 = true, + .p2 = true, + .p3 = true, + .p4 = true, + .p5 = true, + .p6 = true, + .p7 = true, + .p8 = true, + .p9 = true, + .p10 = true, + .p11 = true, + .p12 = true, + .p13 = true, + .p14 = true, + .p15 = true, + .fpcr = true, + .fpsr = true, + .ffr = true, + .memory = true, + }), + .x86_64 => asm volatile ( + \\ movq 0(%%rsi), %%rax + \\ movq 8(%%rsi), %%rcx + \\ leaq 0f(%%rip), %%rdx + \\ movq %%rsp, 0(%%rax) + \\ movq %%rbp, 8(%%rax) + \\ movq %%rdx, 16(%%rax) + \\ movq 0(%%rcx), %%rsp + \\ movq 8(%%rcx), %%rbp + \\ jmpq *16(%%rcx) + \\0: + : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")), + : [message_to_send] "{rsi}" (&message.contexts), + : .{ + .rax = true, + .rcx = true, + .rdx = true, + .rbx = true, + .rsi = true, + .rdi = true, + .r8 = true, + .r9 = true, + .r10 = true, + .r11 = true, + .r12 = true, + .r13 = true, + .r14 = true, + .r15 = true, + .mm0 = true, + .mm1 = true, + .mm2 = true, + .mm3 = true, + .mm4 = true, + .mm5 = true, + .mm6 = true, + .mm7 = true, + .zmm0 = true, + .zmm1 = true, + .zmm2 = true, + .zmm3 = true, + .zmm4 = true, + .zmm5 = true, + .zmm6 = true, + .zmm7 = true, + .zmm8 = true, + .zmm9 = true, + .zmm10 = true, + .zmm11 = true, + .zmm12 = true, + .zmm13 = true, + .zmm14 = true, + .zmm15 = true, + .zmm16 = true, + .zmm17 = true, + .zmm18 = true, + .zmm19 = true, + .zmm20 = true, + .zmm21 = true, + .zmm22 = true, + .zmm23 = true, + .zmm24 = true, + .zmm25 = true, + .zmm26 = true, + .zmm27 = true, + .zmm28 = true, + .zmm29 = true, + .zmm30 = true, + .zmm31 = true, + .fpsr = true, + .fpcr = true, + .mxcsr = true, + .rflags = true, + .dirflag = true, + .memory = true, + }), + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), + }); +} + +fn mainIdleEntry() callconv(.naked) void { + switch (builtin.cpu.arch) { + .x86_64 => asm volatile ( + \\ movq (%%rsp), %%rdi + \\ jmp %[mainIdle:P] + : + : [mainIdle] "X" (&mainIdle), + ), + .aarch64 => asm volatile ( + \\ ldr x0, [sp, #-8] + \\ b %[mainIdle] + : + : [mainIdle] "X" (&mainIdle), + ), + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), + } +} + +fn fiberEntry() callconv(.naked) void { + switch (builtin.cpu.arch) { + .x86_64 => asm volatile ( + \\ leaq 8(%%rsp), %%rdi + \\ jmp %[AsyncClosure_call:P] + : + : [AsyncClosure_call] "X" (&AsyncClosure.call), + ), + else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)), + } +} + +const AsyncClosure = struct { + event_loop: *Kqueue, + fiber: *Fiber, + start: *const fn (context: *const anyopaque, result: *anyopaque) void, + result_align: Alignment, + already_awaited: bool, + + fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 { + return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure)); + } + + fn call(closure: *AsyncClosure, message: *const 
SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn { + message.handle(closure.event_loop); + const fiber = closure.fiber; + std.log.debug("{*} performing async", .{fiber}); + closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align)); + const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); + const ready_awaiter = r: { + const a = awaiter orelse break :r null; + if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null; + break :r a; + }; + closure.event_loop.yield(ready_awaiter, .nothing); + unreachable; // switched to dead fiber + } + + fn fromFiber(fiber: *Fiber) *AsyncClosure { + return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward( + @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size, + ) - @sizeOf(AsyncClosure)); + } +}; + pub fn io(k: *Kqueue) Io { return .{ .userdata = k, @@ -229,11 +1001,33 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = cond; - _ = mutex; - @panic("TODO"); + k.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } }); + const thread = Thread.current(); + const fiber = thread.currentFiber(); + const cond_impl = fiber.resultPointer(Condition); + try mutex.lock(k.io()); + switch (cond_impl.event) { + .queued => {}, + .wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) { + .one => if (@cmpxchgStrong( + ?*Fiber, + @as(*?*Fiber, @ptrCast(&cond.state)), + null, + next_fiber, + .release, + .acquire, + )) |old_fiber| { + const old_cond_impl = old_fiber.?.resultPointer(Condition); + assert(old_cond_impl.tail.queue_next == null); + old_cond_impl.tail.queue_next = next_fiber; + old_cond_impl.tail = cond_impl.tail; + }, + .all => k.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }), + }, + } + fiber.queue_next = null; } + fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void { const k: *Kqueue = @ptrCast(@alignCast(userdata)); _ = k; @@ -243,10 +1037,9 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: } fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void { const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = cond; - _ = wake; - @panic("TODO"); + const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return; + waiting_fiber.resultPointer(Condition).event = .{ .wake = wake }; + k.yield(waiting_fiber, .reschedule); } fn dirMake(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, mode: Dir.Mode) Dir.MakeError!void { @@ -426,7 +1219,7 @@ fn netBindIp( const k: *Kqueue = @ptrCast(@alignCast(userdata)); const family = Io.Threaded.posixAddressFamily(address); const socket_fd = try openSocketPosix(k, family, options); - errdefer posix.close(socket_fd); + errdefer std.posix.close(socket_fd); var storage: Io.Threaded.PosixAddress = undefined; var addr_len = Io.Threaded.addressToPosix(address, &storage); try posixBind(k, socket_fd, &storage.any, addr_len); @@ -704,3 +1497,11 @@ fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option fn checkCancel(k: *Kqueue) error{Canceled}!void { if (cancelRequested(k)) return error.Canceled; } + +const Condition = struct { + tail: *Fiber, + event: union(enum) { + queued, + 
wake: Io.Condition.Wake, + }, +};
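Each fiber above lives in a single page-aligned allocation holding the `Fiber` header, the result area, the stack, the `AsyncClosure`, and the context bytes, and `Fiber.resultBytes`/`AsyncClosure.fromFiber` locate the pieces purely with `Alignment.forward`/`backward` arithmetic. A rough sketch of that carving with made-up sizes; the 64-byte header, 4 KiB "stack", and 32-byte "closure" below are placeholders, not the patch's real constants.

```zig
const std = @import("std");
const Alignment = std.mem.Alignment;

pub fn main() !void {
    const gpa = std.heap.page_allocator;

    const header_size: usize = 64; // placeholder for @sizeOf(Fiber)
    const result_align: Alignment = .@"16";
    const result_size: usize = 64; // placeholder for max_result_size
    const stack_size: usize = 4096; // placeholder for min_stack_size
    const closure_size: usize = 32; // placeholder for @sizeOf(AsyncClosure)
    const allocation_size =
        result_align.forward(header_size) + result_size + stack_size + closure_size;

    const allocation = try gpa.alignedAlloc(u8, .@"16", allocation_size);
    defer gpa.free(allocation);
    const base = @intFromPtr(allocation.ptr);

    // Result area: first aligned address past the header (like Fiber.resultBytes).
    const result_addr = result_align.forward(base + header_size);
    // Closure: rounded down from the end of the allocation (like AsyncClosure.fromFiber).
    const closure_addr = result_align.backward(base + allocation_size - closure_size);

    std.debug.print("base=0x{x} result=+{d} closure=+{d}\n", .{
        base, result_addr - base, closure_addr - base,
    });
}
```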
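A context switch only carries `*Context` pointers, so `Thread.currentFiber` and `SwitchMessage.handle` above recover the owning fiber with `@fieldParentPtr` on the embedded `context` field. A stripped-down illustration of that intrusive pattern; the simplified `Context` and `Fiber` shapes here are stand-ins, not the real ones.

```zig
const std = @import("std");

// Simplified stand-ins for the patch's Context and Fiber.
const Context = struct { sp: u64, fp: u64, pc: u64 };

const Fiber = struct {
    context: Context,
    id: u32,

    // Same step the patch takes in Thread.currentFiber: from a pointer to the
    // embedded `context` field back to the Fiber that contains it.
    fn fromContext(context: *Context) *Fiber {
        return @fieldParentPtr("context", context);
    }
};

pub fn main() void {
    var fiber: Fiber = .{ .context = .{ .sp = 0, .fp = 0, .pc = 0 }, .id = 3 };
    const recovered = Fiber.fromContext(&fiber.context);
    std.debug.assert(recovered == &fiber);
    std.debug.print("recovered fiber {d}\n", .{recovered.id});
}
```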
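`enterCancelRegion`/`exitCancelRegion` above turn `cancel_thread` into a tiny state machine: `null` means "not blocked", a real `*Thread` means "blocked on this thread's kqueue", and a sentinel pointer means "a cancel request already landed", so one `cmpxchg` in each direction is enough for the blocking fiber and the canceler to agree on who arrived first. A reduced sketch of that protocol, single-threaded, with the cancel request simulated by a plain store.

```zig
const std = @import("std");

const Thread = struct { id: u32 };

// Sentinel left behind by a canceler; distinguishable from any real *Thread,
// same trick as Thread.canceling in the patch.
const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));

var cancel_thread: ?*Thread = null;

fn enterCancelRegion(thread: *Thread) error{Canceled}!void {
    if (@cmpxchgStrong(?*Thread, &cancel_thread, null, thread, .acq_rel, .acquire)) |existing| {
        std.debug.assert(existing == canceling);
        return error.Canceled;
    }
}

fn exitCancelRegion(thread: *Thread) void {
    if (@cmpxchgStrong(?*Thread, &cancel_thread, thread, null, .acq_rel, .acquire)) |existing| {
        // A cancel raced in between enter and exit; it only left the sentinel.
        std.debug.assert(existing == canceling);
    }
}

pub fn main() !void {
    var me: Thread = .{ .id = 0 };
    try enterCancelRegion(&me); // claim the blocking region
    exitCancelRegion(&me); // leave it; no cancel happened

    // Simulate a cancel request landing before the next blocking attempt.
    @atomicStore(?*Thread, &cancel_thread, canceling, .release);
    try std.testing.expectError(error.Canceled, enterCancelRegion(&me));
}
```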
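The per-thread run queue above is an intrusive singly linked list threaded through `Fiber.queue_next`: `schedule` prepends a whole `head..tail` chain with a `cmpxchgWeak` loop, and a worker drains it with an `Xchg`. The sketch below shows just that pointer dance on a simplified `Node`; it deliberately leaves out the `Fiber.finished` "busy" sentinel and the cross-thread stealing that `findReadyFiber` layers on top.

```zig
const std = @import("std");

const Node = struct {
    next: ?*Node = null,
    id: u32,
};

// A thread's ready queue: an intrusive list guarded only by atomics.
var ready_queue: ?*Node = null;

// Prepend a chain [head..tail], retrying like the fallback push in `schedule`.
fn push(head: *Node, tail: *Node) void {
    while (@cmpxchgWeak(
        ?*Node,
        &ready_queue,
        tail.next,
        head,
        .acq_rel,
        .acquire,
    )) |old_head| tail.next = old_head;
}

// Take the whole chain at once, as an idle worker does before running it.
fn drain() ?*Node {
    return @atomicRmw(?*Node, &ready_queue, .Xchg, null, .acquire);
}

pub fn main() void {
    var a: Node = .{ .id = 1 };
    var b: Node = .{ .id = 2 };
    a.next = &b;
    push(&a, &b); // schedule a chain of two ready "fibers"

    var it = drain();
    while (it) |node| : (it = node.next) {
        std.debug.print("ready: {d}\n", .{node.id});
    }
}
```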
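The idle loop above multiplexes everything through `kevent`'s `udata` field: a non-exhaustive `enum(usize)` reserves a few small sentinel values (`wakeup`, `exit`, and so on) and treats every other value as a `*Fiber` pointer, which works because real pointers are never that small. A minimal round-trip of that encoding, with a stand-in `Node` instead of `Fiber`.

```zig
const std = @import("std");

const Node = struct { id: u32 };

// Mirrors Completion.UserData in the patch: low integers are named sentinels,
// any other value is reinterpreted as a pointer.
const UserData = enum(usize) {
    unused,
    wakeup,
    exit,
    _,

    fn fromNode(node: *Node) UserData {
        return @enumFromInt(@intFromPtr(node));
    }
};

pub fn main() void {
    var node: Node = .{ .id = 7 };
    const received = [_]UserData{ .wakeup, UserData.fromNode(&node), .exit };
    for (received) |udata| switch (udata) {
        .unused => unreachable,
        .wakeup => std.debug.print("wakeup\n", .{}),
        .exit => std.debug.print("exit\n", .{}),
        _ => {
            const ready: *Node = @ptrFromInt(@intFromEnum(udata));
            std.debug.print("ready node {d}\n", .{ready.id});
        },
    };
}
```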
