From 6a64c9b7c8971486a818d8cb2ae44bb4dab4497f Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Thu, 23 Oct 2025 05:24:41 -0700
Subject: std.Io.Kqueue: add missing Thread deinit logic

---
 lib/std/Io/Kqueue.zig | 33 ++++++++++++++++++++++++++++++---
 1 file changed, 30 insertions(+), 3 deletions(-)

(limited to 'lib/std')

diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig
index 3507c29462..be0723448b 100644
--- a/lib/std/Io/Kqueue.zig
+++ b/lib/std/Io/Kqueue.zig
@@ -36,6 +36,14 @@ const Thread = struct {
     kq_fd: posix.fd_t,
     idle_search_index: u32,
     steal_ready_search_index: u32,
+    /// For ensuring multiple fibers waiting on the same file descriptor and
+    /// filter use the same kevent.
+    wait_queues: std.AutoArrayHashMapUnmanaged(WaitQueueKey, *Fiber),
+
+    const WaitQueueKey = struct {
+        ident: usize,
+        filter: i32,
+    };
 
     const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
 
@@ -54,6 +62,13 @@ const Thread = struct {
         reserved: u32,
         active: u32,
     };
+
+    fn deinit(thread: *Thread, gpa: Allocator) void {
+        posix.close(thread.kq_fd);
+        assert(thread.wait_queues.count() == 0);
+        thread.wait_queues.deinit(gpa);
+        thread.* = undefined;
+    }
 };
 
 const Fiber = struct {
@@ -138,8 +153,14 @@ fn recycle(k: *Kqueue, fiber: *Fiber) void {
     k.gpa.free(fiber.allocatedSlice());
 }
 
-pub fn init(k: *Kqueue, gpa: Allocator) !void {
-    const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread);
+pub const InitOptions = struct {
+    n_threads: ?usize = null,
+};
+
+pub fn init(k: *Kqueue, gpa: Allocator, options: InitOptions) !void {
+    assert(options.n_threads != 0);
+    const n_threads = @max(1, options.n_threads orelse std.Thread.getCpuCount() catch 1);
+    const threads_size = n_threads * @sizeOf(Thread);
     const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
     const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
     errdefer gpa.free(allocated_slice);
@@ -186,6 +207,7 @@ pub fn init(k: *Kqueue, gpa: Allocator) !void {
         .kq_fd = try posix.kqueue(),
         .idle_search_index = 1,
         .steal_ready_search_index = 1,
+        .wait_queues = .empty,
     };
     errdefer std.posix.close(main_thread.kq_fd);
     std.log.debug("created main idle {*}", .{&main_thread.idle_context});
@@ -199,10 +221,13 @@ pub fn deinit(k: *Kqueue) void {
         assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
     }
     k.yield(null, .exit);
+    const main_thread = &k.threads.allocated[0];
+    const gpa = k.gpa;
+    main_thread.deinit(gpa);
     const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(k.threads.allocated.ptr));
     const idle_stack_end_offset = std.mem.alignForward(usize, k.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
     for (k.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
-    k.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
+    gpa.free(allocated_ptr[0..idle_stack_end_offset]);
     k.* = undefined;
 }
 
@@ -317,6 +342,7 @@ fn schedule(k: *Kqueue, thread: *Thread, ready_queue: Fiber.Queue) void {
         },
         .idle_search_index = 0,
         .steal_ready_search_index = 0,
+        .wait_queues = .empty,
     };
     new_thread.thread = std.Thread.spawn(.{
         .stack_size = idle_stack_size,
@@ -355,6 +381,7 @@ fn threadEntry(k: *Kqueue, index: u32) void {
     Thread.self = thread;
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
     k.idle(thread);
+    thread.deinit(k.gpa);
 }
 
 const Completion = struct {
-- 
cgit v1.2.3
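
Usage note (not part of the patch): a minimal sketch of how a caller might drive the changed init/deinit API after this commit. It assumes the type is reachable as std.Io.Kqueue, that the struct is named Kqueue, and that passing an empty InitOptions keeps the previous behavior of one thread per detected CPU; treat the test name and setup as hypothetical.

// Hypothetical round-trip of the changed API; names and import path are assumptions.
const std = @import("std");

test "Kqueue init/deinit round trip" {
    const gpa = std.testing.allocator;
    var k: std.Io.Kqueue = undefined;
    // InitOptions.n_threads pins the thread count; it must not be 0
    // (the new init asserts this). Passing .{} falls back to CPU count.
    try k.init(gpa, .{ .n_threads = 2 });
    // deinit now runs Thread.deinit on the main thread, closing its kqueue
    // fd and freeing its wait_queues map before the backing slab is freed.
    defer k.deinit();
}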