diff options
| author | Jacob Young <jacobly0@users.noreply.github.com> | 2025-03-27 01:49:01 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:47 -0700 |
| commit | fe6f1efde4b1b192a7e8061ee35c4af4838debae (patch) | |
| tree | fad279d774f8c62151b47a1f9af2b2d486779901 /lib/std/Io/EventLoop.zig | |
| parent | 4d56267938b83b04cf6f4dd6e0f4328e0cec8375 (diff) | |
| download | zig-fe6f1efde4b1b192a7e8061ee35c4af4838debae.tar.gz zig-fe6f1efde4b1b192a7e8061ee35c4af4838debae.zip | |
EventLoop: prepare for threading
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
| -rw-r--r-- | lib/std/Io/EventLoop.zig | 113 |
1 files changed, 65 insertions, 48 deletions
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index c5a9dd63ff..2976f3386a 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -5,6 +5,7 @@ const Io = std.Io; const EventLoop = @This(); gpa: Allocator, +mutex: std.Thread.Mutex, queue: std.DoublyLinkedList(void), free: std.DoublyLinkedList(void), main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)), @@ -39,6 +40,7 @@ const Fiber = struct { pub fn init(el: *EventLoop, gpa: Allocator) void { el.* = .{ .gpa = gpa, + .mutex = .{}, .queue = .{}, .free = .{}, .main_fiber_buffer = undefined, @@ -48,7 +50,11 @@ pub fn init(el: *EventLoop, gpa: Allocator) void { fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { assert(result_len <= max_result_len); - const free_node = el.free.pop() orelse { + const free_node = free_node: { + el.mutex.lock(); + defer el.mutex.unlock(); + break :free_node el.free.pop(); + } orelse { const n = std.mem.alignForward( usize, @sizeOf(Fiber) + max_result_len + min_stack_size, @@ -59,36 +65,48 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber { return @fieldParentPtr("queue_node", free_node); } -fn yield(el: *EventLoop, optional_fiber: ?*Fiber) void { - if (optional_fiber) |fiber| { - const old = ¤t_fiber.regs; - current_fiber = fiber; - contextSwitch(old, &fiber.regs); - return; - } - if (el.queue.pop()) |node| { - const fiber: *Fiber = @fieldParentPtr("queue_node", node); - const old = ¤t_fiber.regs; - current_fiber = fiber; - contextSwitch(old, &fiber.regs); - return; - } - @panic("everything is done"); +fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void { + const message: SwitchMessage = .{ + .ready_fiber = optional_fiber orelse if (ready_node: { + el.mutex.lock(); + defer el.mutex.unlock(); + break :ready_node el.queue.pop(); + }) |ready_node| + @fieldParentPtr("queue_node", ready_node) + else if (register_awaiter) |_| + @panic("no other fiber to switch to in order to be able to register this fiber as an awaiter") // time to switch to an idle fiber? + else + return, // nothing to do + .register_awaiter = register_awaiter, + }; + std.log.debug("switching from {*} to {*}", .{ current_fiber, message.ready_fiber }); + SwitchMessage.handle(@ptrFromInt(contextSwitch(¤t_fiber.regs, &message.ready_fiber.regs, @intFromPtr(&message))), el); } -/// Equivalent to calling `yield` and then giving the fiber back to the event loop. -fn exit(el: *EventLoop, optional_fiber: ?*Fiber) noreturn { - yield(el, optional_fiber); - @panic("TODO recycle the fiber"); -} +const SwitchMessage = struct { + ready_fiber: *Fiber, + register_awaiter: ?*?*Fiber, + + fn handle(message: *const SwitchMessage, el: *EventLoop) void { + const prev_fiber = current_fiber; + current_fiber = message.ready_fiber; + if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber); + } +}; fn schedule(el: *EventLoop, fiber: *Fiber) void { + el.mutex.lock(); + defer el.mutex.unlock(); el.queue.append(&fiber.queue_node); } -fn myFiber(el: *EventLoop) *Fiber { - _ = el; - return current_fiber; +fn recycle(el: *EventLoop, fiber: *Fiber) void { + std.log.debug("recyling {*}", .{fiber}); + fiber.awaiter = undefined; + @memset(fiber.resultPointer()[0..max_result_len], undefined); + el.mutex.lock(); + defer el.mutex.unlock(); + el.free.append(&fiber.queue_node); } const Regs = extern struct { @@ -101,7 +119,7 @@ const Regs = extern struct { rbp: usize, }; -const contextSwitch: *const fn (old: *Regs, new: *Regs) callconv(.c) void = @ptrCast(&contextSwitch_naked); +const contextSwitch: *const fn (old: *Regs, new: *Regs, message: usize) callconv(.c) usize = @ptrCast(&contextSwitch_naked); noinline fn contextSwitch_naked() callconv(.naked) void { asm volatile ( @@ -121,6 +139,7 @@ noinline fn contextSwitch_naked() callconv(.naked) void { \\movq 0x28(%%rsi), %%rbx \\movq 0x30(%%rsi), %%rbp \\ + \\movq %%rdx, %%rax \\ret ); } @@ -128,6 +147,7 @@ noinline fn contextSwitch_naked() callconv(.naked) void { fn popRet() callconv(.naked) void { asm volatile ( \\pop %%rdi + \\movq %%rax, %%rsi \\ret ); } @@ -145,6 +165,7 @@ pub fn @"async"( }; fiber.awaiter = null; fiber.queue_node = .{ .data = {} }; + std.log.debug("allocated {*}", .{fiber}); const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward( usize, @@ -157,14 +178,16 @@ pub fn @"async"( .fiber = fiber, .start = start, }; - const stack_end_ptr: [*]align(16) usize = @alignCast(@ptrCast(closure)); - (stack_end_ptr - 1)[0] = 0; - (stack_end_ptr - 2)[0] = @intFromPtr(&AsyncClosure.call); - (stack_end_ptr - 3)[0] = @intFromPtr(closure); - (stack_end_ptr - 4)[0] = @intFromPtr(&popRet); - + const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure)); + const stack_top = (stack_end - 4)[0..4]; + stack_top.* = .{ + @intFromPtr(&popRet), + @intFromPtr(closure), + @intFromPtr(&AsyncClosure.call), + 0, + }; fiber.regs = .{ - .rsp = @intFromPtr(stack_end_ptr - 4), + .rsp = @intFromPtr(stack_top), .r15 = 0, .r14 = 0, .r13 = 0, @@ -181,30 +204,24 @@ const AsyncClosure = struct { _: void align(16) = {}, event_loop: *EventLoop, context: ?*anyopaque, - fiber: *EventLoop.Fiber, + fiber: *Fiber, start: *const fn (context: ?*anyopaque, result: *anyopaque) void, - fn call(closure: *AsyncClosure) callconv(.c) void { - std.log.debug("wrap called in async", .{}); + fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn { + message.handle(closure.event_loop); + std.log.debug("{*} performing async", .{closure.fiber}); closure.start(closure.context, closure.fiber.resultPointer()); - const awaiter = @atomicRmw(?*EventLoop.Fiber, &closure.fiber.awaiter, .Xchg, EventLoop.Fiber.finished, .seq_cst); - closure.event_loop.exit(awaiter); + const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel); + closure.event_loop.yield(awaiter, null); + unreachable; // switched to dead fiber } }; pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void { const event_loop: *EventLoop = @alignCast(@ptrCast(userdata)); - const future_fiber: *EventLoop.Fiber = @alignCast(@ptrCast(any_future)); + const future_fiber: *Fiber = @alignCast(@ptrCast(any_future)); const result_src = future_fiber.resultPointer()[0..result.len]; - const my_fiber = event_loop.myFiber(); - - const prev = @atomicRmw(?*EventLoop.Fiber, &future_fiber.awaiter, .Xchg, my_fiber, .seq_cst); - if (prev == EventLoop.Fiber.finished) { - @memcpy(result, result_src); - return; - } - event_loop.yield(prev); - // Resumed when the value is available. - std.log.debug("yield returned in await", .{}); + if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter); @memcpy(result, result_src); + event_loop.recycle(future_fiber); } |
