-rw-r--r--  lib/std/Io.zig            14
-rw-r--r--  lib/std/Io/Threaded.zig   34
-rw-r--r--  lib/std/Io/test.zig       33
-rw-r--r--  src/Package/Fetch.zig      4
4 files changed, 61 insertions, 24 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 6f5444ffa4..c287115db8 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -631,7 +631,7 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
- start: *const fn (*Group, context: *const anyopaque) void,
+ start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
) void,
/// Thread-safe.
groupConcurrent: *const fn (
@@ -642,7 +642,7 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
- start: *const fn (*Group, context: *const anyopaque) void,
+ start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
) ConcurrentError!void,
groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
@@ -1082,10 +1082,10 @@ pub const Group = struct {
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
const args_casted: *const Args = @ptrCast(@alignCast(context));
- @call(.auto, function, args_casted.*);
+ return @call(.auto, function, args_casted.*);
}
};
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
@@ -1110,10 +1110,10 @@ pub const Group = struct {
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
const args_casted: *const Args = @ptrCast(@alignCast(context));
- @call(.auto, function, args_casted.*);
+ return @call(.auto, function, args_casted.*);
}
};
return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
@@ -1265,7 +1265,7 @@ pub fn Select(comptime U: type) type {
) void {
const Args = @TypeOf(args);
const TypeErased = struct {
- fn start(group: *Group, context: *const anyopaque) void {
+ fn start(group: *Group, context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
const unerased_select: *S = @fieldParentPtr("group", group);
const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
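
Note: with this change, a task spawned through Group.async or Group.concurrent may return Io.Cancelable!void, so error.Canceled propagates instead of being swallowed (a plain void return still coerces into the error union). A minimal sketch of a conforming task, modeled on the sleep helper added in lib/std/Io/test.zig below (the io.sleep signature and the .awake clock are taken from that test):

    fn task(io: Io, result: *usize) error{Canceled}!void {
        defer result.* = 1;
        // Re-return error.Canceled so the Threaded backend's new
        // assertResult invariant holds; other sleep errors are ignored.
        io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
            error.Canceled => |e| return e,
            else => {},
        };
    }

    // Usage, as in the updated "Group cancelation" test:
    //   var group: Io.Group = .init;
    //   group.async(io, task, .{ io, &result });
    //   group.cancel(io);
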
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 19f15dcab0..5f54686a43 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -1279,7 +1279,7 @@ const GroupClosure = struct {
group: *Io.Group,
/// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
node: std.SinglyLinkedList.Node,
- func: *const fn (*Io.Group, context: *anyopaque) void,
+ func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void,
context_alignment: Alignment,
alloc_len: usize,
@@ -1292,7 +1292,7 @@ const GroupClosure = struct {
current_thread.current_closure = closure;
current_thread.cancel_protection = .unblocked;
- gc.func(group, gc.contextPointer());
+ assertResult(closure, gc.func(group, gc.contextPointer()));
current_thread.current_closure = null;
current_thread.cancel_protection = undefined;
@@ -1302,6 +1302,16 @@ const GroupClosure = struct {
if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t));
}
+ fn assertResult(closure: *Closure, result: Io.Cancelable!void) void {
+ if (result) |_| switch (closure.cancel_status.unpack()) {
+ .none, .requested => {},
+ .acknowledged => unreachable, // task illegally swallowed error.Canceled
+ .signal_id => unreachable,
+ } else |err| switch (err) {
+ error.Canceled => assert(closure.cancel_status == .acknowledged),
+ }
+ }
+
fn contextPointer(gc: *GroupClosure) [*]u8 {
const base: [*]u8 = @ptrCast(gc);
const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc);
@@ -1314,7 +1324,7 @@ const GroupClosure = struct {
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- func: *const fn (*Io.Group, context: *const anyopaque) void,
+ func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) Allocator.Error!*GroupClosure {
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment);
@@ -1352,14 +1362,14 @@ fn groupAsync(
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- start: *const fn (*Io.Group, context: *const anyopaque) void,
+ start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
- if (builtin.single_threaded) return start(group, context.ptr);
+ if (builtin.single_threaded) return start(group, context.ptr) catch unreachable;
const gpa = t.allocator;
const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
t.mutex.lock();
@@ -1368,7 +1378,7 @@ fn groupAsync(
if (busy_count >= @intFromEnum(t.async_limit)) {
t.mutex.unlock();
gc.deinit(gpa);
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
}
t.busy_count = busy_count + 1;
@@ -1381,7 +1391,7 @@ fn groupAsync(
t.busy_count = busy_count;
t.mutex.unlock();
gc.deinit(gpa);
- return start(group, context.ptr);
+ return t.assertGroupResult(start(group, context.ptr));
};
thread.detach();
}
@@ -1402,12 +1412,18 @@ fn groupAsync(
t.cond.signal();
}
+fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void {
+ const current_thread: *Thread = .getCurrent(t);
+ const current_closure = current_thread.current_closure orelse return;
+ GroupClosure.assertResult(current_closure, result);
+}
+
fn groupConcurrent(
userdata: ?*anyopaque,
group: *Io.Group,
context: []const u8,
context_alignment: Alignment,
- start: *const fn (*Io.Group, context: *const anyopaque) void,
+ start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
) Io.ConcurrentError!void {
if (builtin.single_threaded) return error.ConcurrencyUnavailable;
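
Note: assertResult encodes the contract that a task which acknowledged cancelation must also surface it. A task with a catch-all error handler therefore has two legal options, sketched below; the io.recancel() call is assumed to restore the acknowledged state, as exercised by sleepRecancel in the test changes that follow:

    // Option 1: propagate, with the task returning Io.Cancelable!void:
    //     error.Canceled => |e| return e,
    // Option 2: keep a void return but re-arm the cancelation:
    fn task(io: Io) void {
        io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
            error.Canceled => io.recancel(), // must not silently swallow
            else => {},
        };
    }
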
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index 50c5f1a756..582ad06615 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -207,27 +207,48 @@ fn count(a: usize, b: usize, result: *usize) void {
result.* = sum;
}
-test "Group cancellation" {
+test "Group cancelation" {
const io = testing.io;
var group: Io.Group = .init;
- var results: [2]usize = undefined;
+ var results: [4]usize = .{ 0, 0, 0, 0 };
+ // TODO when robust cancelation is available, make the sleep timeouts much
+ // longer so that it causes the unit test to be failed if not canceled.
+ // https://codeberg.org/ziglang/zig/issues/30049
group.async(io, sleep, .{ io, &results[0] });
group.async(io, sleep, .{ io, &results[1] });
+ group.async(io, sleepUncancelable, .{ io, &results[2] });
+ group.async(io, sleepRecancel, .{ io, &results[3] });
group.cancel(io);
- try testing.expectEqualSlices(usize, &.{ 1, 1 }, &results);
+ try testing.expectEqualSlices(usize, &.{ 1, 1, 1, 1 }, &results);
+}
+
+fn sleep(io: Io, result: *usize) error{Canceled}!void {
+ defer result.* = 1;
+ io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+ error.Canceled => |e| return e,
+ else => {},
+ };
}
-fn sleep(io: Io, result: *usize) void {
- // TODO when cancellation race bug is fixed, make this timeout much longer so that
- // it causes the unit test to be failed if not canceled.
+fn sleepUncancelable(io: Io, result: *usize) void {
+ const old_prot = io.swapCancelProtection(.blocked);
+ defer _ = io.swapCancelProtection(old_prot);
io.sleep(.fromMilliseconds(1), .awake) catch {};
result.* = 1;
}
+fn sleepRecancel(io: Io, result: *usize) void {
+ io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) {
+ error.Canceled => io.recancel(),
+ else => {},
+ };
+ result.* = 1;
+}
+
test "Group concurrent" {
const io = testing.io;
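
Note: sleepUncancelable above covers the remaining case in the test matrix: shielding a region from cancelation entirely. The save/restore pattern, copied from that test (the .blocked value mirrors the cancel_protection states used in Threaded.zig):

    const old_prot = io.swapCancelProtection(.blocked);
    defer _ = io.swapCancelProtection(old_prot);
    // Work in this region is not interrupted by group.cancel;
    // io.sleep here does not return error.Canceled.
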
diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig
index 1f7bfba363..d595465db5 100644
--- a/src/Package/Fetch.zig
+++ b/src/Package/Fetch.zig
@@ -843,13 +843,13 @@ pub fn relativePathDigest(pkg_root: Cache.Path, cache_root: Cache.Directory) Pac
return .initPath(pkg_root.sub_path, pkg_root.root_dir.eql(cache_root));
}
-pub fn workerRun(f: *Fetch, prog_name: []const u8) void {
+pub fn workerRun(f: *Fetch, prog_name: []const u8) Io.Cancelable!void {
const prog_node = f.prog_node.start(prog_name, 0);
defer prog_node.end();
run(f) catch |err| switch (err) {
error.OutOfMemory => f.oom_flag = true,
- error.Canceled => {}, // TODO make groupAsync functions be cancelable and assert proper value was returned
+ error.Canceled => |e| return e,
error.FetchFailed => {
// Nothing to do because the errors are already reported in `error_bundle`,
// and a reference is kept to the `Fetch` task inside `all_fetches`.
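
Note: with workerRun now returning Io.Cancelable!void, it matches the new start signature directly, resolving the removed TODO: the Threaded backend can assert that cancelation was propagated rather than dropped. A hypothetical spawn site (the actual caller is not shown in this diff):

    group.async(io, Fetch.workerRun, .{ f, prog_name });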