diff options
| author | Jacob Young <jacobly0@users.noreply.github.com> | 2025-04-02 18:03:53 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:48 -0700 |
| commit | c4fcf85c43be10b703fa701ba377dd215dd91595 (patch) | |
| tree | 0cd447827dfcc4f958947dc0160765460cadec54 /lib/std/Io/EventLoop.zig | |
| parent | 3eb7be5cf6cb7050991f84c11f786c61bc5599b4 (diff) | |
| download | zig-c4fcf85c43be10b703fa701ba377dd215dd91595.tar.gz zig-c4fcf85c43be10b703fa701ba377dd215dd91595.zip | |
Io.Condition: implement full API
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
| -rw-r--r-- | lib/std/Io/EventLoop.zig | 70 |
1 files changed, 55 insertions, 15 deletions
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig index 83747bd008..edd00baac6 100644 --- a/lib/std/Io/EventLoop.zig +++ b/lib/std/Io/EventLoop.zig @@ -555,8 +555,24 @@ const SwitchMessage = struct { .condition_wait => |condition_wait| { const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev)); assert(prev_fiber.queue_next == null); - const cond_state: *?*Fiber = @ptrCast(&condition_wait.cond.state); - assert(@atomicRmw(?*Fiber, cond_state, .Xchg, prev_fiber, .release) == null); // More than one wait on same Condition is illegal. + const cond_impl = prev_fiber.resultPointer(ConditionImpl); + cond_impl.* = .{ + .tail = prev_fiber, + .event = .queued, + }; + if (@cmpxchgStrong( + ?*Fiber, + @as(*?*Fiber, @ptrCast(&condition_wait.cond.state)), + null, + prev_fiber, + .release, + .acquire, + )) |waiting_fiber| { + const waiting_cond_impl = waiting_fiber.?.resultPointer(ConditionImpl); + assert(waiting_cond_impl.tail.queue_next == null); + waiting_cond_impl.tail.queue_next = prev_fiber; + waiting_cond_impl.tail = prev_fiber; + } condition_wait.mutex.unlock(el.io()); }, .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| { @@ -1267,10 +1283,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - el.yield(null, .{ .mutex_lock = .{ - .prev_state = prev_state, - .mutex = mutex, - } }); + el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex } }); } fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void { var maybe_waiting_fiber: ?*Fiber = @ptrFromInt(@intFromEnum(prev_state)); @@ -1294,21 +1307,48 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut el.yield(maybe_waiting_fiber.?, .reschedule); } +const ConditionImpl = struct { + tail: *Fiber, + event: union(enum) { + queued, + wake: Io.Condition.Wake, + }, +}; + fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - el.yield(null, .{ .condition_wait = .{ - .cond = cond, - .mutex = mutex, - } }); + el.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } }); + const thread = Thread.current(); + const fiber = thread.currentFiber(); + const cond_impl = fiber.resultPointer(ConditionImpl); try mutex.lock(el.io()); + switch (cond_impl.event) { + .queued => {}, + .wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) { + .one => if (@cmpxchgStrong( + ?*Fiber, + @as(*?*Fiber, @ptrCast(&cond.state)), + null, + next_fiber, + .release, + .acquire, + )) |old_fiber| { + const old_cond_impl = old_fiber.?.resultPointer(ConditionImpl); + assert(old_cond_impl.tail.queue_next == null); + old_cond_impl.tail.queue_next = next_fiber; + old_cond_impl.tail = cond_impl.tail; + }, + .all => el.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }), + }, + } + fiber.queue_next = null; } -fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void { +fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void { const el: *EventLoop = @alignCast(@ptrCast(userdata)); - const cond_state: *?*Fiber = @ptrCast(&cond.state); - if (@atomicRmw(?*Fiber, cond_state, .Xchg, null, .acquire)) |fiber| { - el.yield(fiber, .reschedule); - } + const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return; + waiting_fiber.resultPointer(ConditionImpl).event = .{ .wake = wake }; + el.yield(waiting_fiber, .reschedule); } fn errno(signed: i32) std.os.linux.E { |
