path: root/lib/std/Io/EventLoop.zig
author    Jacob Young <jacobly0@users.noreply.github.com>  2025-03-30 15:13:41 -0400
committer Andrew Kelley <andrew@ziglang.org>  2025-10-29 06:20:48 -0700
commit    08b609a79ff151e747c6b860c90b634daaa68f1c (patch)
tree      2ac9a3ac308bd53125ef1770e9002c8274f2e734 /lib/std/Io/EventLoop.zig
parent    5041c9ad9cbf479e62416cd06ef8a178f3467127 (diff)
Io: implement sleep and fix cancel bugs
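
Replace the per-fiber `can_cancel`/`canceled` bools with a single atomic
`cancel_thread: ?*Thread` field. Before submitting an SQE, a fiber publishes
its thread via enterCancelRegion() (cmpxchg null -> thread) and clears it via
exitCancelRegion() after waking. cancel() exchanges the field for a
`canceling` sentinel: if it observes a live thread pointer, it sends a
MSG_RING CQE carrying -EINTR to that thread's ring, and the idle loop answers
by queuing IORING_OP_ASYNC_CANCEL against the fiber's pending operation.
Thread.current() now reads a threadlocal *Thread instead of indexing by a
threadlocal id, read/write become pread/pwrite with an explicit offset
(mapping NXIO/SPIPE/OVERFLOW to error.Unseekable), now() is implemented with
clock_gettime, and sleep() with IORING_OP_TIMEOUT (relative or absolute,
supporting the REALTIME, MONOTONIC, and BOOTTIME clocks).

A minimal standalone model of the handshake (illustrative only; the names
Region, enter, exit, and cancel below are not part of this commit, which
keeps the state in Fiber.cancel_thread):

    const std = @import("std");
    const assert = std.debug.assert;

    const Region = struct {
        owner: ?*anyopaque = null,

        // Sentinel meaning "cancel already requested", like Thread.canceling.
        const canceling: ?*anyopaque = @ptrFromInt(@alignOf(usize));

        fn enter(r: *Region, me: *anyopaque) error{AsyncCancel}!void {
            if (@cmpxchgStrong(?*anyopaque, &r.owner, null, me, .acq_rel, .acquire)) |cur| {
                assert(cur == canceling); // only a canceler can beat us here
                return error.AsyncCancel;
            }
        }

        fn exit(r: *Region, me: *anyopaque) void {
            // Failure means cancel() raced in while we were blocked; the
            // sentinel stays behind so later enter() calls fail fast.
            if (@cmpxchgStrong(?*anyopaque, &r.owner, me, null, .acq_rel, .acquire)) |cur|
                assert(cur == canceling);
        }

        // Returns the owner to interrupt, or null if no one was inside.
        fn cancel(r: *Region) ?*anyopaque {
            const prev = @atomicRmw(?*anyopaque, &r.owner, .Xchg, canceling, .acq_rel);
            return if (prev == canceling) null else prev;
        }
    };

    test "cancel marks the region so the next enter fails" {
        var r: Region = .{};
        var token: u8 = 0;
        try r.enter(&token);
        r.exit(&token);
        try std.testing.expectEqual(@as(?*anyopaque, null), r.cancel());
        try std.testing.expectError(error.AsyncCancel, r.enter(&token));
    }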
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
-rw-r--r--  lib/std/Io/EventLoop.zig | 412
1 file changed, 291 insertions(+), 121 deletions(-)
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig
index 5de161d9f9..a24d5173e2 100644
--- a/lib/std/Io/EventLoop.zig
+++ b/lib/std/Io/EventLoop.zig
@@ -31,10 +31,12 @@ const Thread = struct {
idle_search_index: u32,
steal_ready_search_index: u32,
- threadlocal var index: u32 = undefined;
+ const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
- fn current(el: *EventLoop) *Thread {
- return &el.threads.allocated[index];
+ threadlocal var self: *Thread = undefined;
+
+ fn current() *Thread {
+ return self;
}
fn currentFiber(thread: *Thread) *Fiber {
@@ -52,10 +54,9 @@ const Fiber = struct {
context: Context,
awaiter: ?*Fiber,
queue_next: ?*Fiber,
- can_cancel: bool,
- canceled: bool,
+ cancel_thread: ?*Thread,
- const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber)));
+ const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));
const max_result_align: Alignment = .@"16";
const max_result_size = max_result_align.forward(64);
@@ -75,7 +76,7 @@ const Fiber = struct {
);
fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
- const thread: *Thread = .current(el);
+ const thread: *Thread = .current();
if (thread.free_queue) |free_fiber| {
thread.free_queue = free_fiber.queue_next;
free_fiber.queue_next = null;
@@ -101,6 +102,40 @@ const Fiber = struct {
return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
}
+ fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{AsyncCancel}!void {
+ if (@cmpxchgStrong(
+ ?*Thread,
+ &fiber.cancel_thread,
+ null,
+ thread,
+ .acq_rel,
+ .acquire,
+ )) |cancel_thread| {
+ assert(cancel_thread == Thread.canceling);
+ return error.AsyncCancel;
+ }
+ }
+
+ fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
+ if (@cmpxchgStrong(
+ ?*Thread,
+ &fiber.cancel_thread,
+ thread,
+ null,
+ .acq_rel,
+ .acquire,
+ )) |cancel_thread| assert(cancel_thread == Thread.canceling);
+ }
+
+ fn recycle(fiber: *Fiber) void {
+ const thread: *Thread = .current();
+ std.log.debug("recycling {*}", .{fiber});
+ assert(fiber.queue_next == null);
+ @memset(fiber.allocatedSlice(), undefined);
+ fiber.queue_next = thread.free_queue;
+ thread.free_queue = fiber;
+ }
+
const Queue = struct { head: *Fiber, tail: *Fiber };
};
@@ -110,13 +145,18 @@ pub fn io(el: *EventLoop) Io {
.vtable = &.{
.@"async" = @"async",
.@"await" = @"await",
+
.cancel = cancel,
.cancelRequested = cancelRequested,
+
.createFile = createFile,
.openFile = openFile,
.closeFile = closeFile,
- .read = read,
- .write = write,
+ .pread = pread,
+ .pwrite = pwrite,
+
+ .now = now,
+ .sleep = sleep,
},
};
}
@@ -133,8 +173,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
.context = undefined,
.awaiter = null,
.queue_next = null,
- .can_cancel = false,
- .canceled = false,
+ .cancel_thread = null,
},
.threads = .{
.allocated = @ptrCast(allocated_slice[0..threads_size]),
@@ -142,8 +181,8 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
.active = 1,
},
};
- Thread.index = 0;
const main_thread = &el.threads.allocated[0];
+ Thread.self = main_thread;
const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
(idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
main_thread.* = .{
@@ -168,24 +207,22 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
pub fn deinit(el: *EventLoop) void {
const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
for (el.threads.allocated[0..active_threads]) |*thread|
- assert(@atomicLoad(?*Fiber, &thread.ready_queue, .unordered) == null); // pending async
+ assert(@atomicLoad(?*Fiber, &thread.ready_queue, .acquire) == null); // pending async
el.yield(null, .exit);
+ for (el.threads.allocated[0..active_threads]) |*thread| while (thread.free_queue) |free_fiber| {
+ thread.free_queue = free_fiber.queue_next;
+ free_fiber.queue_next = null;
+ el.gpa.free(free_fiber.allocatedSlice());
+ };
const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr));
const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
- for (el.threads.allocated[1..active_threads]) |*thread| {
- thread.thread.join();
- while (thread.free_queue) |free_fiber| {
- thread.free_queue = free_fiber.queue_next;
- free_fiber.queue_next = null;
- el.gpa.free(free_fiber.allocatedSlice());
- }
- }
+ for (el.threads.allocated[1..active_threads]) |thread| thread.thread.join();
el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
el.* = undefined;
}
fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
- const thread: *Thread = .current(el);
+ const thread: *Thread = .current();
const ready_context: *Context = if (maybe_ready_fiber) |ready_fiber|
&ready_fiber.context
else if (thread.ready_queue) |ready_fiber| ready_context: {
@@ -198,6 +235,7 @@ fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage
defer thread.steal_ready_search_index += 1;
if (thread.steal_ready_search_index == ready_threads) thread.steal_ready_search_index = 0;
const steal_ready_search_thread = &el.threads.allocated[thread.steal_ready_search_index];
+ if (steal_ready_search_thread == thread) continue;
const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
if (@cmpxchgWeak(
?*Fiber,
@@ -236,6 +274,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
defer thread.idle_search_index += 1;
if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
const idle_search_thread = &el.threads.allocated[thread.idle_search_index];
+ if (idle_search_thread == thread) continue;
if (@cmpxchgWeak(
?*Fiber,
&idle_search_thread.ready_queue,
@@ -249,11 +288,11 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
.flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
.ioprio = 0,
.fd = idle_search_thread.io_uring.fd,
- .off = @intFromEnum(Completion.Key.wakeup),
+ .off = @intFromEnum(Completion.UserData.wakeup),
.addr = 0,
.len = 0,
.rw_flags = 0,
- .user_data = @intFromEnum(Completion.Key.wakeup),
+ .user_data = @intFromEnum(Completion.UserData.wakeup),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
@@ -314,15 +353,6 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
)) |old_head| ready_queue.tail.queue_next = old_head;
}
-fn recycle(el: *EventLoop, fiber: *Fiber) void {
- const thread: *Thread = .current(el);
- std.log.debug("recyling {*}", .{fiber});
- assert(fiber.queue_next == null);
- @memset(fiber.allocatedSlice(), undefined);
- fiber.queue_next = thread.free_queue;
- thread.free_queue = fiber;
-}
-
fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
message.handle(el);
const thread: *Thread = &el.threads.allocated[0];
@@ -332,17 +362,16 @@ fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAl
}
fn threadEntry(el: *EventLoop, index: u32) void {
- Thread.index = index;
const thread: *Thread = &el.threads.allocated[index];
+ Thread.self = thread;
std.log.debug("created thread idle {*}", .{&thread.idle_context});
el.idle(thread);
}
const Completion = struct {
- const Key = enum(usize) {
+ const UserData = enum(usize) {
unused,
wakeup,
- cancel,
cleanup,
exit,
/// *Fiber
@@ -369,26 +398,43 @@ fn idle(el: *EventLoop, thread: *Thread) void {
break :cqes_len 0;
},
else => |e| @panic(@errorName(e)),
- }]) |cqe| switch (@as(Completion.Key, @enumFromInt(cqe.user_data))) {
+ }]) |cqe| switch (@as(Completion.UserData, @enumFromInt(cqe.user_data))) {
.unused => unreachable, // bad submission queued?
.wakeup => {},
- .cancel => {},
.cleanup => @panic("failed to notify other threads that we are exiting"),
.exit => {
assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
return;
},
- _ => {
- const fiber: *Fiber = @ptrFromInt(cqe.user_data);
- assert(fiber.queue_next == null);
- fiber.resultPointer(Completion).* = .{
- .result = cqe.res,
- .flags = cqe.flags,
- };
- if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
- ready_queue.tail.queue_next = fiber;
- ready_queue.tail = fiber;
- } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
+ _ => switch (errno(cqe.res)) {
+ .INTR => getSqe(&thread.io_uring).* = .{
+ .opcode = .ASYNC_CANCEL,
+ .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+ .ioprio = 0,
+ .fd = 0,
+ .off = 0,
+ .addr = cqe.user_data,
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = @intFromEnum(Completion.UserData.wakeup),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ },
+ else => {
+ const fiber: *Fiber = @ptrFromInt(cqe.user_data);
+ assert(fiber.queue_next == null);
+ fiber.resultPointer(Completion).* = .{
+ .result = cqe.res,
+ .flags = cqe.flags,
+ };
+ if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
+ ready_queue.tail.queue_next = fiber;
+ ready_queue.tail = fiber;
+ } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
+ },
},
};
if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
@@ -409,7 +455,7 @@ const SwitchMessage = struct {
};
fn handle(message: *const SwitchMessage, el: *EventLoop) void {
- const thread: *Thread = .current(el);
+ const thread: *Thread = .current();
thread.current_context = message.contexts.ready;
switch (message.pending_task) {
.nothing => {},
@@ -429,11 +475,11 @@ const SwitchMessage = struct {
.flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
.ioprio = 0,
.fd = each_thread.io_uring.fd,
- .off = @intFromEnum(Completion.Key.exit),
+ .off = @intFromEnum(Completion.UserData.exit),
.addr = 0,
.len = 0,
.rw_flags = 0,
- .user_data = @intFromEnum(Completion.Key.cleanup),
+ .user_data = @intFromEnum(Completion.UserData.cleanup),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
@@ -544,6 +590,7 @@ fn @"async"(
start(context.ptr, result.ptr);
return null;
};
+ errdefer fiber.recycle();
std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
@@ -560,8 +607,7 @@ fn @"async"(
},
.awaiter = null,
.queue_next = null,
- .can_cancel = false,
- .canceled = false,
+ .cancel_thread = null,
};
closure.* = .{
.event_loop = event_loop,
@@ -571,7 +617,7 @@ fn @"async"(
};
@memcpy(closure.contextPointer(), context);
- event_loop.schedule(.current(event_loop), .{ .head = fiber, .tail = fiber });
+ event_loop.schedule(.current(), .{ .head = fiber, .tail = fiber });
return @ptrCast(fiber);
}
@@ -585,7 +631,7 @@ fn @"await"(
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
@memcpy(result, future_fiber.resultBytes(result_alignment));
- event_loop.recycle(future_fiber);
+ future_fiber.recycle();
}
fn cancel(
@@ -594,35 +640,37 @@ fn cancel(
result: []u8,
result_alignment: Alignment,
) void {
- const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
- @atomicStore(bool, &future_fiber.canceled, true, .release);
- if (@atomicLoad(bool, &future_fiber.can_cancel, .acquire)) {
- const thread: *Thread = .current(event_loop);
- getSqe(&thread.io_uring).* = .{
- .opcode = .ASYNC_CANCEL,
+ if (@atomicRmw(
+ ?*Thread,
+ &future_fiber.cancel_thread,
+ .Xchg,
+ Thread.canceling,
+ .acq_rel,
+ )) |cancel_thread| if (cancel_thread != Thread.canceling) {
+ getSqe(&Thread.current().io_uring).* = .{
+ .opcode = .MSG_RING,
.flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
.ioprio = 0,
- .fd = 0,
- .off = 0,
- .addr = @intFromPtr(future_fiber),
- .len = 0,
+ .fd = cancel_thread.io_uring.fd,
+ .off = @intFromPtr(future_fiber),
+ .addr = 0,
+ .len = @bitCast(-@as(i32, @intFromEnum(std.os.linux.E.INTR))),
.rw_flags = 0,
- .user_data = @intFromEnum(Completion.Key.cancel),
+ .user_data = @intFromEnum(Completion.UserData.cleanup),
.buf_index = 0,
.personality = 0,
.splice_fd_in = 0,
.addr3 = 0,
.resv = 0,
};
- }
+ };
@"await"(userdata, any_future, result, result_alignment);
}
fn cancelRequested(userdata: ?*anyopaque) bool {
- const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
- const thread: *Thread = .current(event_loop);
- return thread.currentFiber().canceled;
+ _ = userdata;
+ return @atomicLoad(?*Thread, &Thread.current().currentFiber().cancel_thread, .acquire) == Thread.canceling;
}
pub fn createFile(
@@ -632,6 +680,10 @@ pub fn createFile(
flags: Io.CreateFlags,
) Io.FileOpenError!std.fs.File {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
+ const thread: *Thread = .current();
+ const iou = &thread.io_uring;
+ const fiber = thread.currentFiber();
+ try fiber.enterCancelRegion(thread);
const posix = std.posix;
const sub_path_c = try posix.toPosixPath(sub_path);
@@ -670,23 +722,30 @@ pub fn createFile(
@panic("TODO");
}
- const thread: *Thread = .current(el);
- const iou = &thread.io_uring;
- const fiber = thread.currentFiber();
- if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
-
- const sqe = getSqe(iou);
- sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode);
- sqe.user_data = @intFromPtr(fiber);
+ getSqe(iou).* = .{
+ .opcode = .OPENAT,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = dir.fd,
+ .off = 0,
+ .addr = @intFromPtr(&sub_path_c),
+ .len = @intCast(flags.mode),
+ .rw_flags = @bitCast(os_flags),
+ .user_data = @intFromPtr(fiber),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
- @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
- @atomicStore(bool, &fiber.can_cancel, false, .release);
+ fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return .{ .handle = completion.result },
- .INTR => @panic("TODO is this reachable?"),
+ .INTR => unreachable,
.CANCELED => return error.AsyncCancel,
.FAULT => unreachable,
@@ -723,10 +782,10 @@ pub fn openFile(
flags: Io.OpenFlags,
) Io.FileOpenError!std.fs.File {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
- const thread: *Thread = .current(el);
+ const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
- if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+ try fiber.enterCancelRegion(thread);
const posix = std.posix;
const sub_path_c = try posix.toPosixPath(sub_path);
@@ -771,18 +830,30 @@ pub fn openFile(
@panic("TODO");
}
- const sqe = getSqe(iou);
- sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
- sqe.user_data = @intFromPtr(fiber);
+ getSqe(iou).* = .{
+ .opcode = .OPENAT,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = dir.fd,
+ .off = 0,
+ .addr = @intFromPtr(&sub_path_c),
+ .len = 0,
+ .rw_flags = @bitCast(os_flags),
+ .user_data = @intFromPtr(fiber),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
- @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
- @atomicStore(bool, &fiber.can_cancel, false, .release);
+ fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return .{ .handle = completion.result },
- .INTR => @panic("TODO is this reachable?"),
+ .INTR => unreachable,
.CANCELED => return error.AsyncCancel,
.FAULT => unreachable,
@@ -814,20 +885,33 @@ pub fn openFile(
pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
- const thread: *Thread = .current(el);
+ const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
- const sqe = getSqe(iou);
- sqe.prep_close(file.handle);
- sqe.user_data = @intFromPtr(fiber);
+ getSqe(iou).* = .{
+ .opcode = .CLOSE,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = file.handle,
+ .off = 0,
+ .addr = 0,
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = @intFromPtr(fiber),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
el.yield(null, .nothing);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return,
- .INTR => @panic("TODO is this reachable?"),
+ .INTR => unreachable,
.CANCELED => return,
.BADF => unreachable, // Always a race condition.
@@ -835,25 +919,37 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
}
}
-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
- const thread: *Thread = .current(el);
+ const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
- if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
-
- const sqe = getSqe(iou);
- sqe.prep_read(file.handle, buffer, std.math.maxInt(u64));
- sqe.user_data = @intFromPtr(fiber);
+ try fiber.enterCancelRegion(thread);
+
+ getSqe(iou).* = .{
+ .opcode = .READ,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = file.handle,
+ .off = @bitCast(offset),
+ .addr = @intFromPtr(buffer.ptr),
+ .len = @min(buffer.len, 0x7ffff000),
+ .rw_flags = 0,
+ .user_data = @intFromPtr(fiber),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
- @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
- @atomicStore(bool, &fiber.can_cancel, false, .release);
+ fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return @as(u32, @bitCast(completion.result)),
- .INTR => @panic("TODO is this reachable?"),
+ .INTR => unreachable,
.CANCELED => return error.AsyncCancel,
.INVAL => unreachable,
@@ -868,30 +964,44 @@ pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadE
.NOTCONN => return error.SocketNotConnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
+ .NXIO => return error.Unseekable,
+ .SPIPE => return error.Unseekable,
+ .OVERFLOW => return error.Unseekable,
else => |err| return std.posix.unexpectedErrno(err),
}
}
-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
-
- const thread: *Thread = .current(el);
+ const thread: *Thread = .current();
const iou = &thread.io_uring;
const fiber = thread.currentFiber();
- if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
-
- const sqe = getSqe(iou);
- sqe.prep_write(file.handle, buffer, std.math.maxInt(u64));
- sqe.user_data = @intFromPtr(fiber);
+ try fiber.enterCancelRegion(thread);
+
+ getSqe(iou).* = .{
+ .opcode = .WRITE,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = file.handle,
+ .off = @bitCast(offset),
+ .addr = @intFromPtr(buffer.ptr),
+ .len = @min(buffer.len, 0x7ffff000),
+ .rw_flags = 0,
+ .user_data = @intFromPtr(fiber),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
- @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
- @atomicStore(bool, &fiber.can_cancel, false, .release);
+ fiber.exitCancelRegion(thread);
const completion = fiber.resultPointer(Completion);
switch (errno(completion.result)) {
.SUCCESS => return @as(u32, @bitCast(completion.result)),
- .INTR => @panic("TODO is this reachable?"),
+ .INTR => unreachable,
.CANCELED => return error.AsyncCancel,
.INVAL => return error.InvalidArgument,
@@ -907,17 +1017,77 @@ pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.Fi
.ACCES => return error.AccessDenied,
.PERM => return error.PermissionDenied,
.PIPE => return error.BrokenPipe,
- .CONNRESET => return error.ConnectionResetByPeer,
+ .NXIO => return error.Unseekable,
+ .SPIPE => return error.Unseekable,
+ .OVERFLOW => return error.Unseekable,
.BUSY => return error.DeviceBusy,
- .NXIO => return error.NoDevice,
+ .CONNRESET => return error.ConnectionResetByPeer,
.MSGSIZE => return error.MessageTooBig,
else => |err| return std.posix.unexpectedErrno(err),
}
}
-fn errno(signed: i32) std.posix.E {
- const int = if (signed > -4096 and signed < 0) -signed else 0;
- return @enumFromInt(int);
+pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+ _ = userdata;
+ const timespec = try std.posix.clock_gettime(clockid);
+ return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
+}
+
+pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+ const el: *EventLoop = @alignCast(@ptrCast(userdata));
+ const thread: *Thread = .current();
+ const iou = &thread.io_uring;
+ const fiber = thread.currentFiber();
+ try fiber.enterCancelRegion(thread);
+
+ const deadline_nanoseconds: i96 = switch (deadline) {
+ .nanoseconds => |nanoseconds| nanoseconds,
+ .timestamp => |timestamp| @intFromEnum(timestamp),
+ };
+ const timespec: std.os.linux.kernel_timespec = .{
+ .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
+ .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
+ };
+ getSqe(iou).* = .{
+ .opcode = .TIMEOUT,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = 0,
+ .off = 0,
+ .addr = @intFromPtr(&timespec),
+ .len = 1,
+ .rw_flags = @as(u32, switch (deadline) {
+ .nanoseconds => 0,
+ .timestamp => std.os.linux.IORING_TIMEOUT_ABS,
+ }) | @as(u32, switch (clockid) {
+ .REALTIME => std.os.linux.IORING_TIMEOUT_REALTIME,
+ .MONOTONIC => 0,
+ .BOOTTIME => std.os.linux.IORING_TIMEOUT_BOOTTIME,
+ else => return error.UnsupportedClock,
+ }),
+ .user_data = @intFromPtr(fiber),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+
+ el.yield(null, .nothing);
+ fiber.exitCancelRegion(thread);
+
+ const completion = fiber.resultPointer(Completion);
+ switch (errno(completion.result)) {
+ .SUCCESS, .TIME => return,
+ .INTR => unreachable,
+ .CANCELED => return error.AsyncCancel,
+
+ else => |err| return std.posix.unexpectedErrno(err),
+ }
+}
+
+fn errno(signed: i32) std.os.linux.E {
+ return .init(@bitCast(@as(isize, signed)));
}
fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {