aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Io/EventLoop.zig
diff options
context:
space:
mode:
authorJacob Young <jacobly0@users.noreply.github.com>2025-04-02 18:03:53 -0400
committerAndrew Kelley <andrew@ziglang.org>2025-10-29 06:20:48 -0700
commitc4fcf85c43be10b703fa701ba377dd215dd91595 (patch)
tree0cd447827dfcc4f958947dc0160765460cadec54 /lib/std/Io/EventLoop.zig
parent3eb7be5cf6cb7050991f84c11f786c61bc5599b4 (diff)
downloadzig-c4fcf85c43be10b703fa701ba377dd215dd91595.tar.gz
zig-c4fcf85c43be10b703fa701ba377dd215dd91595.zip
Io.Condition: implement full API
Diffstat (limited to 'lib/std/Io/EventLoop.zig')
-rw-r--r--lib/std/Io/EventLoop.zig70
1 files changed, 55 insertions, 15 deletions
diff --git a/lib/std/Io/EventLoop.zig b/lib/std/Io/EventLoop.zig
index 83747bd008..edd00baac6 100644
--- a/lib/std/Io/EventLoop.zig
+++ b/lib/std/Io/EventLoop.zig
@@ -555,8 +555,24 @@ const SwitchMessage = struct {
.condition_wait => |condition_wait| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
- const cond_state: *?*Fiber = @ptrCast(&condition_wait.cond.state);
- assert(@atomicRmw(?*Fiber, cond_state, .Xchg, prev_fiber, .release) == null); // More than one wait on same Condition is illegal.
+ const cond_impl = prev_fiber.resultPointer(ConditionImpl);
+ cond_impl.* = .{
+ .tail = prev_fiber,
+ .event = .queued,
+ };
+ if (@cmpxchgStrong(
+ ?*Fiber,
+ @as(*?*Fiber, @ptrCast(&condition_wait.cond.state)),
+ null,
+ prev_fiber,
+ .release,
+ .acquire,
+ )) |waiting_fiber| {
+ const waiting_cond_impl = waiting_fiber.?.resultPointer(ConditionImpl);
+ assert(waiting_cond_impl.tail.queue_next == null);
+ waiting_cond_impl.tail.queue_next = prev_fiber;
+ waiting_cond_impl.tail = prev_fiber;
+ }
condition_wait.mutex.unlock(el.io());
},
.exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
@@ -1267,10 +1283,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
- el.yield(null, .{ .mutex_lock = .{
- .prev_state = prev_state,
- .mutex = mutex,
- } });
+ el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex } });
}
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
var maybe_waiting_fiber: ?*Fiber = @ptrFromInt(@intFromEnum(prev_state));
@@ -1294,21 +1307,48 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
el.yield(maybe_waiting_fiber.?, .reschedule);
}
+const ConditionImpl = struct {
+ tail: *Fiber,
+ event: union(enum) {
+ queued,
+ wake: Io.Condition.Wake,
+ },
+};
+
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
- el.yield(null, .{ .condition_wait = .{
- .cond = cond,
- .mutex = mutex,
- } });
+ el.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
+ const thread = Thread.current();
+ const fiber = thread.currentFiber();
+ const cond_impl = fiber.resultPointer(ConditionImpl);
try mutex.lock(el.io());
+ switch (cond_impl.event) {
+ .queued => {},
+ .wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) {
+ .one => if (@cmpxchgStrong(
+ ?*Fiber,
+ @as(*?*Fiber, @ptrCast(&cond.state)),
+ null,
+ next_fiber,
+ .release,
+ .acquire,
+ )) |old_fiber| {
+ const old_cond_impl = old_fiber.?.resultPointer(ConditionImpl);
+ assert(old_cond_impl.tail.queue_next == null);
+ old_cond_impl.tail.queue_next = next_fiber;
+ old_cond_impl.tail = cond_impl.tail;
+ },
+ .all => el.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }),
+ },
+ }
+ fiber.queue_next = null;
}
-fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void {
+fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
const el: *EventLoop = @alignCast(@ptrCast(userdata));
- const cond_state: *?*Fiber = @ptrCast(&cond.state);
- if (@atomicRmw(?*Fiber, cond_state, .Xchg, null, .acquire)) |fiber| {
- el.yield(fiber, .reschedule);
- }
+ const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
+ waiting_fiber.resultPointer(ConditionImpl).event = .{ .wake = wake };
+ el.yield(waiting_fiber, .reschedule);
}
fn errno(signed: i32) std.os.linux.E {