diff options
Diffstat (limited to 'lib/std')
| -rw-r--r-- | lib/std/Io/Threaded.zig | 48 |
1 files changed, 22 insertions, 26 deletions
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 6ed7bb6573..2a38fdf543 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1183,10 +1183,12 @@ fn groupConcurrent( t.cond.signal(); } -fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { +fn groupWait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; + _ = initial_token; // we need to load `token` *after* the group finishes + if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); @@ -1195,42 +1197,40 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { assert(prev_state & GroupClosure.sync_is_waiting == 0); if ((prev_state / GroupClosure.sync_one_pending) > 0) event.wait(ioBasic(t)) catch |err| switch (err) { error.Canceled => { - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic))); + while (it) |node| : (it = node.next) { const gc: *GroupClosure = @fieldParentPtr("node", node); gc.closure.requestCancel(t); - node = node.next orelse break; } event.waitUncancelable(ioBasic(t)); }, }; - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { - const gc: *GroupClosure = @fieldParentPtr("node", node); - const node_next = node.next; - gc.deinit(gpa); - node = node_next orelse break; - } - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only // thread who can access `group` right now. + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); group.token.raw = null; + while (it) |node| { + it = node.next; // update `it` now, because `deinit` will invalidate `node` + const gc: *GroupClosure = @fieldParentPtr("node", node); + gc.deinit(gpa); + } } -fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void { +fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; + _ = initial_token; // we need to load `token` *after* the group finishes + if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` { - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic))); + while (it) |node| : (it = node.next) { const gc: *GroupClosure = @fieldParentPtr("node", node); gc.closure.requestCancel(t); - node = node.next orelse break; } } @@ -1240,20 +1240,16 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void assert(prev_state & GroupClosure.sync_is_waiting == 0); if ((prev_state / GroupClosure.sync_one_pending) > 0) event.waitUncancelable(ioBasic(t)); - { - var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token)); - while (true) { - const gc: *GroupClosure = @fieldParentPtr("node", node); - const node_next = node.next; - gc.deinit(gpa); - node = node_next orelse break; - } - } - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only // thread who can access `group` right now. + var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); group.token.raw = null; + while (it) |node| { + it = node.next; // update `it` now, because `deinit` will invalidate `node` + const gc: *GroupClosure = @fieldParentPtr("node", node); + gc.deinit(gpa); + } } fn recancel(userdata: ?*anyopaque) void { |
