| author | Andrew Kelley <andrew@ziglang.org> | 2025-03-28 21:07:12 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:47 -0700 |
| commit | 238de05d2ca629237e434ca0d577dbd0b8ed0c9a | |
| tree | e571fa2cb341f94ed5ab84fa8b34bf6186ae8a35 /lib/std/Io/EventLoop.zig | |
| parent | 66b0f7e92b64538de7f9d9f37a4ef1c25f0f6e5b | |
| download | zig-238de05d2ca629237e434ca0d577dbd0b8ed0c9a.tar.gz, zig-238de05d2ca629237e434ca0d577dbd0b8ed0c9a.zip | |
WIP
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | lib/std/Io/EventLoop.zig | 241 |
1 file changed, 213 insertions(+), 28 deletions(-)
```diff
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig
index 84ea623954..ca42627005 100644
--- a/lib/std/Io/EventLoop.zig
+++ b/lib/std/Io/EventLoop.zig
@@ -5,6 +5,7 @@
 const Allocator = std.mem.Allocator;
 const Io = std.Io;
 const EventLoop = @This();
 const Alignment = std.mem.Alignment;
+const IoUring = std.os.linux.IoUring;
 
 gpa: Allocator,
 mutex: std.Thread.Mutex,
@@ -13,18 +14,27 @@
 queue: std.DoublyLinkedList(void),
 free: std.DoublyLinkedList(void),
 main_context: Context,
 exit_awaiter: ?*Fiber,
-idle_count: usize,
 threads: std.ArrayListUnmanaged(Thread),
+/// 1 bit per thread, same order as `thread_index`.
+idle_iourings: []usize,
 
-threadlocal var current_idle_context: *Context = undefined;
-threadlocal var current_context: *Context = undefined;
+threadlocal var thread_index: u32 = undefined;
 
 /// Empirically saw 10KB being used by the self-hosted backend for logging.
 const idle_stack_size = 32 * 1024;
 
+const io_uring_entries = 64;
+
 const Thread = struct {
     thread: std.Thread,
     idle_context: Context,
+    current_idle_context: *Context,
+    current_context: *Context,
+    io_uring: IoUring,
+
+    fn currentFiber(thread: *Thread) *Fiber {
+        return @fieldParentPtr("context", thread.current_context);
+    }
 };
 
 const Fiber = struct {
@@ -83,16 +93,25 @@ pub fn io(el: *EventLoop) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
+            .createFile = createFile,
+            .openFile = openFile,
+            .closeFile = closeFile,
+            .read = read,
+            .write = write,
         },
     };
 }
 
-pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
-    const threads_bytes = ((std.Thread.getCpuCount() catch 1) -| 1) * @sizeOf(Thread);
+pub fn init(el: *EventLoop, gpa: Allocator) !void {
+    const n_threads: usize = @max((std.Thread.getCpuCount() catch 1), 1);
+    const threads_bytes = n_threads * @sizeOf(Thread);
     const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context));
     const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
     const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context)), idle_stack_end_offset);
     errdefer gpa.free(allocated_slice);
+    const idle_iourings = try gpa.alloc(usize, (n_threads + @bitSizeOf(usize) - 1) / @bitSizeOf(usize));
+    errdefer gpa.free(idle_iourings);
+    @memset(idle_iourings, 0);
     el.* = .{
         .gpa = gpa,
         .mutex = .{},
@@ -101,9 +120,11 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
         .free = .{},
         .main_context = undefined,
         .exit_awaiter = null,
-        .idle_count = 0,
         .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])),
+        .idle_iourings = idle_iourings,
     };
+    const main_thread = el.threads.addOneAssumeCapacity();
+    main_thread.io_uring = try IoUring.init(io_uring_entries, 0);
     const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)]));
     const idle_stack_end: [*]align(@max(@alignOf(Thread), @alignOf(Context))) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
     (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
@@ -113,9 +134,9 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
         .rip = @intFromPtr(&mainIdleEntry),
     };
     std.log.debug("created main idle {*}", .{main_idle_context});
-    current_idle_context = main_idle_context;
+    main_thread.current_idle_context = main_idle_context;
     std.log.debug("created main {*}", .{&el.main_context});
-    current_context = &el.main_context;
+    main_thread.current_context = &el.main_context;
 }
 
 pub fn deinit(el: *EventLoop) void {
@@ -125,14 +146,21 @@ pub fn deinit(el: *EventLoop) void {
         const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node));
         el.gpa.free(free_fiber.allocatedSlice());
     }
-    const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context));
+    const idle_context_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread), @alignOf(Context));
     const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
     const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context))) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
-    for (el.threads.items) |*thread| thread.thread.join();
+    for (el.threads.items[1..]) |*thread| thread.thread.join();
     el.gpa.free(allocated_ptr[0..idle_stack_end]);
 }
 
-fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void {
+const PendingTask = union(enum) {
+    none,
+    register_awaiter: *?*Fiber,
+    io_uring_submit: *IoUring,
+};
+
+fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: PendingTask) void {
+    const thread: *Thread = &el.threads.items[thread_index];
     const ready_context: *Context = ready_context: {
         const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
             el.mutex.lock();
@@ -141,13 +169,13 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v
         }) |ready_node|
             @alignCast(@fieldParentPtr("queue_node", ready_node))
         else
-            break :ready_context current_idle_context;
+            break :ready_context thread.current_idle_context;
         break :ready_context &ready_fiber.context;
     };
     const message: SwitchMessage = .{
-        .prev_context = current_context,
+        .prev_context = thread.current_context,
         .ready_context = ready_context,
-        .register_awaiter = register_awaiter,
+        .pending_task = pending_task,
     };
     std.log.debug("switching from {*} to {*}", .{ message.prev_context, message.ready_context });
     contextSwitch(&message).handle(el);
@@ -156,6 +184,11 @@
 fn schedule(el: *EventLoop, fiber: *Fiber) void {
     el.mutex.lock();
     el.queue.append(&fiber.queue_node);
+    //for (el.idle_iourings) |*int| {
+    //    const idler_subset = @atomicLoad(usize, int, .unordered);
+    //    if (idler_subset == 0) continue;
+    //
+    //}
     if (el.idle_count > 0) {
         el.mutex.unlock();
         el.cond.signal();
@@ -167,7 +200,7 @@ fn schedule(el: *EventLoop, fiber: *Fiber) void {
             thread.thread = std.Thread.spawn(.{
                 .stack_size = idle_stack_size,
                 .allocator = el.gpa,
-            }, threadEntry, .{ el, thread }) catch {
+            }, threadEntry, .{ el, el.threads.items.len - 1 }) catch {
                 el.threads.items.len -= 1;
                 return;
             };
@@ -187,38 +220,61 @@ fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAl
     unreachable; // switched to dead fiber
 }
 
-fn threadEntry(el: *EventLoop, thread: *Thread) void {
+fn threadEntry(el: *EventLoop, index: usize) void {
+    thread_index = index;
+    const thread: *Thread = &el.threads.items[index];
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
-    current_idle_context = &thread.idle_context;
-    current_context = &thread.idle_context;
+    thread.io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
+        std.log.warn("exiting worker thread during init due to io_uring init failure: {s}", .{@errorName(err)});
+        return;
+    };
+    thread.current_idle_context = &thread.idle_context;
+    thread.current_context = &thread.idle_context;
     _ = el.idle();
 }
 
 fn idle(el: *EventLoop) *Fiber {
+    const thread: *Thread = &el.threads.items[thread_index];
+    // The idle fiber only runs on one thread.
+    const iou = &thread.io_uring;
+    var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
+
     while (true) {
         el.yield(null, null);
         if (@atomicLoad(?*Fiber, &el.exit_awaiter, .acquire)) |exit_awaiter| {
            el.cond.broadcast();
            return exit_awaiter;
         }
-        el.mutex.lock();
-        defer el.mutex.unlock();
-        el.idle_count += 1;
-        defer el.idle_count -= 1;
-        el.cond.wait(&el.mutex);
+        // TODO add uring to bit set
+        const n = iou.copy_cqes(&cqes_buffer, 1) catch @panic("TODO handle copy_cqes error");
+        const cqes = cqes_buffer[0..n];
+        for (cqes) |cqe| {
+            const fiber: *Fiber = @ptrFromInt(cqe.user_data);
+            const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
+            res.* = cqe.res;
+            el.schedule(fiber);
+        }
     }
 }
 
 const SwitchMessage = extern struct {
     prev_context: *Context,
     ready_context: *Context,
-    register_awaiter: ?*?*Fiber,
+    pending_task: PendingTask,
 
     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
-        current_context = message.ready_context;
-        if (message.register_awaiter) |awaiter| {
-            const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context));
-            if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
+        const thread: *Thread = &el.threads.items[thread_index];
+        thread.current_context = message.ready_context;
+        switch (message.pending_task) {
+            .none => {},
+            .register_awaiter => |awaiter| {
+                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context));
+                if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
+            },
+            .io_uring_submit => |iou| {
+                _ = iou.flush_sq();
+                // TODO: determine whether this return value should be used
+            },
        }
     }
 };
@@ -357,3 +413,132 @@ pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []
     @memcpy(result, future_fiber.resultPointer());
     event_loop.recycle(future_fiber);
 }
+
+pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.CreateFlags) std.fs.File.OpenError!std.fs.File {
+    _ = userdata;
+    _ = dir;
+    _ = sub_path;
+    _ = flags;
+    @panic("TODO");
+}
+
+pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.OpenFlags) std.fs.File.OpenError!std.fs.File {
+    const el: *EventLoop = @ptrCast(@alignCast(userdata));
+
+    const posix = std.posix;
+    const sub_path_c = try posix.toPosixPath(sub_path);
+
+    var os_flags: posix.O = .{
+        .ACCMODE = switch (flags.mode) {
+            .read_only => .RDONLY,
+            .write_only => .WRONLY,
+            .read_write => .RDWR,
+        },
+    };
+
+    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
+    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
+    if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;
+
+    // Use the O locking flags if the os supports them to acquire the lock
+    // atomically.
+    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
+    if (has_flock_open_flags) {
+        // Note that the NONBLOCK flag is removed after the openat() call
+        // is successful.
+        switch (flags.lock) {
+            .none => {},
+            .shared => {
+                os_flags.SHLOCK = true;
+                os_flags.NONBLOCK = flags.lock_nonblocking;
+            },
+            .exclusive => {
+                os_flags.EXLOCK = true;
+                os_flags.NONBLOCK = flags.lock_nonblocking;
+            },
+        }
+    }
+    const have_flock = @TypeOf(posix.system.flock) != void;
+
+    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
+        @panic("TODO");
+    }
+
+    if (has_flock_open_flags and flags.lock_nonblocking) {
+        @panic("TODO");
+    }
+
+    const thread: *Thread = &el.threads.items[thread_index];
+    const iou = &thread.io_uring;
+    const sqe = getSqe(iou);
+    const fiber = thread.currentFiber();
+
+    sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
+    sqe.user_data = @intFromPtr(fiber);
+
+    el.yield(null, .{ .io_uring_submit = iou });
+
+    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
+    const rc = result.*;
+    switch (errno(rc)) {
+        .SUCCESS => return .{ .handle = rc },
+        .INTR => @panic("TODO is this reachable?"),
+        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+
+        .FAULT => unreachable,
+        .INVAL => return error.BadPathName,
+        .BADF => unreachable,
+        .ACCES => return error.AccessDenied,
+        .FBIG => return error.FileTooBig,
+        .OVERFLOW => return error.FileTooBig,
+        .ISDIR => return error.IsDir,
+        .LOOP => return error.SymLinkLoop,
+        .MFILE => return error.ProcessFdQuotaExceeded,
+        .NAMETOOLONG => return error.NameTooLong,
+        .NFILE => return error.SystemFdQuotaExceeded,
+        .NODEV => return error.NoDevice,
+        .NOENT => return error.FileNotFound,
+        .NOMEM => return error.SystemResources,
+        .NOSPC => return error.NoSpaceLeft,
+        .NOTDIR => return error.NotDir,
+        .PERM => return error.PermissionDenied,
+        .EXIST => return error.PathAlreadyExists,
+        .BUSY => return error.DeviceBusy,
+        .OPNOTSUPP => return error.FileLocksNotSupported,
+        .AGAIN => return error.WouldBlock,
+        .TXTBSY => return error.FileBusy,
+        .NXIO => return error.NoDevice,
+        else => |err| return posix.unexpectedErrno(err),
+    }
+
+    return .{ .handle = result.* };
+}
+
+fn errno(signed: i32) std.posix.E {
+    const int = if (signed > -4096 and signed < 0) -signed else 0;
+    return @enumFromInt(int);
+}
+
+fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
+    return iou.get_sqe() catch @panic("TODO: handle submission queue full");
+}
+
+pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
+    _ = userdata;
+    _ = file;
+    @panic("TODO");
+}
+
+pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.ReadError!usize {
+    _ = userdata;
+    _ = file;
+    _ = buffer;
+    @panic("TODO");
+}
+
+pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.fs.File.WriteError!usize {
+    _ = userdata;
+    _ = file;
+    _ = buffer;
+    @panic("TODO");
+}
```
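For orientation (not part of the commit): the core round trip that `openFile` and `idle` split between them above is an ordinary io_uring submit/complete cycle. The sketch below walks through it synchronously, using the same `std.os.linux.IoUring` calls the patch relies on (`init`, `get_sqe`, `prep_openat`, `copy_cqes`). The `/etc/hostname` path and the `0xdead` tag are illustrative placeholders, and it calls `submit()` (which also enters the kernel) where the patch's `.io_uring_submit` message only calls `flush_sq()`; it assumes a kernel new enough to support `IORING_OP_OPENAT`.

```zig
const std = @import("std");
const linux = std.os.linux;
const IoUring = linux.IoUring;

pub fn main() !void {
    // Same order of magnitude as the patch's io_uring_entries = 64.
    var ring = try IoUring.init(64, 0);
    defer ring.deinit();

    // Queue one openat(2). The patch stores @intFromPtr(fiber) in user_data;
    // a placeholder constant stands in for it here.
    const sqe = try ring.get_sqe();
    sqe.prep_openat(linux.AT.FDCWD, "/etc/hostname", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
    sqe.user_data = 0xdead;

    // The event loop defers submission to yield()'s .io_uring_submit message;
    // submit() also enters the kernel so this standalone program makes progress.
    _ = try ring.submit();

    // What the idle fiber does: block for at least one completion, then map
    // each CQE back to its waiter via user_data and hand over cqe.res.
    var cqes: [1]linux.io_uring_cqe = undefined;
    _ = try ring.copy_cqes(&cqes, 1);
    const cqe = cqes[0];
    std.debug.assert(cqe.user_data == 0xdead);

    // Same convention as the patch's errno() helper: a res in (-4096, 0)
    // is a negated errno; anything else is the syscall result (here, an fd).
    if (cqe.res > -4096 and cqe.res < 0) {
        std.debug.print("openat failed, errno {d}\n", .{-cqe.res});
    } else {
        std.debug.print("opened fd {d}\n", .{cqe.res});
        std.posix.close(cqe.res);
    }
}
```

The `user_data` field is what makes the scheduling work: it survives the kernel round trip unchanged, so the idle fiber can turn a CQE directly back into the `*Fiber` that submitted it, write `cqe.res` into that fiber's result slot, and put the fiber back on the run queue.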
