diff options
Diffstat (limited to 'lib/std')
| -rw-r--r-- | lib/std/Io.zig | 114 | ||||
| -rw-r--r-- | lib/std/Io/Threaded.zig | 75 | ||||
| -rw-r--r-- | lib/std/Io/test.zig | 75 |
3 files changed, 251 insertions, 13 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 424f5a8e70..7c5c0ed70a 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -650,6 +650,10 @@ pub const VTable = struct { groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void, groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void, + recancel: *const fn (?*anyopaque) void, + swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection, + checkCancel: *const fn (?*anyopaque) Cancelable!void, + /// Blocks until one of the futures from the list has a result ready, such /// that awaiting it will not block. Returns that index. select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize, @@ -982,7 +986,14 @@ pub fn Future(Result: type) type { any_future: ?*AnyFuture, result: Result, - /// Equivalent to `await` but places a cancellation request. + /// Equivalent to `await` but places a cancellation request. This causes the task to receive + /// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a + /// call to a function in `Io` which can return `error.Canceled`. + /// + /// After cancelation of a task is requested, only the next cancelation point in that task + /// will return `error.Canceled`: future points will not re-signal the cancelation. As such, + /// it is usually a bug to ignore `error.Canceled`. However, to defer handling cancelation + /// requests, see also `recancel` and `CancelProtection`. /// /// Idempotent. Not threadsafe. pub fn cancel(f: *@This(), io: Io) Result { @@ -1079,6 +1090,8 @@ pub const Group = struct { /// Equivalent to `wait` but immediately requests cancellation on all /// members of the group. /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + /// /// Idempotent. Not threadsafe. pub fn cancel(g: *Group, io: Io) void { const token = g.token orelse return; @@ -1087,6 +1100,61 @@ pub const Group = struct { } }; +/// Asserts that `error.Canceled` was returned from a prior cancelation point, and "re-arms" the +/// cancelation request, so that `error.Canceled` will be returned again from the next cancelation +/// point. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub fn recancel(io: Io) void { + io.vtable.recancel(io.userdata); +} + +/// In rare cases, it is desirable to completely block cancelation notification, so that a region +/// of code can run uninterrupted before `error.Canceled` is potentially observed. Therefore, every +/// task has a "cancel protection" state which indicates whether or not `Io` functions can introduce +/// cancelation points. +/// +/// To modify a task's cancel protection state, see `swapCancelProtection`. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub const CancelProtection = enum { + /// Any call to an `Io` function with `error.Canceled` in its error set is a cancelation point. + /// + /// This is the default state, which all tasks are created in. + unblocked, + /// No `Io` function introduces a cancelation point (`error.Canceled` will never be returned). + blocked, +}; +/// Updates the current task's cancel protection state (see `CancelProtection`). +/// +/// The typical usage for this function is to protect a block of code from cancelation: +/// ``` +/// const old_cancel_protect = io.swapCancelProtection(.blocked); +/// defer _ = io.swapCancelProtection(old_cancel_protect); +/// doSomeWork() catch |err| switch (err) { +/// error.Canceled => unreachable, +/// }; +/// ``` +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub fn swapCancelProtection(io: Io, new: CancelProtection) CancelProtection { + return io.vtable.swapCancelProtection(io.userdata, new); +} + +/// This function acts as a pure cancelation point (subject to protection; see `CancelProtection`) +/// and does nothing else. In other words, it returns `error.Canceled` if there is an outstanding +/// non-blocked cancelation request, but otherwise is a no-op. +/// +/// It is rarely necessary to call this function. The primary use case is in long-running CPU-bound +/// tasks which may need to respond to cancelation before completing. Short tasks, or those which +/// perform other `Io` operations (and hence have other cancelation points), will typically already +/// respond quickly to cancelation requests. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. +pub fn checkCancel(io: Io) Cancelable!void { + return io.vtable.checkCancel(io.userdata); +} + pub fn Select(comptime U: type) type { return struct { io: Io, @@ -1160,6 +1228,8 @@ pub fn Select(comptime U: type) type { /// Equivalent to `wait` but requests cancellation on all remaining /// tasks owned by the select. /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + /// /// It is illegal to call `wait` after this. /// /// Idempotent. Not threadsafe. @@ -1193,7 +1263,9 @@ pub fn futexWaitTimeout(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) con const expected_raw: *align(1) const u32 = @ptrCast(&expected); return io.vtable.futexWait(io.userdata, @ptrCast(ptr), expected_raw.*, timeout); } -/// Same as `futexWait`, except is not affected by task cancelation. +/// Same as `futexWait`, except does not introduce a cancelation point. +/// +/// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn futexWaitUncancelable(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) void { comptime assert(@sizeOf(T) == @sizeOf(u32)); const expected_raw: *align(1) const u32 = @ptrCast(&expected); @@ -1247,6 +1319,9 @@ pub const Mutex = struct { } } + /// Same as `lock`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn lockUncancelable(m: *Mutex, io: Io) void { const initial_state = m.state.cmpxchgWeak( .unlocked, @@ -1296,6 +1371,9 @@ pub const Condition = struct { try waitInner(cond, io, mutex, false); } + /// Same as `wait`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void { waitInner(cond, io, mutex, true) catch |err| switch (err) { error.Canceled => unreachable, @@ -1424,7 +1502,9 @@ pub const Event = enum(u32) { } } - /// Same as `wait` except uninterruptible. + /// Same as `wait`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn waitUncancelable(event: *Event, io: Io) void { if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) { .unset => unreachable, @@ -1531,7 +1611,9 @@ pub const TypeErasedQueue = struct { return q.putLocked(io, elements, min, false); } - /// Same as `put` but cannot be canceled. + /// Same as `put`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize { assert(elements.len >= min); if (elements.len == 0) return 0; @@ -1602,7 +1684,10 @@ pub const TypeErasedQueue = struct { return q.getLocked(io, buffer, min, false); } - pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize { + /// Same as `get`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. + pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) usize { assert(buffer.len >= min); if (buffer.len == 0) return 0; q.mutex.lockUncancelable(io); @@ -1722,7 +1807,9 @@ pub fn Queue(Elem: type) type { assert(try q.put(io, elements, elements.len) == elements.len); } - /// Same as `put` but cannot be interrupted. + /// Same as `put`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize { return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); } @@ -1731,6 +1818,9 @@ pub fn Queue(Elem: type) type { assert(try q.put(io, &.{item}, 1) == 1); } + /// Same as `putOne`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void { assert(q.putUncancelable(io, &.{item}, 1) == 1); } @@ -1746,8 +1836,11 @@ pub fn Queue(Elem: type) type { return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); } + /// Same as `get`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize { - return @divExact(q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); + return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); } pub fn getOne(q: *@This(), io: Io) Cancelable!Elem { @@ -1756,6 +1849,9 @@ pub fn Queue(Elem: type) type { return buf[0]; } + /// Same as `getOne`, except does not introduce a cancelation point. + /// + /// For a description of cancelation and cancelation points, see `Future.cancel`. pub fn getOneUncancelable(q: *@This(), io: Io) Elem { var buf: [1]Elem = undefined; assert(q.getUncancelable(io, &buf, 1) == 1); @@ -1846,10 +1942,6 @@ pub fn concurrent( return future; } -pub fn cancelRequested(io: Io) bool { - return io.vtable.cancelRequested(io.userdata); -} - pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable; pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void { diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 8d326c70ab..65d7f1dec2 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -86,7 +86,9 @@ const Thread = struct { /// The value that needs to be passed to pthread_kill or tgkill in order to /// send a signal. signal_id: SignaleeId, - current_closure: ?*Closure = null, + current_closure: ?*Closure, + /// Only populated if `current_closure != null`. Indicates the current cancel protection mode. + cancel_protection: Io.CancelProtection, const SignaleeId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id; @@ -98,6 +100,12 @@ const Thread = struct { fn checkCancel(thread: *Thread) error{Canceled}!void { const closure = thread.current_closure orelse return; + + switch (thread.cancel_protection) { + .unblocked => {}, + .blocked => return, + } + switch (@cmpxchgStrong( CancelStatus, &closure.cancel_status, @@ -115,6 +123,11 @@ const Thread = struct { fn beginSyscall(thread: *Thread) error{Canceled}!void { const closure = thread.current_closure orelse return; + switch (thread.cancel_protection) { + .unblocked => {}, + .blocked => return, + } + switch (@cmpxchgStrong( CancelStatus, &closure.cancel_status, @@ -135,6 +148,12 @@ const Thread = struct { fn endSyscall(thread: *Thread) void { const closure = thread.current_closure orelse return; + + switch (thread.cancel_protection) { + .unblocked => {}, + .blocked => return, + } + _ = @cmpxchgStrong( CancelStatus, &closure.cancel_status, @@ -512,6 +531,8 @@ pub fn init( .have_signal_handler = false, .main_thread = .{ .signal_id = Thread.currentSignalId(), + .current_closure = null, + .cancel_protection = undefined, }, }; @@ -546,7 +567,11 @@ pub const init_single_threaded: Threaded = .{ .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, - .main_thread = .{ .signal_id = undefined }, + .main_thread = .{ + .signal_id = undefined, + .current_closure = null, + .cancel_protection = undefined, + }, }; pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void { @@ -581,6 +606,8 @@ fn join(t: *Threaded) void { fn worker(t: *Threaded) void { var thread: Thread = .{ .signal_id = Thread.currentSignalId(), + .current_closure = null, + .cancel_protection = undefined, }; Thread.current = &thread; @@ -617,6 +644,10 @@ pub fn io(t: *Threaded) Io { .groupWait = groupWait, .groupCancel = groupCancel, + .recancel = recancel, + .swapCancelProtection = swapCancelProtection, + .checkCancel = checkCancel, + .futexWait = futexWait, .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, @@ -709,6 +740,10 @@ pub fn ioBasic(t: *Threaded) Io { .groupWait = groupWait, .groupCancel = groupCancel, + .recancel = recancel, + .swapCancelProtection = swapCancelProtection, + .checkCancel = checkCancel, + .futexWait = futexWait, .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, @@ -794,9 +829,14 @@ const AsyncClosure = struct { fn start(closure: *Closure, t: *Threaded) void { const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); const current_thread = Thread.getCurrent(t); + current_thread.current_closure = closure; + current_thread.cancel_protection = .unblocked; + ac.func(ac.contextPointer(), ac.resultPointer()); + current_thread.current_closure = null; + current_thread.cancel_protection = undefined; if (@atomicRmw(?*Io.Event, &ac.select_condition, .Xchg, done_event, .release)) |select_event| { assert(select_event != done_event); @@ -978,9 +1018,14 @@ const GroupClosure = struct { const group = gc.group; const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); const event: *Io.Event = @ptrCast(&group.context); + current_thread.current_closure = closure; + current_thread.cancel_protection = .unblocked; + gc.func(group, gc.contextPointer()); + current_thread.current_closure = null; + current_thread.cancel_protection = undefined; const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel); assert((prev_state / sync_one_pending) > 0); @@ -1201,6 +1246,32 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void } } +fn recancel(userdata: ?*anyopaque) void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const current_thread: *Thread = .getCurrent(t); + const cancel_status = ¤t_thread.current_closure.?.cancel_status; + switch (@atomicLoad(CancelStatus, cancel_status, .monotonic)) { + .none => unreachable, // called `recancel` when not canceled + .requested => unreachable, // called `recancel` when cancelation was already outstanding + .acknowledged => {}, + _ => unreachable, // invalid state: not in a syscall + } + @atomicStore(CancelStatus, cancel_status, .requested, .monotonic); +} + +fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.CancelProtection { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const current_thread: *Thread = .getCurrent(t); + const old = current_thread.cancel_protection; + current_thread.cancel_protection = new; + return old; +} + +fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void { + const t: *Threaded = @ptrCast(@alignCast(userdata)); + return Thread.getCurrent(t).checkCancel(); +} + fn await( userdata: ?*anyopaque, any_future: *Io.AnyFuture, diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index c0a4445c1e..94f280b358 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -291,3 +291,78 @@ test "Event" { try std.testing.expectError(error.Canceled, future.cancel(io)); } } + +test "recancel" { + const global = struct { + fn worker(io: Io) Io.Cancelable!void { + var dummy_event: Io.Event = .unset; + + if (dummy_event.wait(io)) { + return; + } else |err| switch (err) { + error.Canceled => io.recancel(), + } + + // Now we expect to see `error.Canceled` again. + return dummy_event.wait(io); + } + }; + + const io = std.testing.io; + var future = io.concurrent(global.worker, .{io}) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + if (future.cancel(io)) { + return error.UnexpectedSuccess; // both `wait` calls should have returned `error.Canceled` + } else |err| switch (err) { + error.Canceled => {}, + } +} + +test "swapCancelProtection" { + const global = struct { + fn waitTwice( + io: Io, + event: *Io.Event, + ) error{ Canceled, CanceledWhileProtected }!void { + // Wait for `event` while protected from cancelation. + { + const old_prot = io.swapCancelProtection(.blocked); + defer _ = io.swapCancelProtection(old_prot); + event.wait(io) catch |err| switch (err) { + error.Canceled => return error.CanceledWhileProtected, + }; + } + // Reset the event (it will never be set again), and this time wait for it without protection. + event.reset(); + _ = try event.wait(io); + } + fn sleepThenSet(io: Io, event: *Io.Event) !void { + // Give `waitTwice` a chance to get canceled. + try io.sleep(.fromMilliseconds(200), .awake); + event.set(io); + } + }; + + const io = std.testing.io; + + var event: Io.Event = .unset; + + var wait_future = io.concurrent(global.waitTwice, .{ io, &event }) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + defer wait_future.cancel(io) catch {}; + + var set_future = try io.concurrent(global.sleepThenSet, .{ io, &event }); + defer set_future.cancel(io) catch {}; + + if (wait_future.cancel(io)) { + return error.UnexpectedSuccess; // there was no `set` call to unblock the second `wait` + } else |err| switch (err) { + error.Canceled => {}, + error.CanceledWhileProtected => |e| return e, + } + + // Because it reached the `set`, it should be too late for `sleepThenSet` to see `error.Canceled`. + try set_future.cancel(io); +} |
