diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2025-10-23 06:02:46 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:51 -0700 |
| commit | 5578c760a77bd43ce13c9352f68f7e44c5440c8f (patch) | |
| tree | c7ea4e56a539dc7c07e005b43951d240c2453320 /lib/std | |
| parent | 6a64c9b7c8971486a818d8cb2ae44bb4dab4497f (diff) | |
| download | zig-5578c760a77bd43ce13c9352f68f7e44c5440c8f.tar.gz zig-5578c760a77bd43ce13c9352f68f7e44c5440c8f.zip | |
std.Io.Kqueue: implement wait queue per fd
Solves the issue when one kevent() call would clobber another if they
used the same file descriptor as an identifier.
Diffstat (limited to 'lib/std')
| -rw-r--r-- | lib/std/Io/Kqueue.zig | 73 |
1 files changed, 52 insertions, 21 deletions
diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig index be0723448b..b41a0260e0 100644 --- a/lib/std/Io/Kqueue.zig +++ b/lib/std/Io/Kqueue.zig @@ -423,17 +423,35 @@ fn idle(k: *Kqueue, thread: *Thread) void { return; }, _ => { - const fiber: *Fiber = @ptrFromInt(event.udata); - assert(fiber.queue_next == null); - fiber.resultPointer(Completion).* = .{ + const event_head_fiber: *Fiber = @ptrFromInt(event.udata); + const event_tail_fiber = thread.wait_queues.fetchSwapRemove(.{ + .ident = event.ident, + .filter = event.filter, + }).?.value; + assert(event_tail_fiber.queue_next == null); + + // TODO reevaluate this logic + event_head_fiber.resultPointer(Completion).* = .{ .flags = event.flags, .fflags = event.fflags, .data = event.data, }; - if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| { - ready_queue.tail.queue_next = fiber; - ready_queue.tail = fiber; - } else maybe_ready_queue = .{ .head = fiber, .tail = fiber }; + + queue_ready: { + const head: *Fiber = if (maybe_ready_fiber == null) f: { + maybe_ready_fiber = event_head_fiber; + const next = event_head_fiber.queue_next orelse break :queue_ready; + event_head_fiber.queue_next = null; + break :f next; + } else event_head_fiber; + + if (maybe_ready_queue) |*ready_queue| { + ready_queue.tail.queue_next = head; + ready_queue.tail = event_tail_fiber; + } else { + maybe_ready_queue = .{ .head = head, .tail = event_tail_fiber }; + } + } }, }; if (maybe_ready_queue) |ready_queue| k.schedule(thread, ready_queue); @@ -1477,7 +1495,6 @@ fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Strea while (true) { try k.checkCancel(); - std.debug.print("calling readv\n", .{}); const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len)); switch (posix.errno(rc)) { .SUCCESS => return @intCast(rc), @@ -1486,19 +1503,33 @@ fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Strea .AGAIN => { const thread: *Thread = .current(); const fiber = thread.currentFiber(); - const changes = [_]posix.Kevent{ - .{ - .ident = @as(u32, @bitCast(fd)), - .filter = std.c.EVFILT.READ, - .flags = std.c.EV.ADD | std.c.EV.ONESHOT, - .fflags = 0, - .data = 0, - .udata = @intFromPtr(fiber), - }, - }; - assert(0 == (posix.kevent(thread.kq_fd, &changes, &.{}, null) catch |err| { - @panic(@errorName(err)); // TODO - })); + const ident: u32 = @bitCast(fd); + const filter = std.c.EVFILT.READ; + const gop = thread.wait_queues.getOrPut(k.gpa, .{ + .ident = ident, + .filter = filter, + }) catch return error.SystemResources; + if (gop.found_existing) { + const tail_fiber = gop.value_ptr.*; + assert(tail_fiber.queue_next == null); + tail_fiber.queue_next = fiber; + gop.value_ptr.* = fiber; + } else { + gop.value_ptr.* = fiber; + const changes = [_]posix.Kevent{ + .{ + .ident = ident, + .filter = filter, + .flags = std.c.EV.ADD | std.c.EV.ONESHOT, + .fflags = 0, + .data = 0, + .udata = @intFromPtr(fiber), + }, + }; + assert(0 == (posix.kevent(thread.kq_fd, &changes, &.{}, null) catch |err| { + @panic(@errorName(err)); // TODO + })); + } yield(k, null, .nothing); continue; }, |
