diff options
| author | Jacob Young <jacobly0@users.noreply.github.com> | 2025-03-30 15:13:41 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:48 -0700 |
| commit | 08b609a79ff151e747c6b860c90b634daaa68f1c (patch) | |
| tree | 2ac9a3ac308bd53125ef1770e9002c8274f2e734 /lib/std/Io/EventLoop.zig | |
| parent | 5041c9ad9cbf479e62416cd06ef8a178f3467127 (diff) | |
| download | zig-08b609a79ff151e747c6b860c90b634daaa68f1c.tar.gz zig-08b609a79ff151e747c6b860c90b634daaa68f1c.zip | |
Io: implement sleep and fix cancel bugs
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
| -rw-r--r-- | lib/std/Io/EventLoop.zig | 412 |
1 files changed, 291 insertions, 121 deletions
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 5de161d9f9..a24d5173e2 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -31,10 +31,12 @@ const Thread = struct { idle_search_index: u32, steal_ready_search_index: u32, - threadlocal var index: u32 = undefined; + const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread)); - fn current(el: *EventLoop) *Thread { - return &el.threads.allocated[index]; + threadlocal var self: *Thread = undefined; + + fn current() *Thread { + return self; } fn currentFiber(thread: *Thread) *Fiber { @@ -52,10 +54,9 @@ const Fiber = struct { context: Context, awaiter: ?*Fiber, queue_next: ?*Fiber, - can_cancel: bool, - canceled: bool, + cancel_thread: ?*Thread, - const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber))); + const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread)); const max_result_align: Alignment = .@"16"; const max_result_size = max_result_align.forward(64); @@ -75,7 +76,7 @@ const Fiber = struct { ); fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber { - const thread: *Thread = .current(el); + const thread: *Thread = .current(); if (thread.free_queue) |free_fiber| { thread.free_queue = free_fiber.queue_next; free_fiber.queue_next = null; @@ -101,6 +102,40 @@ const Fiber = struct { return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber))); } + fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{AsyncCancel}!void { + if (@cmpxchgStrong( + ?*Thread, + &fiber.cancel_thread, + null, + thread, + .acq_rel, + .acquire, + )) |cancel_thread| { + assert(cancel_thread == Thread.canceling); + return error.AsyncCancel; + } + } + + fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void { + if (@cmpxchgStrong( + ?*Thread, + &fiber.cancel_thread, + thread, + null, + .acq_rel, + .acquire, + )) |cancel_thread| assert(cancel_thread == Thread.canceling); + } + + fn recycle(fiber: *Fiber) void { + const thread: *Thread = .current(); + std.log.debug("recyling {*}", .{fiber}); + assert(fiber.queue_next == null); + @memset(fiber.allocatedSlice(), undefined); + fiber.queue_next = thread.free_queue; + thread.free_queue = fiber; + } + const Queue = struct { head: *Fiber, tail: *Fiber }; }; @@ -110,13 +145,18 @@ pub fn io(el: *EventLoop) Io { .vtable = &.{ .@"async" = @"async", .@"await" = @"await", + .cancel = cancel, .cancelRequested = cancelRequested, + .createFile = createFile, .openFile = openFile, .closeFile = closeFile, - .read = read, - .write = write, + .pread = pread, + .pwrite = pwrite, + + .now = now, + .sleep = sleep, }, }; } @@ -133,8 +173,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { .context = undefined, .awaiter = null, .queue_next = null, - .can_cancel = false, - .canceled = false, + .cancel_thread = null, }, .threads = .{ .allocated = @ptrCast(allocated_slice[0..threads_size]), @@ -142,8 +181,8 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { .active = 1, }, }; - Thread.index = 0; const main_thread = &el.threads.allocated[0]; + Thread.self = main_thread; const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr)); (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)}; main_thread.* = .{ @@ -168,24 +207,22 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void { pub fn deinit(el: *EventLoop) void { const active_threads = @atomicLoad(u32, &el.threads.active, .acquire); for (el.threads.allocated[0..active_threads]) |*thread| - assert(@atomicLoad(?*Fiber, &thread.ready_queue, .unordered) == null); // pending async + assert(@atomicLoad(?*Fiber, &thread.ready_queue, .acquire) == null); // pending async el.yield(null, .exit); + for (el.threads.allocated[0..active_threads]) |*thread| while (thread.free_queue) |free_fiber| { + thread.free_queue = free_fiber.queue_next; + free_fiber.queue_next = null; + el.gpa.free(free_fiber.allocatedSlice()); + }; const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr)); const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max); - for (el.threads.allocated[1..active_threads]) |*thread| { - thread.thread.join(); - while (thread.free_queue) |free_fiber| { - thread.free_queue = free_fiber.queue_next; - free_fiber.queue_next = null; - el.gpa.free(free_fiber.allocatedSlice()); - } - } + for (el.threads.allocated[1..active_threads]) |thread| thread.thread.join(); el.gpa.free(allocated_ptr[0..idle_stack_end_offset]); el.* = undefined; } fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void { - const thread: *Thread = .current(el); + const thread: *Thread = .current(); const ready_context: *Context = if (maybe_ready_fiber) |ready_fiber| &ready_fiber.context else if (thread.ready_queue) |ready_fiber| ready_context: { @@ -198,6 +235,7 @@ fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage defer thread.steal_ready_search_index += 1; if (thread.steal_ready_search_index == ready_threads) thread.steal_ready_search_index = 0; const steal_ready_search_thread = &el.threads.allocated[thread.steal_ready_search_index]; + if (steal_ready_search_thread == thread) continue; const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue; if (@cmpxchgWeak( ?*Fiber, @@ -236,6 +274,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void { defer thread.idle_search_index += 1; if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0; const idle_search_thread = &el.threads.allocated[thread.idle_search_index]; + if (idle_search_thread == thread) continue; if (@cmpxchgWeak( ?*Fiber, &idle_search_thread.ready_queue, @@ -249,11 +288,11 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void { .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS, .ioprio = 0, .fd = idle_search_thread.io_uring.fd, - .off = @intFromEnum(Completion.Key.wakeup), + .off = @intFromEnum(Completion.UserData.wakeup), .addr = 0, .len = 0, .rw_flags = 0, - .user_data = @intFromEnum(Completion.Key.wakeup), + .user_data = @intFromEnum(Completion.UserData.wakeup), .buf_index = 0, .personality = 0, .splice_fd_in = 0, @@ -314,15 +353,6 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void { )) |old_head| ready_queue.tail.queue_next = old_head; } -fn recycle(el: *EventLoop, fiber: *Fiber) void { - const thread: *Thread = .current(el); - std.log.debug("recyling {*}", .{fiber}); - assert(fiber.queue_next == null); - @memset(fiber.allocatedSlice(), undefined); - fiber.queue_next = thread.free_queue; - thread.free_queue = fiber; -} - fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn { message.handle(el); const thread: *Thread = &el.threads.allocated[0]; @@ -332,17 +362,16 @@ fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAl } fn threadEntry(el: *EventLoop, index: u32) void { - Thread.index = index; const thread: *Thread = &el.threads.allocated[index]; + Thread.self = thread; std.log.debug("created thread idle {*}", .{&thread.idle_context}); el.idle(thread); } const Completion = struct { - const Key = enum(usize) { + const UserData = enum(usize) { unused, wakeup, - cancel, cleanup, exit, /// *Fiber @@ -369,26 +398,43 @@ fn idle(el: *EventLoop, thread: *Thread) void { break :cqes_len 0; }, else => |e| @panic(@errorName(e)), - }]) |cqe| switch (@as(Completion.Key, @enumFromInt(cqe.user_data))) { + }]) |cqe| switch (@as(Completion.UserData, @enumFromInt(cqe.user_data))) { .unused => unreachable, // bad submission queued? .wakeup => {}, - .cancel => {}, .cleanup => @panic("failed to notify other threads that we are exiting"), .exit => { assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async return; }, - _ => { - const fiber: *Fiber = @ptrFromInt(cqe.user_data); - assert(fiber.queue_next == null); - fiber.resultPointer(Completion).* = .{ - .result = cqe.res, - .flags = cqe.flags, - }; - if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| { - ready_queue.tail.queue_next = fiber; - ready_queue.tail = fiber; - } else maybe_ready_queue = .{ .head = fiber, .tail = fiber }; + _ => switch (errno(cqe.res)) { + .INTR => getSqe(&thread.io_uring).* = .{ + .opcode = .ASYNC_CANCEL, + .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS, + .ioprio = 0, + .fd = 0, + .off = 0, + .addr = cqe.user_data, + .len = 0, + .rw_flags = 0, + .user_data = @intFromEnum(Completion.UserData.wakeup), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }, + else => { + const fiber: *Fiber = @ptrFromInt(cqe.user_data); + assert(fiber.queue_next == null); + fiber.resultPointer(Completion).* = .{ + .result = cqe.res, + .flags = cqe.flags, + }; + if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| { + ready_queue.tail.queue_next = fiber; + ready_queue.tail = fiber; + } else maybe_ready_queue = .{ .head = fiber, .tail = fiber }; + }, }, }; if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue); @@ -409,7 +455,7 @@ const SwitchMessage = struct { }; fn handle(message: *const SwitchMessage, el: *EventLoop) void { - const thread: *Thread = .current(el); + const thread: *Thread = .current(); thread.current_context = message.contexts.ready; switch (message.pending_task) { .nothing => {}, @@ -429,11 +475,11 @@ const SwitchMessage = struct { .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS, .ioprio = 0, .fd = each_thread.io_uring.fd, - .off = @intFromEnum(Completion.Key.exit), + .off = @intFromEnum(Completion.UserData.exit), .addr = 0, .len = 0, .rw_flags = 0, - .user_data = @intFromEnum(Completion.Key.cleanup), + .user_data = @intFromEnum(Completion.UserData.cleanup), .buf_index = 0, .personality = 0, .splice_fd_in = 0, @@ -544,6 +590,7 @@ fn @"async"( start(context.ptr, result.ptr); return null; }; + errdefer fiber.recycle(); std.log.debug("allocated {*}", .{fiber}); const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward( @@ -560,8 +607,7 @@ fn @"async"( }, .awaiter = null, .queue_next = null, - .can_cancel = false, - .canceled = false, + .cancel_thread = null, }; closure.* = .{ .event_loop = event_loop, @@ -571,7 +617,7 @@ fn @"async"( }; @memcpy(closure.contextPointer(), context); - event_loop.schedule(.current(event_loop), .{ .head = fiber, .tail = fiber }); + event_loop.schedule(.current(), .{ .head = fiber, .tail = fiber }); return @ptrCast(fiber); } @@ -585,7 +631,7 @@ fn @"await"( const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter }); @memcpy(result, future_fiber.resultBytes(result_alignment)); - event_loop.recycle(future_fiber); + future_fiber.recycle(); } fn cancel( @@ -594,35 +640,37 @@ fn cancel( result: []u8, result_alignment: Alignment, ) void { - const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); - @atomicStore(bool, &future_fiber.canceled, true, .release); - if (@atomicLoad(bool, &future_fiber.can_cancel, .acquire)) { - const thread: *Thread = .current(event_loop); - getSqe(&thread.io_uring).* = .{ - .opcode = .ASYNC_CANCEL, + if (@atomicRmw( + ?*Thread, + &future_fiber.cancel_thread, + .Xchg, + Thread.canceling, + .acq_rel, + )) |cancel_thread| if (cancel_thread != Thread.canceling) { + getSqe(&Thread.current().io_uring).* = .{ + .opcode = .MSG_RING, .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS, .ioprio = 0, - .fd = 0, - .off = 0, - .addr = @intFromPtr(future_fiber), - .len = 0, + .fd = cancel_thread.io_uring.fd, + .off = @intFromPtr(future_fiber), + .addr = 0, + .len = @bitCast(-@as(i32, @intFromEnum(std.os.linux.E.INTR))), .rw_flags = 0, - .user_data = @intFromEnum(Completion.Key.cancel), + .user_data = @intFromEnum(Completion.UserData.cleanup), .buf_index = 0, .personality = 0, .splice_fd_in = 0, .addr3 = 0, .resv = 0, }; - } + }; @"await"(userdata, any_future, result, result_alignment); } fn cancelRequested(userdata: ?*anyopaque) bool { - const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); - const thread: *Thread = .current(event_loop); - return thread.currentFiber().canceled; + _ = userdata; + return @atomicLoad(?*Thread, &Thread.current().currentFiber().cancel_thread, .acquire) == Thread.canceling; } pub fn createFile( @@ -632,6 +680,10 @@ pub fn createFile( flags: Io.CreateFlags, ) Io.FileOpenError!std.fs.File { const el: *EventLoop = @alignCast(@ptrCast(userdata)); + const thread: *Thread = .current(); + const iou = &thread.io_uring; + const fiber = thread.currentFiber(); + try fiber.enterCancelRegion(thread); const posix = std.posix; const sub_path_c = try posix.toPosixPath(sub_path); @@ -670,23 +722,30 @@ pub fn createFile( @panic("TODO"); } - const thread: *Thread = .current(el); - const iou = &thread.io_uring; - const fiber = thread.currentFiber(); - if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel; - - const sqe = getSqe(iou); - sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode); - sqe.user_data = @intFromPtr(fiber); + getSqe(iou).* = .{ + .opcode = .OPENAT, + .flags = 0, + .ioprio = 0, + .fd = dir.fd, + .off = 0, + .addr = @intFromPtr(&sub_path_c), + .len = @intCast(flags.mode), + .rw_flags = @bitCast(os_flags), + .user_data = @intFromPtr(fiber), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }; - @atomicStore(bool, &fiber.can_cancel, true, .release); el.yield(null, .nothing); - @atomicStore(bool, &fiber.can_cancel, false, .release); + fiber.exitCancelRegion(thread); const completion = fiber.resultPointer(Completion); switch (errno(completion.result)) { .SUCCESS => return .{ .handle = completion.result }, - .INTR => @panic("TODO is this reachable?"), + .INTR => unreachable, .CANCELED => return error.AsyncCancel, .FAULT => unreachable, @@ -723,10 +782,10 @@ pub fn openFile( flags: Io.OpenFlags, ) Io.FileOpenError!std.fs.File { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - const thread: *Thread = .current(el); + const thread: *Thread = .current(); const iou = &thread.io_uring; const fiber = thread.currentFiber(); - if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel; + try fiber.enterCancelRegion(thread); const posix = std.posix; const sub_path_c = try posix.toPosixPath(sub_path); @@ -771,18 +830,30 @@ pub fn openFile( @panic("TODO"); } - const sqe = getSqe(iou); - sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0); - sqe.user_data = @intFromPtr(fiber); + getSqe(iou).* = .{ + .opcode = .OPENAT, + .flags = 0, + .ioprio = 0, + .fd = dir.fd, + .off = 0, + .addr = @intFromPtr(&sub_path_c), + .len = 0, + .rw_flags = @bitCast(os_flags), + .user_data = @intFromPtr(fiber), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }; - @atomicStore(bool, &fiber.can_cancel, true, .release); el.yield(null, .nothing); - @atomicStore(bool, &fiber.can_cancel, false, .release); + fiber.exitCancelRegion(thread); const completion = fiber.resultPointer(Completion); switch (errno(completion.result)) { .SUCCESS => return .{ .handle = completion.result }, - .INTR => @panic("TODO is this reachable?"), + .INTR => unreachable, .CANCELED => return error.AsyncCancel, .FAULT => unreachable, @@ -814,20 +885,33 @@ pub fn openFile( pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - const thread: *Thread = .current(el); + const thread: *Thread = .current(); const iou = &thread.io_uring; const fiber = thread.currentFiber(); - const sqe = getSqe(iou); - sqe.prep_close(file.handle); - sqe.user_data = @intFromPtr(fiber); + getSqe(iou).* = .{ + .opcode = .CLOSE, + .flags = 0, + .ioprio = 0, + .fd = file.handle, + .off = 0, + .addr = 0, + .len = 0, + .rw_flags = 0, + .user_data = @intFromPtr(fiber), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }; el.yield(null, .nothing); const completion = fiber.resultPointer(Completion); switch (errno(completion.result)) { .SUCCESS => return, - .INTR => @panic("TODO is this reachable?"), + .INTR => unreachable, .CANCELED => return, .BADF => unreachable, // Always a race condition. @@ -835,25 +919,37 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void { } } -pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize { +pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - const thread: *Thread = .current(el); + const thread: *Thread = .current(); const iou = &thread.io_uring; const fiber = thread.currentFiber(); - if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel; - - const sqe = getSqe(iou); - sqe.prep_read(file.handle, buffer, std.math.maxInt(u64)); - sqe.user_data = @intFromPtr(fiber); + try fiber.enterCancelRegion(thread); + + getSqe(iou).* = .{ + .opcode = .READ, + .flags = 0, + .ioprio = 0, + .fd = file.handle, + .off = @bitCast(offset), + .addr = @intFromPtr(buffer.ptr), + .len = @min(buffer.len, 0x7ffff000), + .rw_flags = 0, + .user_data = @intFromPtr(fiber), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }; - @atomicStore(bool, &fiber.can_cancel, true, .release); el.yield(null, .nothing); - @atomicStore(bool, &fiber.can_cancel, false, .release); + fiber.exitCancelRegion(thread); const completion = fiber.resultPointer(Completion); switch (errno(completion.result)) { .SUCCESS => return @as(u32, @bitCast(completion.result)), - .INTR => @panic("TODO is this reachable?"), + .INTR => unreachable, .CANCELED => return error.AsyncCancel, .INVAL => unreachable, @@ -868,30 +964,44 @@ pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadE .NOTCONN => return error.SocketNotConnected, .CONNRESET => return error.ConnectionResetByPeer, .TIMEDOUT => return error.ConnectionTimedOut, + .NXIO => return error.Unseekable, + .SPIPE => return error.Unseekable, + .OVERFLOW => return error.Unseekable, else => |err| return std.posix.unexpectedErrno(err), } } -pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize { +pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - - const thread: *Thread = .current(el); + const thread: *Thread = .current(); const iou = &thread.io_uring; const fiber = thread.currentFiber(); - if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel; - - const sqe = getSqe(iou); - sqe.prep_write(file.handle, buffer, std.math.maxInt(u64)); - sqe.user_data = @intFromPtr(fiber); + try fiber.enterCancelRegion(thread); + + getSqe(iou).* = .{ + .opcode = .WRITE, + .flags = 0, + .ioprio = 0, + .fd = file.handle, + .off = @bitCast(offset), + .addr = @intFromPtr(buffer.ptr), + .len = @min(buffer.len, 0x7ffff000), + .rw_flags = 0, + .user_data = @intFromPtr(fiber), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }; - @atomicStore(bool, &fiber.can_cancel, true, .release); el.yield(null, .nothing); - @atomicStore(bool, &fiber.can_cancel, false, .release); + fiber.exitCancelRegion(thread); const completion = fiber.resultPointer(Completion); switch (errno(completion.result)) { .SUCCESS => return @as(u32, @bitCast(completion.result)), - .INTR => @panic("TODO is this reachable?"), + .INTR => unreachable, .CANCELED => return error.AsyncCancel, .INVAL => return error.InvalidArgument, @@ -907,17 +1017,77 @@ pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.Fi .ACCES => return error.AccessDenied, .PERM => return error.PermissionDenied, .PIPE => return error.BrokenPipe, - .CONNRESET => return error.ConnectionResetByPeer, + .NXIO => return error.Unseekable, + .SPIPE => return error.Unseekable, + .OVERFLOW => return error.Unseekable, .BUSY => return error.DeviceBusy, - .NXIO => return error.NoDevice, + .CONNRESET => return error.ConnectionResetByPeer, .MSGSIZE => return error.MessageTooBig, else => |err| return std.posix.unexpectedErrno(err), } } -fn errno(signed: i32) std.posix.E { - const int = if (signed > -4096 and signed < 0) -signed else 0; - return @enumFromInt(int); +pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp { + _ = userdata; + const timespec = try std.posix.clock_gettime(clockid); + return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec); +} + +pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void { + const el: *EventLoop = @alignCast(@ptrCast(userdata)); + const thread: *Thread = .current(); + const iou = &thread.io_uring; + const fiber = thread.currentFiber(); + try fiber.enterCancelRegion(thread); + + const deadline_nanoseconds: i96 = switch (deadline) { + .nanoseconds => |nanoseconds| nanoseconds, + .timestamp => |timestamp| @intFromEnum(timestamp), + }; + const timespec: std.os.linux.kernel_timespec = .{ + .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)), + .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)), + }; + getSqe(iou).* = .{ + .opcode = .TIMEOUT, + .flags = 0, + .ioprio = 0, + .fd = 0, + .off = 0, + .addr = @intFromPtr(×pec), + .len = 1, + .rw_flags = @as(u32, switch (deadline) { + .nanoseconds => 0, + .timestamp => std.os.linux.IORING_TIMEOUT_ABS, + }) | @as(u32, switch (clockid) { + .REALTIME => std.os.linux.IORING_TIMEOUT_REALTIME, + .MONOTONIC => 0, + .BOOTTIME => std.os.linux.IORING_TIMEOUT_BOOTTIME, + else => return error.UnsupportedClock, + }), + .user_data = @intFromPtr(fiber), + .buf_index = 0, + .personality = 0, + .splice_fd_in = 0, + .addr3 = 0, + .resv = 0, + }; + + el.yield(null, .nothing); + fiber.exitCancelRegion(thread); + + const completion = fiber.resultPointer(Completion); + switch (errno(completion.result)) { + .SUCCESS, .TIME => return, + .INTR => unreachable, + .CANCELED => return error.AsyncCancel, + + else => |err| return std.posix.unexpectedErrno(err), + } +} + +fn errno(signed: i32) std.os.linux.E { + return .init(@bitCast(@as(isize, signed))); } fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe { |
