diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2025-12-29 18:24:40 -0800 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-12-29 22:51:06 -0800 |
| commit | 2a02c7a0d59e25bac07a6ed2948a29438fb05527 (patch) | |
| tree | e68a1855272d8d64d1934802501eb2d45f0992f3 | |
| parent | 2adfd4d107f071f91608bef22c7e91b1a9a93470 (diff) | |
| download | zig-Group-Canceled.tar.gz zig-Group-Canceled.zip | |
std.Io.Group: async and concurrent support Cancelable resultsGroup-Canceled
Now, the return type of functions spawned with `Group.async` and
`Group.concurrent` may be anything that coerces to `Io.Cancelable!void`.
Before this commit, group tasks were the only exception to the rule
"error.Canceled should never be swallowed". Now, there is no exception,
and it is enforced with an assertion upon closure completion.
Finally, fixes a case of swallowing error.Canceled in the compiler,
solving a TODO.
There are three ways to handle `error.Canceled`. In order of most
common:
1. Propagate it
2. After receiving it, io.recancel() and then don't propagate it
3. Make it unreachable with io.swapCancelProtection()
| -rw-r--r-- | lib/std/Io.zig | 14 | ||||
| -rw-r--r-- | lib/std/Io/Threaded.zig | 34 | ||||
| -rw-r--r-- | lib/std/Io/test.zig | 33 | ||||
| -rw-r--r-- | src/Package/Fetch.zig | 4 |
4 files changed, 61 insertions, 24 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 6f5444ffa4..c287115db8 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -631,7 +631,7 @@ pub const VTable = struct { /// Copied and then passed to `start`. context: []const u8, context_alignment: std.mem.Alignment, - start: *const fn (*Group, context: *const anyopaque) void, + start: *const fn (*Group, context: *const anyopaque) Cancelable!void, ) void, /// Thread-safe. groupConcurrent: *const fn ( @@ -642,7 +642,7 @@ pub const VTable = struct { /// Copied and then passed to `start`. context: []const u8, context_alignment: std.mem.Alignment, - start: *const fn (*Group, context: *const anyopaque) void, + start: *const fn (*Group, context: *const anyopaque) Cancelable!void, ) ConcurrentError!void, groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void, groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void, @@ -1082,10 +1082,10 @@ pub const Group = struct { pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) void { + fn start(group: *Group, context: *const anyopaque) Cancelable!void { _ = group; const args_casted: *const Args = @ptrCast(@alignCast(context)); - @call(.auto, function, args_casted.*); + return @call(.auto, function, args_casted.*); } }; io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start); @@ -1110,10 +1110,10 @@ pub const Group = struct { pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) void { + fn start(group: *Group, context: *const anyopaque) Cancelable!void { _ = group; const args_casted: *const Args = @ptrCast(@alignCast(context)); - @call(.auto, function, args_casted.*); + return @call(.auto, function, args_casted.*); } }; return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start); @@ -1265,7 +1265,7 @@ pub fn Select(comptime U: type) type { ) void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) void { + fn start(group: *Group, context: *const anyopaque) Cancelable!void { const args_casted: *const Args = @ptrCast(@alignCast(context)); const unerased_select: *S = @fieldParentPtr("group", group); const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*)); diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 19f15dcab0..5f54686a43 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1279,7 +1279,7 @@ const GroupClosure = struct { group: *Io.Group, /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. node: std.SinglyLinkedList.Node, - func: *const fn (*Io.Group, context: *anyopaque) void, + func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void, context_alignment: Alignment, alloc_len: usize, @@ -1292,7 +1292,7 @@ const GroupClosure = struct { current_thread.current_closure = closure; current_thread.cancel_protection = .unblocked; - gc.func(group, gc.contextPointer()); + assertResult(closure, gc.func(group, gc.contextPointer())); current_thread.current_closure = null; current_thread.cancel_protection = undefined; @@ -1302,6 +1302,16 @@ const GroupClosure = struct { if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t)); } + fn assertResult(closure: *Closure, result: Io.Cancelable!void) void { + if (result) |_| switch (closure.cancel_status.unpack()) { + .none, .requested => {}, + .acknowledged => unreachable, // task illegally swallowed error.Canceled + .signal_id => unreachable, + } else |err| switch (err) { + error.Canceled => assert(closure.cancel_status == .acknowledged), + } + } + fn contextPointer(gc: *GroupClosure) [*]u8 { const base: [*]u8 = @ptrCast(gc); const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc); @@ -1314,7 +1324,7 @@ const GroupClosure = struct { group: *Io.Group, context: []const u8, context_alignment: Alignment, - func: *const fn (*Io.Group, context: *const anyopaque) void, + func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) Allocator.Error!*GroupClosure { const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure); const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment); @@ -1352,14 +1362,14 @@ fn groupAsync( group: *Io.Group, context: []const u8, context_alignment: Alignment, - start: *const fn (*Io.Group, context: *const anyopaque) void, + start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded) return start(group, context.ptr); + if (builtin.single_threaded) return start(group, context.ptr) catch unreachable; const gpa = t.allocator; const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch - return start(group, context.ptr); + return t.assertGroupResult(start(group, context.ptr)); t.mutex.lock(); @@ -1368,7 +1378,7 @@ fn groupAsync( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); gc.deinit(gpa); - return start(group, context.ptr); + return t.assertGroupResult(start(group, context.ptr)); } t.busy_count = busy_count + 1; @@ -1381,7 +1391,7 @@ fn groupAsync( t.busy_count = busy_count; t.mutex.unlock(); gc.deinit(gpa); - return start(group, context.ptr); + return t.assertGroupResult(start(group, context.ptr)); }; thread.detach(); } @@ -1402,12 +1412,18 @@ fn groupAsync( t.cond.signal(); } +fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void { + const current_thread: *Thread = .getCurrent(t); + const current_closure = current_thread.current_closure orelse return; + GroupClosure.assertResult(current_closure, result); +} + fn groupConcurrent( userdata: ?*anyopaque, group: *Io.Group, context: []const u8, context_alignment: Alignment, - start: *const fn (*Io.Group, context: *const anyopaque) void, + start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) Io.ConcurrentError!void { if (builtin.single_threaded) return error.ConcurrencyUnavailable; diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index 50c5f1a756..582ad06615 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -207,27 +207,48 @@ fn count(a: usize, b: usize, result: *usize) void { result.* = sum; } -test "Group cancellation" { +test "Group cancelation" { const io = testing.io; var group: Io.Group = .init; - var results: [2]usize = undefined; + var results: [4]usize = .{ 0, 0, 0, 0 }; + // TODO when robust cancelation is available, make the sleep timeouts much + // longer so that it causes the unit test to be failed if not canceled. + // https://codeberg.org/ziglang/zig/issues/30049 group.async(io, sleep, .{ io, &results[0] }); group.async(io, sleep, .{ io, &results[1] }); + group.async(io, sleepUncancelable, .{ io, &results[2] }); + group.async(io, sleepRecancel, .{ io, &results[3] }); group.cancel(io); - try testing.expectEqualSlices(usize, &.{ 1, 1 }, &results); + try testing.expectEqualSlices(usize, &.{ 1, 1, 1, 1 }, &results); +} + +fn sleep(io: Io, result: *usize) error{Canceled}!void { + defer result.* = 1; + io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) { + error.Canceled => |e| return e, + else => {}, + }; } -fn sleep(io: Io, result: *usize) void { - // TODO when cancellation race bug is fixed, make this timeout much longer so that - // it causes the unit test to be failed if not canceled. +fn sleepUncancelable(io: Io, result: *usize) void { + const old_prot = io.swapCancelProtection(.blocked); + defer _ = io.swapCancelProtection(old_prot); io.sleep(.fromMilliseconds(1), .awake) catch {}; result.* = 1; } +fn sleepRecancel(io: Io, result: *usize) void { + io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) { + error.Canceled => io.recancel(), + else => {}, + }; + result.* = 1; +} + test "Group concurrent" { const io = testing.io; diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig index 1f7bfba363..d595465db5 100644 --- a/src/Package/Fetch.zig +++ b/src/Package/Fetch.zig @@ -843,13 +843,13 @@ pub fn relativePathDigest(pkg_root: Cache.Path, cache_root: Cache.Directory) Pac return .initPath(pkg_root.sub_path, pkg_root.root_dir.eql(cache_root)); } -pub fn workerRun(f: *Fetch, prog_name: []const u8) void { +pub fn workerRun(f: *Fetch, prog_name: []const u8) Io.Cancelable!void { const prog_node = f.prog_node.start(prog_name, 0); defer prog_node.end(); run(f) catch |err| switch (err) { error.OutOfMemory => f.oom_flag = true, - error.Canceled => {}, // TODO make groupAsync functions be cancelable and assert proper value was returned + error.Canceled => |e| return e, error.FetchFailed => { // Nothing to do because the errors are already reported in `error_bundle`, // and a reference is kept to the `Fetch` task inside `all_fetches`. |
