aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2025-12-29 18:24:40 -0800
committerAndrew Kelley <andrew@ziglang.org>2025-12-29 22:51:06 -0800
commit2a02c7a0d59e25bac07a6ed2948a29438fb05527 (patch)
treee68a1855272d8d64d1934802501eb2d45f0992f3
parent2adfd4d107f071f91608bef22c7e91b1a9a93470 (diff)
downloadzig-Group-Canceled.tar.gz
zig-Group-Canceled.zip
std.Io.Group: async and concurrent support Cancelable resultsGroup-Canceled
Now, the return type of functions spawned with `Group.async` and `Group.concurrent` may be anything that coerces to `Io.Cancelable!void`. Before this commit, group tasks were the only exception to the rule "error.Canceled should never be swallowed". Now, there is no exception, and it is enforced with an assertion upon closure completion. Finally, fixes a case of swallowing error.Canceled in the compiler, solving a TODO. There are three ways to handle `error.Canceled`. In order of most common: 1. Propagate it 2. After receiving it, io.recancel() and then don't propagate it 3. Make it unreachable with io.swapCancelProtection()
-rw-r--r--lib/std/Io.zig14
-rw-r--r--lib/std/Io/Threaded.zig34
-rw-r--r--lib/std/Io/test.zig33
-rw-r--r--src/Package/Fetch.zig4
4 files changed, 61 insertions, 24 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 6f5444ffa4..c287115db8 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -631,7 +631,7 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
- start: *const fn (*Group, context: *const anyopaque) void,
+ start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
) void,
/// Thread-safe.
groupConcurrent: *const fn (
@@ -642,7 +642,7 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
- start: *const fn (*Group, context: *const anyopaque) void,
+ start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
) ConcurrentError!void,
groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
@@ -1082,10 +1082,10 @@ pub const Group = struct {
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
const args_casted: *const Args = @ptrCast(@alignCast(context));
- @call(.auto, function, args_casted.*);
+ return @call(.auto, function, args_casted.*);
}
};
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
@@ -1110,10 +1110,10 @@ pub const Group = struct {
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
const args_casted: *const Args = @ptrCast(@alignCast(context));
- @call(.auto, function, args_casted.*);
+ return @call(.auto, function, args_casted.*);
}
};
return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
@@ -1265,7 +1265,7 @@ pub fn Select(comptime U: type) type {
) void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
const unerased_select: *S = @fieldParentPtr("group", group);
const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 19f15dcab0..5f54686a43 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -1279,7 +1279,7 @@ const GroupClosure = struct {
group: *Io.Group,
/// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
node: std.SinglyLinkedList.Node,
- func: *const fn (*Io.Group, context: *anyopaque) void,
+ func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void,
context_alignment: Alignment,
alloc_len: usize,
@@ -1292,7 +1292,7 @@ const GroupClosure = struct {
current_thread.current_closure = closure;
current_thread.cancel_protection = .unblocked;
- gc.func(group, gc.contextPointer());
+ assertResult(closure, gc.func(group, gc.contextPointer()));
current_thread.current_closure = null;
current_thread.cancel_protection = undefined;
@@ -1302,6 +1302,16 @@ const GroupClosure = struct {
if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t));
}
+ fn assertResult(closure: *Closure, result: Io.Cancelable!void) void {
+ if (result) |_| switch (closure.cancel_status.unpack()) {
+ .none, .requested => {},
+ .acknowledged => unreachable, // task illegally swallowed error.Canceled
+ .signal_id => unreachable,
+ } else |err| switch (err) {
+ error.Canceled => assert(closure.cancel_status == .acknowledged),
+ }
+ }
+
fn contextPointer(gc: *GroupClosure) [*]u8 {
const base: [*]u8 = @ptrCast(gc);
const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc);
@@ -1314,7 +1324,7 @@ const GroupClosure = struct {
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- func: *const fn (*Io.Group, context: *const anyopaque) void,
+ func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) Allocator.Error!*GroupClosure {
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment);
@@ -1352,14 +1362,14 @@ fn groupAsync(
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- start: *const fn (*Io.Group, context: *const anyopaque) void,
+ start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
- if (builtin.single_threaded) return start(group, context.ptr);
+ if (builtin.single_threaded) return start(group, context.ptr) catch unreachable;
const gpa = t.allocator;
const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
t.mutex.lock();
@@ -1368,7 +1378,7 @@ fn groupAsync(
if (busy_count >= @intFromEnum(t.async_limit)) {
t.mutex.unlock();
gc.deinit(gpa);
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
}
t.busy_count = busy_count + 1;
@@ -1381,7 +1391,7 @@ fn groupAsync(
t.busy_count = busy_count;
t.mutex.unlock();
gc.deinit(gpa);
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
};
thread.detach();
}
@@ -1402,12 +1412,18 @@ fn groupAsync(
t.cond.signal();
}
+fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void {
+ const current_thread: *Thread = .getCurrent(t);
+ const current_closure = current_thread.current_closure orelse return;
+ GroupClosure.assertResult(current_closure, result);
+}
+
fn groupConcurrent(
userdata: ?*anyopaque,
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- start: *const fn (*Io.Group, context: *const anyopaque) void,
+ start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) Io.ConcurrentError!void {
if (builtin.single_threaded) return error.ConcurrencyUnavailable;
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index 50c5f1a756..582ad06615 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -207,27 +207,48 @@ fn count(a: usize, b: usize, result: *usize) void {
result.* = sum;
}
-test "Group cancellation" {
+test "Group cancelation" {
const io = testing.io;
var group: Io.Group = .init;
- var results: [2]usize = undefined;
+ var results: [4]usize = .{ 0, 0, 0, 0 };
+ // TODO when robust cancelation is available, make the sleep timeouts much
+ // longer so that it causes the unit test to be failed if not canceled.
+ // https://codeberg.org/ziglang/zig/issues/30049
group.async(io, sleep, .{ io, &results[0] });
group.async(io, sleep, .{ io, &results[1] });
+ group.async(io, sleepUncancelable, .{ io, &results[2] });
+ group.async(io, sleepRecancel, .{ io, &results[3] });
group.cancel(io);
- try testing.expectEqualSlices(usize, &.{ 1, 1 }, &results);
+ try testing.expectEqualSlices(usize, &.{ 1, 1, 1, 1 }, &results);
+}
+
+fn sleep(io: Io, result: *usize) error{Canceled}!void {
+ defer result.* = 1;
+ io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+ error.Canceled => |e| return e,
+ else => {},
+ };
}
-fn sleep(io: Io, result: *usize) void {
- // TODO when cancellation race bug is fixed, make this timeout much longer so that
- // it causes the unit test to be failed if not canceled.
+fn sleepUncancelable(io: Io, result: *usize) void {
+ const old_prot = io.swapCancelProtection(.blocked);
+ defer _ = io.swapCancelProtection(old_prot);
io.sleep(.fromMilliseconds(1), .awake) catch {};
result.* = 1;
}
+fn sleepRecancel(io: Io, result: *usize) void {
+ io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+ error.Canceled => io.recancel(),
+ else => {},
+ };
+ result.* = 1;
+}
+
test "Group concurrent" {
const io = testing.io;
diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig
index 1f7bfba363..d595465db5 100644
--- a/src/Package/Fetch.zig
+++ b/src/Package/Fetch.zig
@@ -843,13 +843,13 @@ pub fn relativePathDigest(pkg_root: Cache.Path, cache_root: Cache.Directory) Pac
return .initPath(pkg_root.sub_path, pkg_root.root_dir.eql(cache_root));
}
-pub fn workerRun(f: *Fetch, prog_name: []const u8) void {
+pub fn workerRun(f: *Fetch, prog_name: []const u8) Io.Cancelable!void {
const prog_node = f.prog_node.start(prog_name, 0);
defer prog_node.end();
run(f) catch |err| switch (err) {
error.OutOfMemory => f.oom_flag = true,
- error.Canceled => {}, // TODO make groupAsync functions be cancelable and assert proper value was returned
+ error.Canceled => |e| return e,
error.FetchFailed => {
// Nothing to do because the errors are already reported in `error_bundle`,
// and a reference is kept to the `Fetch` task inside `all_fetches`.