diff options
| -rw-r--r-- | lib/std/Io.zig | 14 | ||||
| -rw-r--r-- | lib/std/Io/Threaded.zig | 34 | ||||
| -rw-r--r-- | lib/std/Io/test.zig | 33 | ||||
| -rw-r--r-- | src/Package/Fetch.zig | 4 |
4 files changed, 61 insertions, 24 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 6f5444ffa4..c287115db8 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -631,7 +631,7 @@ pub const VTable = struct { /// Copied and then passed to `start`. context: []const u8, context_alignment: std.mem.Alignment, - start: *const fn (*Group, context: *const anyopaque) void, + start: *const fn (*Group, context: *const anyopaque) Cancelable!void, ) void, /// Thread-safe. groupConcurrent: *const fn ( @@ -642,7 +642,7 @@ pub const VTable = struct { /// Copied and then passed to `start`. context: []const u8, context_alignment: std.mem.Alignment, - start: *const fn (*Group, context: *const anyopaque) void, + start: *const fn (*Group, context: *const anyopaque) Cancelable!void, ) ConcurrentError!void, groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void, groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void, @@ -1082,10 +1082,10 @@ pub const Group = struct { pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) void { + fn start(group: *Group, context: *const anyopaque) Cancelable!void { _ = group; const args_casted: *const Args = @ptrCast(@alignCast(context)); - @call(.auto, function, args_casted.*); + return @call(.auto, function, args_casted.*); } }; io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start); @@ -1110,10 +1110,10 @@ pub const Group = struct { pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) void { + fn start(group: *Group, context: *const anyopaque) Cancelable!void { _ = group; const args_casted: *const Args = @ptrCast(@alignCast(context)); - @call(.auto, function, args_casted.*); + return @call(.auto, function, args_casted.*); } }; return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start); @@ -1265,7 +1265,7 @@ pub fn Select(comptime U: type) type { ) void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) void { + fn start(group: *Group, context: *const anyopaque) Cancelable!void { const args_casted: *const Args = @ptrCast(@alignCast(context)); const unerased_select: *S = @fieldParentPtr("group", group); const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*)); diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 19f15dcab0..5f54686a43 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1279,7 +1279,7 @@ const GroupClosure = struct { group: *Io.Group, /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. node: std.SinglyLinkedList.Node, - func: *const fn (*Io.Group, context: *anyopaque) void, + func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void, context_alignment: Alignment, alloc_len: usize, @@ -1292,7 +1292,7 @@ const GroupClosure = struct { current_thread.current_closure = closure; current_thread.cancel_protection = .unblocked; - gc.func(group, gc.contextPointer()); + assertResult(closure, gc.func(group, gc.contextPointer())); current_thread.current_closure = null; current_thread.cancel_protection = undefined; @@ -1302,6 +1302,16 @@ const GroupClosure = struct { if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t)); } + fn assertResult(closure: *Closure, result: Io.Cancelable!void) void { + if (result) |_| switch (closure.cancel_status.unpack()) { + .none, .requested => {}, + .acknowledged => unreachable, // task illegally swallowed error.Canceled + .signal_id => unreachable, + } else |err| switch (err) { + error.Canceled => assert(closure.cancel_status == .acknowledged), + } + } + fn contextPointer(gc: *GroupClosure) [*]u8 { const base: [*]u8 = @ptrCast(gc); const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc); @@ -1314,7 +1324,7 @@ const GroupClosure = struct { group: *Io.Group, context: []const u8, context_alignment: Alignment, - func: *const fn (*Io.Group, context: *const anyopaque) void, + func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) Allocator.Error!*GroupClosure { const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure); const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment); @@ -1352,14 +1362,14 @@ fn groupAsync( group: *Io.Group, context: []const u8, context_alignment: Alignment, - start: *const fn (*Io.Group, context: *const anyopaque) void, + start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded) return start(group, context.ptr); + if (builtin.single_threaded) return start(group, context.ptr) catch unreachable; const gpa = t.allocator; const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch - return start(group, context.ptr); + return t.assertGroupResult(start(group, context.ptr)); t.mutex.lock(); @@ -1368,7 +1378,7 @@ fn groupAsync( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); gc.deinit(gpa); - return start(group, context.ptr); + return t.assertGroupResult(start(group, context.ptr)); } t.busy_count = busy_count + 1; @@ -1381,7 +1391,7 @@ fn groupAsync( t.busy_count = busy_count; t.mutex.unlock(); gc.deinit(gpa); - return start(group, context.ptr); + return t.assertGroupResult(start(group, context.ptr)); }; thread.detach(); } @@ -1402,12 +1412,18 @@ fn groupAsync( t.cond.signal(); } +fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void { + const current_thread: *Thread = .getCurrent(t); + const current_closure = current_thread.current_closure orelse return; + GroupClosure.assertResult(current_closure, result); +} + fn groupConcurrent( userdata: ?*anyopaque, group: *Io.Group, context: []const u8, context_alignment: Alignment, - start: *const fn (*Io.Group, context: *const anyopaque) void, + start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) Io.ConcurrentError!void { if (builtin.single_threaded) return error.ConcurrencyUnavailable; diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index 50c5f1a756..582ad06615 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -207,27 +207,48 @@ fn count(a: usize, b: usize, result: *usize) void { result.* = sum; } -test "Group cancellation" { +test "Group cancelation" { const io = testing.io; var group: Io.Group = .init; - var results: [2]usize = undefined; + var results: [4]usize = .{ 0, 0, 0, 0 }; + // TODO when robust cancelation is available, make the sleep timeouts much + // longer so that it causes the unit test to be failed if not canceled. + // https://codeberg.org/ziglang/zig/issues/30049 group.async(io, sleep, .{ io, &results[0] }); group.async(io, sleep, .{ io, &results[1] }); + group.async(io, sleepUncancelable, .{ io, &results[2] }); + group.async(io, sleepRecancel, .{ io, &results[3] }); group.cancel(io); - try testing.expectEqualSlices(usize, &.{ 1, 1 }, &results); + try testing.expectEqualSlices(usize, &.{ 1, 1, 1, 1 }, &results); +} + +fn sleep(io: Io, result: *usize) error{Canceled}!void { + defer result.* = 1; + io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) { + error.Canceled => |e| return e, + else => {}, + }; } -fn sleep(io: Io, result: *usize) void { - // TODO when cancellation race bug is fixed, make this timeout much longer so that - // it causes the unit test to be failed if not canceled. +fn sleepUncancelable(io: Io, result: *usize) void { + const old_prot = io.swapCancelProtection(.blocked); + defer _ = io.swapCancelProtection(old_prot); io.sleep(.fromMilliseconds(1), .awake) catch {}; result.* = 1; } +fn sleepRecancel(io: Io, result: *usize) void { + io.sleep(.fromMilliseconds(1), .awake) catch |err| switch (err) { + error.Canceled => io.recancel(), + else => {}, + }; + result.* = 1; +} + test "Group concurrent" { const io = testing.io; diff --git a/src/Package/Fetch.zig b/src/Package/Fetch.zig index 1f7bfba363..d595465db5 100644 --- a/src/Package/Fetch.zig +++ b/src/Package/Fetch.zig @@ -843,13 +843,13 @@ pub fn relativePathDigest(pkg_root: Cache.Path, cache_root: Cache.Directory) Pac return .initPath(pkg_root.sub_path, pkg_root.root_dir.eql(cache_root)); } -pub fn workerRun(f: *Fetch, prog_name: []const u8) void { +pub fn workerRun(f: *Fetch, prog_name: []const u8) Io.Cancelable!void { const prog_node = f.prog_node.start(prog_name, 0); defer prog_node.end(); run(f) catch |err| switch (err) { error.OutOfMemory => f.oom_flag = true, - error.Canceled => {}, // TODO make groupAsync functions be cancelable and assert proper value was returned + error.Canceled => |e| return e, error.FetchFailed => { // Nothing to do because the errors are already reported in `error_bundle`, // and a reference is kept to the `Fetch` task inside `all_fetches`. |
