| author | Jacob Young <jacobly0@users.noreply.github.com> | 2025-03-30 01:54:02 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:48 -0700 |
| commit | 5041c9ad9cbf479e62416cd06ef8a178f3467127 | |
| tree | c147e83ef77b2e7174b82b209995075d9eb15ae0 /lib/std/Io/EventLoop.zig | |
| parent | e7caf3a54c24264734e4f464f0acbdb27b890893 | |
EventLoop: implement thread-local queues and cancellation
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
| -rw-r--r-- | lib/std/Io/EventLoop.zig | 638 |
1 file changed, 407 insertions(+), 231 deletions(-)
```diff
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig
index f4147b16ee..5de161d9f9 100644
--- a/lib/std/Io/EventLoop.zig
+++ b/lib/std/Io/EventLoop.zig
@@ -10,38 +10,50 @@ const IoUring = std.os.linux.IoUring;
 /// Must be a thread-safe allocator.
 gpa: Allocator,
 mutex: std.Thread.Mutex,
-queue: std.DoublyLinkedList,
-/// Atomic copy of queue.len
-queue_len: u32,
-free: std.DoublyLinkedList,
 main_fiber: Fiber,
-idle_count: usize,
-threads: std.ArrayListUnmanaged(Thread),
-exiting: bool,
-
-threadlocal var thread_index: u32 = undefined;
+threads: Thread.List,
 
 /// Empirically saw >128KB being used by the self-hosted backend to panic.
 const idle_stack_size = 256 * 1024;
 
+const max_idle_search = 4;
+const max_steal_ready_search = 4;
+
 const io_uring_entries = 64;
 
 const Thread = struct {
     thread: std.Thread,
     idle_context: Context,
     current_context: *Context,
+    ready_queue: ?*Fiber,
+    free_queue: ?*Fiber,
     io_uring: IoUring,
+    idle_search_index: u32,
+    steal_ready_search_index: u32,
+
+    threadlocal var index: u32 = undefined;
+
+    fn current(el: *EventLoop) *Thread {
+        return &el.threads.allocated[index];
+    }
 
     fn currentFiber(thread: *Thread) *Fiber {
         return @fieldParentPtr("context", thread.current_context);
     }
+
+    const List = struct {
+        allocated: []Thread,
+        reserved: u32,
+        active: u32,
+    };
 };
 
 const Fiber = struct {
     context: Context,
     awaiter: ?*Fiber,
-    queue_node: std.DoublyLinkedList.Node,
-    result_align: Alignment,
+    queue_next: ?*Fiber,
+    can_cancel: bool,
+    canceled: bool,
 
     const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber)));
@@ -63,14 +75,13 @@ const Fiber = struct {
     );
 
     fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
-        return if (free_node: {
-            el.mutex.lock();
-            defer el.mutex.unlock();
-            break :free_node el.free.pop();
-        }) |free_node|
-            @alignCast(@fieldParentPtr("queue_node", free_node))
-        else
-            @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size));
+        const thread: *Thread = .current(el);
+        if (thread.free_queue) |free_fiber| {
+            thread.free_queue = free_fiber.queue_next;
+            free_fiber.queue_next = null;
+            return free_fiber;
+        }
+        return @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size));
     }
 
     fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
@@ -82,9 +93,15 @@ const Fiber = struct {
         return allocated_slice[allocated_slice.len..].ptr;
     }
 
-    fn resultPointer(f: *Fiber) [*]u8 {
-        return @ptrFromInt(f.result_align.forward(@intFromPtr(f) + @sizeOf(Fiber)));
+    fn resultPointer(f: *Fiber, comptime Result: type) *Result {
+        return @alignCast(@ptrCast(f.resultBytes(.of(Result))));
+    }
+
+    fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
+        return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
    }
+
+    const Queue = struct { head: *Fiber, tail: *Fiber };
 };
 
 pub fn io(el: *EventLoop) Io {
@@ -93,6 +110,8 @@ pub fn io(el: *EventLoop) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
+            .cancel = cancel,
+            .cancelRequested = cancelRequested,
             .createFile = createFile,
             .openFile = openFile,
             .closeFile = closeFile,
```
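The queues that used to live on the shared `EventLoop` (guarded by `mutex`, with `queue_len` mirrored for futex waits) now hang off each `Thread` as intrusive singly-linked lists threaded through the new `Fiber.queue_next` field. A minimal sketch of that intrusive-list discipline, with a hypothetical `Node` standing in for `Fiber` and `push`/`pop` helpers standing in for the inlined list code; the thread-local `free_queue` can use exactly these non-atomic operations because only the owning thread ever touches it:

```zig
const std = @import("std");

// Hypothetical stand-in for the Fiber header: only the intrusive link matters.
const Node = struct {
    queue_next: ?*Node = null,
    id: u32 = 0,
};

// Thread-local free list: push and pop are plain pointer swaps, no locks,
// because only the owning thread touches this list.
fn push(head: *?*Node, node: *Node) void {
    node.queue_next = head.*;
    head.* = node;
}

fn pop(head: *?*Node) ?*Node {
    const node = head.* orelse return null;
    head.* = node.queue_next;
    node.queue_next = null;
    return node;
}

test "intrusive free list" {
    var a: Node = .{ .id = 1 };
    var b: Node = .{ .id = 2 };
    var head: ?*Node = null;
    push(&head, &a);
    push(&head, &b);
    try std.testing.expectEqual(@as(u32, 2), pop(&head).?.id);
    try std.testing.expectEqual(@as(u32, 1), pop(&head).?.id);
    try std.testing.expectEqual(@as(?*Node, null), pop(&head));
}
```

Embedding the link in the fiber itself is what lets the new `allocate` above recycle a fiber without ever taking `el.mutex`.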
```diff
@@ -110,58 +129,86 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
     el.* = .{
         .gpa = gpa,
         .mutex = .{},
-        .queue = .{},
-        .queue_len = 0,
-        .free = .{},
-        .main_fiber = undefined,
-        .idle_count = 0,
-        .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_size])),
-        .exiting = false,
+        .main_fiber = .{
+            .context = undefined,
+            .awaiter = null,
+            .queue_next = null,
+            .can_cancel = false,
+            .canceled = false,
+        },
+        .threads = .{
+            .allocated = @ptrCast(allocated_slice[0..threads_size]),
+            .reserved = 1,
+            .active = 1,
+        },
     };
-    thread_index = 0;
-    const main_thread = el.threads.addOneAssumeCapacity();
-    main_thread.io_uring = try IoUring.init(io_uring_entries, 0);
+    Thread.index = 0;
+    const main_thread = &el.threads.allocated[0];
     const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
     (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
-    main_thread.idle_context = .{
-        .rsp = @intFromPtr(idle_stack_end - 1),
-        .rbp = 0,
-        .rip = @intFromPtr(&mainIdleEntry),
+    main_thread.* = .{
+        .thread = undefined,
+        .idle_context = .{
+            .rsp = @intFromPtr(idle_stack_end - 1),
+            .rbp = 0,
+            .rip = @intFromPtr(&mainIdleEntry),
+        },
+        .current_context = &el.main_fiber.context,
+        .ready_queue = null,
+        .free_queue = null,
+        .io_uring = try IoUring.init(io_uring_entries, 0),
+        .idle_search_index = 1,
+        .steal_ready_search_index = 1,
     };
+    errdefer main_thread.io_uring.deinit();
     std.log.debug("created main idle {*}", .{&main_thread.idle_context});
     std.log.debug("created main {*}", .{&el.main_fiber});
-    main_thread.current_context = &el.main_fiber.context;
 }
 
 pub fn deinit(el: *EventLoop) void {
-    assert(el.queue.len == 0); // pending async
+    const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
+    for (el.threads.allocated[0..active_threads]) |*thread|
+        assert(@atomicLoad(?*Fiber, &thread.ready_queue, .unordered) == null); // pending async
     el.yield(null, .exit);
-    while (el.free.pop()) |free_node| {
-        const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node));
-        el.gpa.free(free_fiber.allocatedSlice());
+    const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr));
+    const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
+    for (el.threads.allocated[1..active_threads]) |*thread| {
+        thread.thread.join();
+        while (thread.free_queue) |free_fiber| {
+            thread.free_queue = free_fiber.queue_next;
+            free_fiber.queue_next = null;
+            el.gpa.free(free_fiber.allocatedSlice());
+        }
     }
-    const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
-    const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
-    for (el.threads.items[1..]) |*thread| thread.thread.join();
     el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
     el.* = undefined;
 }
```
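`Thread.List` replaces the `ArrayListUnmanaged` with a fixed slice carved out of the one page-aligned allocation, plus two counters: `reserved` claims a slot for a spawn still in flight, and `active` is only advanced with `.release` once the slot's shared fields are initialized, so an `.acquire` reader of `active` never observes a half-built `Thread`. A sketch of that reserve-then-publish pattern, with an illustrative `List` of plain `u32` slots standing in for `Thread.List` and a hypothetical `tryActivate` helper (the commit performs the same steps inline in `schedule`, using `@cmpxchgWeak` since it tolerates spurious failure):

```zig
const std = @import("std");

// Two-counter publication protocol: `reserved` claims, `active` publishes.
const List = struct {
    slots: [8]u32 = undefined,
    reserved: u32 = 1,
    active: u32 = 1,

    fn tryActivate(list: *List, value: u32) bool {
        const index = @atomicLoad(u32, &list.active, .acquire);
        if (index == list.slots.len) return false; // no free slots
        // Claim the slot; losing the race means another spawn is in flight.
        if (@cmpxchgStrong(u32, &list.reserved, index, index + 1, .acquire, .monotonic) != null)
            return false;
        list.slots[index] = value; // initialize before publishing...
        @atomicStore(u32, &list.active, index + 1, .release); // ...then publish
        return true;
    }
};

test "a slot becomes active only after initialization" {
    var list: List = .{};
    try std.testing.expect(list.tryActivate(42));
    try std.testing.expectEqual(@as(u32, 2), list.active);
    try std.testing.expectEqual(@as(u32, 42), list.slots[1]);
}
```

On a failed spawn the commit stores `reserved` back down with `.release`, returning the slot before anyone could have observed it as active.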
```diff
-fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
-    const thread: *Thread = &el.threads.items[thread_index];
-    const ready_context: *Context = ready_context: {
-        const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
-            el.mutex.lock();
-            defer el.mutex.unlock();
-            const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
-            const ready_node = el.queue.pop();
-            _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
-            break :ready_node ready_node;
-        }) |ready_node|
-            @alignCast(@fieldParentPtr("queue_node", ready_node))
-        else
-            break :ready_context &thread.idle_context;
+fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
+    const thread: *Thread = .current(el);
+    const ready_context: *Context = if (maybe_ready_fiber) |ready_fiber|
+        &ready_fiber.context
+    else if (thread.ready_queue) |ready_fiber| ready_context: {
+        thread.ready_queue = ready_fiber.queue_next;
+        ready_fiber.queue_next = null;
+        break :ready_context &ready_fiber.context;
+    } else ready_context: {
+        const ready_threads = @atomicLoad(u32, &el.threads.active, .acquire);
+        break :ready_context for (0..max_steal_ready_search) |_| {
+            defer thread.steal_ready_search_index += 1;
+            if (thread.steal_ready_search_index == ready_threads) thread.steal_ready_search_index = 0;
+            const steal_ready_search_thread = &el.threads.allocated[thread.steal_ready_search_index];
+            const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
+            if (@cmpxchgWeak(
+                ?*Fiber,
+                &steal_ready_search_thread.ready_queue,
+                ready_fiber,
+                @atomicLoad(?*Fiber, &ready_fiber.queue_next, .acquire),
+                .acq_rel,
+                .monotonic,
+            )) |_| continue;
+            break &ready_fiber.context;
+        } else &thread.idle_context;
     };
     const message: SwitchMessage = .{
         .contexts = .{
@@ -174,111 +221,177 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.Pe
     contextSwitch(&message).handle(el);
 }
```
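`yield` now resolves the next context in three steps: an explicitly handed-off fiber, then the thread's own `ready_queue`, then a bounded scan that tries to steal the head of a peer's queue with a single CAS before falling back to the idle context. A sketch of that bounded probe; `trySteal` and its stub `Fiber` are illustrative, and it uses `@cmpxchgStrong` where the commit can afford `@cmpxchgWeak` because a spurious miss just means moving on to the next peer:

```zig
const std = @import("std");

const Fiber = struct { queue_next: ?*Fiber = null };

// Probe up to `max_probes` peer ready queues, popping one head with a CAS.
fn trySteal(queues: []?*Fiber, start: *u32, max_probes: u32) ?*Fiber {
    var i: u32 = 0;
    while (i < max_probes) : (i += 1) {
        defer start.* += 1; // resume the scan here next time
        if (start.* >= queues.len) start.* = 0;
        const queue = &queues[start.*];
        const head = @atomicLoad(?*Fiber, queue, .acquire) orelse continue;
        const next = @atomicLoad(?*Fiber, &head.queue_next, .acquire);
        // A failed CAS just means contention; try the next peer.
        if (@cmpxchgStrong(?*Fiber, queue, head, next, .acq_rel, .monotonic) != null) continue;
        head.queue_next = null;
        return head;
    }
    return null;
}

test "steal from a peer queue" {
    var fiber: Fiber = .{};
    var queues = [_]?*Fiber{ null, &fiber, null };
    var index: u32 = 0;
    try std.testing.expectEqual(@as(?*Fiber, &fiber), trySteal(&queues, &index, 4));
    try std.testing.expectEqual(@as(?*Fiber, null), queues[1]);
}
```

Capping the scan at `max_steal_ready_search` (and persisting the cursor in `steal_ready_search_index`) keeps the slow path bounded instead of O(threads) per yield.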
```diff
-fn schedule(el: *EventLoop, fiber: *Fiber) void {
-    std.log.debug("scheduling {*}", .{fiber});
-    if (idle_count: {
-        el.mutex.lock();
-        defer el.mutex.unlock();
-        const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
-        el.queue.append(&fiber.queue_node);
-        _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
-        break :idle_count el.idle_count;
-    } > 0) {
-        _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), 1, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
+fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
+    {
+        var fiber = ready_queue.head;
+        while (true) {
+            std.log.debug("scheduling {*}", .{fiber});
+            fiber = fiber.queue_next orelse break;
+        }
+        assert(fiber == ready_queue.tail);
+    }
+    // shared fields of previous `Thread` must be initialized before later ones are marked as active
+    const new_thread_index = @atomicLoad(u32, &el.threads.active, .acquire);
+    for (0..max_idle_search) |_| {
+        defer thread.idle_search_index += 1;
+        if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
+        const idle_search_thread = &el.threads.allocated[thread.idle_search_index];
+        if (@cmpxchgWeak(
+            ?*Fiber,
+            &idle_search_thread.ready_queue,
+            null,
+            ready_queue.head,
+            .acq_rel,
+            .monotonic,
+        )) |_| continue;
+        getSqe(&thread.io_uring).* = .{
+            .opcode = .MSG_RING,
+            .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+            .ioprio = 0,
+            .fd = idle_search_thread.io_uring.fd,
+            .off = @intFromEnum(Completion.Key.wakeup),
+            .addr = 0,
+            .len = 0,
+            .rw_flags = 0,
+            .user_data = @intFromEnum(Completion.Key.wakeup),
+            .buf_index = 0,
+            .personality = 0,
+            .splice_fd_in = 0,
+            .addr3 = 0,
+            .resv = 0,
+        };
         return;
     }
-    if (el.threads.items.len == el.threads.capacity) return;
-    const thread = el.threads.addOneAssumeCapacity();
-    thread.thread = std.Thread.spawn(.{
-        .stack_size = idle_stack_size,
-        .allocator = el.gpa,
-    }, threadEntry, .{ el, el.threads.items.len - 1 }) catch {
-        el.threads.items.len -= 1;
+    spawn_thread: {
+        // previous failed reservations must have completed before retrying
+        if (new_thread_index == el.threads.allocated.len or @cmpxchgWeak(
+            u32,
+            &el.threads.reserved,
+            new_thread_index,
+            new_thread_index + 1,
+            .acquire,
+            .monotonic,
+        ) != null) break :spawn_thread;
+        const new_thread = &el.threads.allocated[new_thread_index];
+        const next_thread_index = new_thread_index + 1;
+        new_thread.* = .{
+            .thread = undefined,
+            .idle_context = undefined,
+            .current_context = &new_thread.idle_context,
+            .ready_queue = ready_queue.head,
+            .free_queue = null,
+            .io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
+                @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
+                // no more access to `thread` after giving up reservation
+                std.log.warn("unable to create worker thread due to io_uring init failure: {s}", .{@errorName(err)});
+                break :spawn_thread;
+            },
+            .idle_search_index = next_thread_index,
+            .steal_ready_search_index = next_thread_index,
+        };
+        new_thread.thread = std.Thread.spawn(.{
+            .stack_size = idle_stack_size,
+            .allocator = el.gpa,
+        }, threadEntry, .{ el, new_thread_index }) catch |err| {
+            new_thread.io_uring.deinit();
+            @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
+            // no more access to `thread` after giving up reservation
+            std.log.warn("unable to create worker thread due spawn failure: {s}", .{@errorName(err)});
+            break :spawn_thread;
+        };
+        // shared fields of `Thread` must be initialized before being marked active
+        @atomicStore(u32, &el.threads.active, next_thread_index, .release);
         return;
-    };
+    }
+    // nobody wanted it, so just queue it on ourselves
+    while (@cmpxchgWeak(
+        ?*Fiber,
+        &thread.ready_queue,
+        ready_queue.tail.queue_next,
+        ready_queue.head,
+        .acq_rel,
+        .acquire,
+    )) |old_head| ready_queue.tail.queue_next = old_head;
 }
 
 fn recycle(el: *EventLoop, fiber: *Fiber) void {
+    const thread: *Thread = .current(el);
     std.log.debug("recyling {*}", .{fiber});
+    assert(fiber.queue_next == null);
     @memset(fiber.allocatedSlice(), undefined);
-    el.mutex.lock();
-    defer el.mutex.unlock();
-    el.free.append(&fiber.queue_node);
+    fiber.queue_next = thread.free_queue;
+    thread.free_queue = fiber;
 }
```
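`schedule` now prefers to hand the whole batch to an idle peer: it CASes the list head into an empty `ready_queue` and posts a `MSG_RING` SQE so the wakeup arrives as a CQE on the sleeping thread's own ring, replacing the old `futex2_wake` path. Failing that it tries to spawn a new worker, and as a last resort splices the batch onto its own queue. A sketch of that final splice, with stub `Fiber`/`Queue` types mirroring `Fiber.Queue`; on a failed CAS the stale head is re-linked behind the segment's tail and the swap retried:

```zig
const std = @import("std");

const Fiber = struct { queue_next: ?*Fiber = null };
const Queue = struct { head: *Fiber, tail: *Fiber };

// Splice a whole (head, tail) segment onto a lock-free stack in one CAS loop.
fn pushSegment(queue_head: *?*Fiber, segment: Queue) void {
    segment.tail.queue_next = @atomicLoad(?*Fiber, queue_head, .monotonic);
    while (@cmpxchgWeak(
        ?*Fiber,
        queue_head,
        segment.tail.queue_next, // expected: whatever we last linked behind us
        segment.head,
        .acq_rel,
        .acquire,
    )) |old_head| segment.tail.queue_next = old_head; // lost a race: re-link and retry
}

test "splice a segment ahead of the existing head" {
    var a: Fiber = .{};
    var b: Fiber = .{};
    var old: Fiber = .{};
    var head: ?*Fiber = &old;
    a.queue_next = &b;
    pushSegment(&head, .{ .head = &a, .tail = &b });
    try std.testing.expectEqual(@as(?*Fiber, &a), head);
    try std.testing.expectEqual(@as(?*Fiber, &old), b.queue_next);
}
```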
```diff
 fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
     message.handle(el);
-    el.idle();
+    const thread: *Thread = &el.threads.allocated[0];
+    el.idle(thread);
     el.yield(&el.main_fiber, .nothing);
     unreachable; // switched to dead fiber
 }
 
-fn threadEntry(el: *EventLoop, index: usize) void {
-    thread_index = @intCast(index);
-    const thread: *Thread = &el.threads.items[index];
+fn threadEntry(el: *EventLoop, index: u32) void {
+    Thread.index = index;
+    const thread: *Thread = &el.threads.allocated[index];
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
-    thread.io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
-        std.log.warn("exiting worker thread during init due to io_uring init failure: {s}", .{@errorName(err)});
-        return;
-    };
-    thread.current_context = &thread.idle_context;
-    el.idle();
+    el.idle(thread);
 }
 
-const CompletionKey = enum(u64) {
-    queue_len_futex_wait = 1,
-    _,
+const Completion = struct {
+    const Key = enum(usize) {
+        unused,
+        wakeup,
+        cancel,
+        cleanup,
+        exit,
+        /// *Fiber
+        _,
+    };
+    result: i32,
+    flags: u32,
 };
 
-fn idle(el: *EventLoop) void {
-    const thread: *Thread = &el.threads.items[thread_index];
-    const iou = &thread.io_uring;
-    var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
-    var queue_len_futex_is_scheduled: bool = false;
-
+fn idle(el: *EventLoop, thread: *Thread) void {
+    var maybe_ready_fiber: ?*Fiber = null;
     while (true) {
-        el.yield(null, .nothing);
-        if (@atomicLoad(bool, &el.exiting, .acquire)) return;
-        if (!queue_len_futex_is_scheduled) {
-            const sqe = getSqe(&thread.io_uring);
-            sqe.prep_rw(.FUTEX_WAIT, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
-            sqe.addr3 = std.math.maxInt(u32);
-            sqe.user_data = @intFromEnum(CompletionKey.queue_len_futex_wait);
-            queue_len_futex_is_scheduled = true;
-        }
-        _ = iou.submit_and_wait(1) catch |err| switch (err) {
-            error.SignalInterrupt => std.log.debug("submit_and_wait: SignalInterrupt", .{}),
-            else => @panic(@errorName(err)),
+        el.yield(maybe_ready_fiber, .nothing);
+        maybe_ready_fiber = null;
+        _ = thread.io_uring.submit_and_wait(1) catch |err| switch (err) {
+            error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
+            else => |e| @panic(@errorName(e)),
         };
-        for (cqes_buffer[0 .. iou.copy_cqes(&cqes_buffer, 1) catch |err| switch (err) {
+        var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
+        var maybe_ready_queue: ?Fiber.Queue = null;
+        for (cqes_buffer[0 .. thread.io_uring.copy_cqes(&cqes_buffer, 0) catch |err| switch (err) {
             error.SignalInterrupt => cqes_len: {
-                std.log.debug("copy_cqes: SignalInterrupt", .{});
+                std.log.warn("copy_cqes failed with SignalInterrupt", .{});
                 break :cqes_len 0;
             },
-            else => @panic(@errorName(err)),
-        }]) |cqe| switch (@as(CompletionKey, @enumFromInt(cqe.user_data))) {
-            .queue_len_futex_wait => {
-                switch (errno(cqe.res)) {
-                    .SUCCESS, .AGAIN => {},
-                    .INVAL => unreachable,
-                    else => |err| {
-                        std.posix.unexpectedErrno(err) catch {};
-                        @panic("unexpected");
-                    },
-                }
-                std.log.debug("{*} woken up with queue size of {d}", .{
-                    &thread.idle_context,
-                    @atomicLoad(u32, &el.queue_len, .unordered),
-                });
-                queue_len_futex_is_scheduled = false;
+            else => |e| @panic(@errorName(e)),
+        }]) |cqe| switch (@as(Completion.Key, @enumFromInt(cqe.user_data))) {
+            .unused => unreachable, // bad submission queued?
```
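As the rest of the idle loop (below) shows, completions are now drained in batches: the first completed fiber is resumed directly by the next `yield`, and any further completions from the same `copy_cqes` call are chained into a single `Fiber.Queue` handed to `schedule` once. A small sketch of that accumulator, with an illustrative `Batch` type that is not in the commit:

```zig
const std = @import("std");

const Fiber = struct { queue_next: ?*Fiber = null };
const Queue = struct { head: *Fiber, tail: *Fiber };

// First completion is resumed directly; the rest are chained for one
// schedule() call instead of one per CQE.
const Batch = struct {
    first: ?*Fiber = null,
    rest: ?Queue = null,

    fn add(batch: *Batch, fiber: *Fiber) void {
        if (batch.first == null) {
            batch.first = fiber;
        } else if (batch.rest) |*queue| {
            queue.tail.queue_next = fiber;
            queue.tail = fiber;
        } else {
            batch.rest = .{ .head = fiber, .tail = fiber };
        }
    }
};

test "first completion resumes directly, the rest batch up" {
    var fibers: [3]Fiber = .{ .{}, .{}, .{} };
    var batch: Batch = .{};
    for (&fibers) |*fiber| batch.add(fiber);
    try std.testing.expect(batch.first == &fibers[0]);
    try std.testing.expect(batch.rest.?.head == &fibers[1]);
    try std.testing.expect(batch.rest.?.tail == &fibers[2]);
    try std.testing.expect(fibers[1].queue_next == &fibers[2]);
}
```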
```diff
+            .wakeup => {},
+            .cancel => {},
+            .cleanup => @panic("failed to notify other threads that we are exiting"),
+            .exit => {
+                assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
+                return;
             },
             _ => {
                 const fiber: *Fiber = @ptrFromInt(cqe.user_data);
-                const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
-                res.* = cqe.res;
-                el.schedule(fiber);
+                assert(fiber.queue_next == null);
+                fiber.resultPointer(Completion).* = .{
+                    .result = cqe.res,
+                    .flags = cqe.flags,
+                };
+                if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
+                    ready_queue.tail.queue_next = fiber;
+                    ready_queue.tail = fiber;
+                } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
             },
         };
+        if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
     }
 }
 
@@ -296,18 +409,37 @@ const SwitchMessage = struct {
     };
 
     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
-        const thread: *Thread = &el.threads.items[thread_index];
+        const thread: *Thread = .current(el);
         thread.current_context = message.contexts.ready;
         switch (message.pending_task) {
             .nothing => {},
             .register_awaiter => |awaiter| {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
-                if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
+                if (@atomicRmw(
+                    ?*Fiber,
+                    awaiter,
+                    .Xchg,
+                    prev_fiber,
+                    .acq_rel,
+                ) == Fiber.finished) el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
             },
-            .exit => {
-                @atomicStore(bool, &el.exiting, true, .unordered);
-                @atomicStore(u32, &el.queue_len, std.math.maxInt(u32), .release);
-                _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), std.math.maxInt(i32), std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
+            .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
+                getSqe(&thread.io_uring).* = .{
+                    .opcode = .MSG_RING,
+                    .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+                    .ioprio = 0,
+                    .fd = each_thread.io_uring.fd,
+                    .off = @intFromEnum(Completion.Key.exit),
+                    .addr = 0,
+                    .len = 0,
+                    .rw_flags = 0,
+                    .user_data = @intFromEnum(Completion.Key.cleanup),
+                    .buf_index = 0,
+                    .personality = 0,
+                    .splice_fd_in = 0,
+                    .addr3 = 0,
+                    .resv = 0,
+                };
             },
         }
     }
```
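The `register_awaiter` arm is one half of a sentinel handshake on `Fiber.awaiter`: the completing fiber exchanges in `Fiber.finished`, the awaiting fiber exchanges in itself, and whichever side observes the other's value performs the wakeup, so neither a lock nor a lost-wakeup window is possible. A self-contained sketch of both sides (the `complete`/`registerAwaiter` helper names are hypothetical; the sentinel definition matches the commit):

```zig
const std = @import("std");

const Fiber = struct {
    awaiter: ?*Fiber = null,

    // Sentinel distinct from any real, aligned fiber pointer.
    const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(
        usize,
        std.math.maxInt(usize),
        @alignOf(Fiber),
    ));
};

// Completer side: publish `finished`; a non-null previous value is a parked awaiter.
fn complete(future: *Fiber) ?*Fiber {
    return @atomicRmw(?*Fiber, &future.awaiter, .Xchg, Fiber.finished, .acq_rel);
}

// Awaiter side: register self; seeing `finished` means the future already completed.
fn registerAwaiter(future: *Fiber, self: *Fiber) bool {
    return @atomicRmw(?*Fiber, &future.awaiter, .Xchg, self, .acq_rel) == Fiber.finished;
}

test "handshake works in either order" {
    var waiter: Fiber = .{};
    // Await first: the completer later finds the parked awaiter and wakes it.
    var future: Fiber = .{};
    try std.testing.expect(!registerAwaiter(&future, &waiter));
    try std.testing.expectEqual(@as(?*Fiber, &waiter), complete(&future));
    // Complete first: the awaiter sees `finished` and never parks.
    var future2: Fiber = .{};
    try std.testing.expectEqual(@as(?*Fiber, null), complete(&future2));
    try std.testing.expect(registerAwaiter(&future2, &waiter));
}
```

The `.exit` arm above replaces the futex broadcast with one `MSG_RING` SQE per active thread, which each idle loop observes as `Completion.Key.exit`.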
```diff
@@ -374,7 +506,27 @@ fn fiberEntry() callconv(.naked) void {
     }
 }
 
-pub fn @"async"(
+const AsyncClosure = struct {
+    event_loop: *EventLoop,
+    fiber: *Fiber,
+    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
+    result_align: Alignment,
+
+    fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
+        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
+    }
+
+    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
+        message.handle(closure.event_loop);
+        std.log.debug("{*} performing async", .{closure.fiber});
+        closure.start(closure.contextPointer(), closure.fiber.resultBytes(closure.result_align));
+        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
+        closure.event_loop.yield(awaiter, .nothing);
+        unreachable; // switched to dead fiber
+    }
+};
+
+fn @"async"(
     userdata: ?*anyopaque,
     result: []u8,
     result_alignment: Alignment,
@@ -407,58 +559,79 @@ pub fn @"async"(
             else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
         },
         .awaiter = null,
-        .queue_node = undefined,
-        .result_align = result_alignment,
+        .queue_next = null,
+        .can_cancel = false,
+        .canceled = false,
     };
     closure.* = .{
         .event_loop = event_loop,
         .fiber = fiber,
         .start = start,
+        .result_align = result_alignment,
     };
     @memcpy(closure.contextPointer(), context);
-    event_loop.schedule(fiber);
+    event_loop.schedule(.current(event_loop), .{ .head = fiber, .tail = fiber });
     return @ptrCast(fiber);
 }
 
-const AsyncClosure = struct {
-    event_loop: *EventLoop,
-    fiber: *Fiber,
-    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
-
-    fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
-        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
-    }
-
-    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
-        message.handle(closure.event_loop);
-        std.log.debug("{*} performing async", .{closure.fiber});
-        closure.start(closure.contextPointer(), closure.fiber.resultPointer());
-        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
-        closure.event_loop.yield(awaiter, .nothing);
-        unreachable; // switched to dead fiber
-    }
-};
```
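A fiber's result buffer is not a separate allocation: `resultBytes` simply aligns the first address past the `Fiber` header. The per-fiber `result_align` field therefore moves into `AsyncClosure`, since only `call` needs it, and `@"await"` now receives the alignment from the caller instead of storing it. A sketch of the address computation, with a stand-in `Header` struct in place of `Fiber`:

```zig
const std = @import("std");
const Alignment = std.mem.Alignment;

const Header = struct { a: usize, b: usize };

// Mirror of `resultBytes`: the result lives at the first suitably aligned
// address past the header, inside the same allocation.
fn resultBytes(header: *Header, alignment: Alignment) [*]u8 {
    return @ptrFromInt(alignment.forward(@intFromPtr(header) + @sizeOf(Header)));
}

test "result is placed just past the header, suitably aligned" {
    var buffer: [64]u8 align(16) = undefined;
    const header: *Header = @alignCast(@ptrCast(&buffer));
    const result = resultBytes(header, .of(u64));
    try std.testing.expect(@intFromPtr(result) % @alignOf(u64) == 0);
    try std.testing.expect(@intFromPtr(result) >= @intFromPtr(header) + @sizeOf(Header));
}
```

The same trick is how the idle loop can write a `Completion` into any fiber: `fiber.resultPointer(Completion)` reuses the result slot as scratch space for the CQE payload.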
```diff
-pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
+fn @"await"(
+    userdata: ?*anyopaque,
+    any_future: *std.Io.AnyFuture,
+    result: []u8,
+    result_alignment: Alignment,
+) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
     if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
-    @memcpy(result, future_fiber.resultPointer());
+    @memcpy(result, future_fiber.resultBytes(result_alignment));
     event_loop.recycle(future_fiber);
 }
 
-pub fn cancel(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
+fn cancel(
+    userdata: ?*anyopaque,
+    any_future: *std.Io.AnyFuture,
+    result: []u8,
+    result_alignment: Alignment,
+) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    // TODO set a flag that makes all IO operations for this fiber return error.Canceled
-    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
-    @memcpy(result, future_fiber.resultPointer());
-    event_loop.recycle(future_fiber);
+    @atomicStore(bool, &future_fiber.canceled, true, .release);
+    if (@atomicLoad(bool, &future_fiber.can_cancel, .acquire)) {
+        const thread: *Thread = .current(event_loop);
+        getSqe(&thread.io_uring).* = .{
+            .opcode = .ASYNC_CANCEL,
+            .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+            .ioprio = 0,
+            .fd = 0,
+            .off = 0,
+            .addr = @intFromPtr(future_fiber),
+            .len = 0,
+            .rw_flags = 0,
+            .user_data = @intFromEnum(Completion.Key.cancel),
+            .buf_index = 0,
+            .personality = 0,
+            .splice_fd_in = 0,
+            .addr3 = 0,
+            .resv = 0,
+        };
+    }
+    @"await"(userdata, any_future, result, result_alignment);
+}
+
+fn cancelRequested(userdata: ?*anyopaque) bool {
+    const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current(event_loop);
+    return thread.currentFiber().canceled;
 }
 
-pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.CreateFlags) std.fs.File.OpenError!std.fs.File {
-    const el: *EventLoop = @ptrCast(@alignCast(userdata));
+pub fn createFile(
+    userdata: ?*anyopaque,
+    dir: std.fs.Dir,
+    sub_path: []const u8,
+    flags: Io.CreateFlags,
+) Io.FileOpenError!std.fs.File {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
 
     const posix = std.posix;
     const sub_path_c = try posix.toPosixPath(sub_path);
@@ -497,22 +670,24 @@ pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8,
         @panic("TODO");
     }
 
-    const thread: *Thread = &el.threads.items[thread_index];
+    const thread: *Thread = .current(el);
     const iou = &thread.io_uring;
-    const sqe = getSqe(iou);
     const fiber = thread.currentFiber();
+    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    const sqe = getSqe(iou);
     sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode);
     sqe.user_data = @intFromPtr(fiber);
+    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
+    @atomicStore(bool, &fiber.can_cancel, false, .release);
 
-    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
-    const rc = result.*;
-    switch (errno(rc)) {
-        .SUCCESS => return .{ .handle = rc },
+    const completion = fiber.resultPointer(Completion);
+    switch (errno(completion.result)) {
+        .SUCCESS => return .{ .handle = completion.result },
         .INTR => @panic("TODO is this reachable?"),
-        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+        .CANCELED => return error.AsyncCancel,
 
         .FAULT => unreachable,
         .INVAL => return error.BadPathName,
@@ -541,8 +716,17 @@ pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8,
     }
 }
```
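Cancellation is a pair of flags plus an `ASYNC_CANCEL` submission: `cancel` sets `canceled` and, only if the target is inside an I/O window marked by `can_cancel`, submits an `ASYNC_CANCEL` SQE keyed by the fiber pointer, then joins via `@"await"`. Each blocking op checks `canceled` before submitting and brackets its suspension with `can_cancel` stores, as `createFile` above shows. A condensed sketch of the protocol; the helper functions are illustrative, not the commit's API, and the commit clears `can_cancel` explicitly after resuming rather than via `defer`:

```zig
const std = @import("std");

const Fiber = struct {
    can_cancel: bool = false, // true only while parked on an io_uring op
    canceled: bool = false, // one-way cancellation request
};

// Cancel side: raise the request; report whether an ASYNC_CANCEL is worth
// submitting (i.e. the fiber is currently inside a cancelable window).
fn requestCancel(fiber: *Fiber) bool {
    @atomicStore(bool, &fiber.canceled, true, .release);
    return @atomicLoad(bool, &fiber.can_cancel, .acquire);
}

// Op side: short-circuit if already canceled, otherwise open the window,
// suspend on the submission, and close the window on resume.
fn submitOp(fiber: *Fiber) error{AsyncCancel}!void {
    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
    @atomicStore(bool, &fiber.can_cancel, true, .release);
    defer @atomicStore(bool, &fiber.can_cancel, false, .release);
    // ... prep the SQE, yield to the event loop, inspect the Completion ...
}

test "cancel before submit short-circuits" {
    var fiber: Fiber = .{};
    try std.testing.expect(!requestCancel(&fiber)); // nothing in flight
    try std.testing.expectError(error.AsyncCancel, submitOp(&fiber));
}
```

A kernel-side cancellation surfaces as `ECANCELED` in the op's completion, which the file operations below map to `error.AsyncCancel` (or ignore, in the case of `closeFile`).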
```diff
-pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.OpenFlags) std.fs.File.OpenError!std.fs.File {
-    const el: *EventLoop = @ptrCast(@alignCast(userdata));
+pub fn openFile(
+    userdata: ?*anyopaque,
+    dir: std.fs.Dir,
+    sub_path: []const u8,
+    flags: Io.OpenFlags,
+) Io.FileOpenError!std.fs.File {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current(el);
+    const iou = &thread.io_uring;
+    const fiber = thread.currentFiber();
+    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
 
     const posix = std.posix;
     const sub_path_c = try posix.toPosixPath(sub_path);
@@ -587,22 +771,19 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
         @panic("TODO");
     }
 
-    const thread: *Thread = &el.threads.items[thread_index];
-    const iou = &thread.io_uring;
     const sqe = getSqe(iou);
-    const fiber = thread.currentFiber();
-
     sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
     sqe.user_data = @intFromPtr(fiber);
+    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
+    @atomicStore(bool, &fiber.can_cancel, false, .release);
 
-    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
-    const rc = result.*;
-    switch (errno(rc)) {
-        .SUCCESS => return .{ .handle = rc },
+    const completion = fiber.resultPointer(Completion);
+    switch (errno(completion.result)) {
+        .SUCCESS => return .{ .handle = completion.result },
         .INTR => @panic("TODO is this reachable?"),
-        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+        .CANCELED => return error.AsyncCancel,
 
         .FAULT => unreachable,
         .INVAL => return error.BadPathName,
@@ -631,63 +812,49 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
     }
 }
 
-fn errno(signed: i32) std.posix.E {
-    const int = if (signed > -4096 and signed < 0) -signed else 0;
-    return @enumFromInt(int);
-}
-
-fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
-    return iou.get_sqe() catch @panic("TODO: handle submission queue full");
-}
-
 pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
-    const el: *EventLoop = @ptrCast(@alignCast(userdata));
-
-    const posix = std.posix;
-
-    const thread: *Thread = &el.threads.items[thread_index];
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current(el);
     const iou = &thread.io_uring;
-    const sqe = getSqe(iou);
     const fiber = thread.currentFiber();
+    const sqe = getSqe(iou);
     sqe.prep_close(file.handle);
     sqe.user_data = @intFromPtr(fiber);
     el.yield(null, .nothing);
 
-    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
-    const rc = result.*;
-    switch (errno(rc)) {
+    const completion = fiber.resultPointer(Completion);
+    switch (errno(completion.result)) {
         .SUCCESS => return,
         .INTR => @panic("TODO is this reachable?"),
-        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+        .CANCELED => return,
 
         .BADF => unreachable, // Always a race condition.
         else => return,
     }
 }
 
-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.ReadError!usize {
-    const el: *EventLoop = @ptrCast(@alignCast(userdata));
-
-    const posix = std.posix;
-
-    const thread: *Thread = &el.threads.items[thread_index];
+pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current(el);
     const iou = &thread.io_uring;
-    const sqe = getSqe(iou);
     const fiber = thread.currentFiber();
+    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    const sqe = getSqe(iou);
     sqe.prep_read(file.handle, buffer, std.math.maxInt(u64));
     sqe.user_data = @intFromPtr(fiber);
+    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
+    @atomicStore(bool, &fiber.can_cancel, false, .release);
 
-    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
-    const rc = result.*;
-    switch (errno(rc)) {
-        .SUCCESS => return @as(u32, @bitCast(rc)),
+    const completion = fiber.resultPointer(Completion);
+    switch (errno(completion.result)) {
+        .SUCCESS => return @as(u32, @bitCast(completion.result)),
         .INTR => @panic("TODO is this reachable?"),
-        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+        .CANCELED => return error.AsyncCancel,
 
         .INVAL => unreachable,
         .FAULT => unreachable,
@@ -701,31 +868,31 @@ pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.
         .NOTCONN => return error.SocketNotConnected,
         .CONNRESET => return error.ConnectionResetByPeer,
         .TIMEDOUT => return error.ConnectionTimedOut,
-        else => |err| return posix.unexpectedErrno(err),
+        else => |err| return std.posix.unexpectedErrno(err),
     }
 }
 
-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.fs.File.WriteError!usize {
-    const el: *EventLoop = @ptrCast(@alignCast(userdata));
-
-    const posix = std.posix;
+pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
 
-    const thread: *Thread = &el.threads.items[thread_index];
+    const thread: *Thread = .current(el);
     const iou = &thread.io_uring;
-    const sqe = getSqe(iou);
     const fiber = thread.currentFiber();
+    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    const sqe = getSqe(iou);
     sqe.prep_write(file.handle, buffer, std.math.maxInt(u64));
     sqe.user_data = @intFromPtr(fiber);
+    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
+    @atomicStore(bool, &fiber.can_cancel, false, .release);
 
-    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
-    const rc = result.*;
-    switch (errno(rc)) {
-        .SUCCESS => return @as(u32, @bitCast(rc)),
+    const completion = fiber.resultPointer(Completion);
+    switch (errno(completion.result)) {
+        .SUCCESS => return @as(u32, @bitCast(completion.result)),
        .INTR => @panic("TODO is this reachable?"),
-        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+        .CANCELED => return error.AsyncCancel,
 
         .INVAL => return error.InvalidArgument,
         .FAULT => unreachable,
@@ -744,6 +911,15 @@ pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.f
         .BUSY => return error.DeviceBusy,
         .NXIO => return error.NoDevice,
         .MSGSIZE => return error.MessageTooBig,
-        else => |err| return posix.unexpectedErrno(err),
+        else => |err| return std.posix.unexpectedErrno(err),
     }
 }
+
+fn errno(signed: i32) std.posix.E {
+    const int = if (signed > -4096 and signed < 0) -signed else 0;
+    return @enumFromInt(int);
+}
+
+fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
+    return iou.get_sqe() catch @panic("TODO: handle submission queue full");
+}
```
