aboutsummaryrefslogtreecommitdiff
path: root/lib/std
diff options
context:
space:
mode:
Diffstat (limited to 'lib/std')
-rw-r--r--lib/std/Io.zig114
-rw-r--r--lib/std/Io/Threaded.zig75
-rw-r--r--lib/std/Io/test.zig75
3 files changed, 251 insertions, 13 deletions
diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index 424f5a8e70..7c5c0ed70a 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -650,6 +650,10 @@ pub const VTable = struct {
groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
+ recancel: *const fn (?*anyopaque) void,
+ swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection,
+ checkCancel: *const fn (?*anyopaque) Cancelable!void,
+
/// Blocks until one of the futures from the list has a result ready, such
/// that awaiting it will not block. Returns that index.
select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,
@@ -982,7 +986,14 @@ pub fn Future(Result: type) type {
any_future: ?*AnyFuture,
result: Result,
- /// Equivalent to `await` but places a cancellation request.
+ /// Equivalent to `await` but places a cancelation request. This causes the task to receive
+ /// `error.Canceled` from its next "cancelation point" (if any). A cancelation point is a
+ /// call to a function in `Io` which can return `error.Canceled`.
+ ///
+ /// After cancelation of a task is requested, only the next cancelation point in that task
+ /// will return `error.Canceled`: future points will not re-signal the cancelation. As such,
+ /// it is usually a bug to ignore `error.Canceled`. However, to defer handling cancelation
+ /// requests, see also `recancel` and `CancelProtection`.
///
/// Idempotent. Not threadsafe.
pub fn cancel(f: *@This(), io: Io) Result {
@@ -1079,6 +1090,8 @@ pub const Group = struct {
/// Equivalent to `wait` but immediately requests cancellation on all
/// members of the group.
///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
+ ///
/// Idempotent. Not threadsafe.
pub fn cancel(g: *Group, io: Io) void {
const token = g.token orelse return;
@@ -1087,6 +1100,61 @@ pub const Group = struct {
}
};
+/// Asserts that `error.Canceled` was returned from a prior cancelation point, and "re-arms" the
+/// cancelation request, so that `error.Canceled` will be returned again from the next cancelation
+/// point.
+///
+/// For a description of cancelation and cancelation points, see `Future.cancel`.
+pub fn recancel(io: Io) void {
+ io.vtable.recancel(io.userdata);
+}
+
+/// In rare cases, it is desirable to completely block cancelation notification, so that a region
+/// of code can run uninterrupted before `error.Canceled` is potentially observed. Therefore, every
+/// task has a "cancel protection" state which indicates whether or not `Io` functions can introduce
+/// cancelation points.
+///
+/// To modify a task's cancel protection state, see `swapCancelProtection`.
+///
+/// For a description of cancelation and cancelation points, see `Future.cancel`.
+pub const CancelProtection = enum {
+ /// Any call to an `Io` function with `error.Canceled` in its error set is a cancelation point.
+ ///
+ /// This is the default state, which all tasks are created in.
+ unblocked,
+ /// No `Io` function introduces a cancelation point (`error.Canceled` will never be returned).
+ blocked,
+};
+/// Updates the current task's cancel protection state (see `CancelProtection`).
+///
+/// The typical usage for this function is to protect a block of code from cancelation:
+/// ```
+/// const old_cancel_protect = io.swapCancelProtection(.blocked);
+/// defer _ = io.swapCancelProtection(old_cancel_protect);
+/// doSomeWork() catch |err| switch (err) {
+/// error.Canceled => unreachable,
+/// };
+/// ```
+///
+/// For a description of cancelation and cancelation points, see `Future.cancel`.
+pub fn swapCancelProtection(io: Io, new: CancelProtection) CancelProtection {
+ return io.vtable.swapCancelProtection(io.userdata, new);
+}
+
+/// This function acts as a pure cancelation point (subject to protection; see `CancelProtection`)
+/// and does nothing else. In other words, it returns `error.Canceled` if there is an outstanding
+/// non-blocked cancelation request, but otherwise is a no-op.
+///
+/// It is rarely necessary to call this function. The primary use case is in long-running CPU-bound
+/// tasks which may need to respond to cancelation before completing. Short tasks, or those which
+/// perform other `Io` operations (and hence have other cancelation points), will typically already
+/// respond quickly to cancelation requests.
+///
+/// For a description of cancelation and cancelation points, see `Future.cancel`.
+pub fn checkCancel(io: Io) Cancelable!void {
+ return io.vtable.checkCancel(io.userdata);
+}
+
pub fn Select(comptime U: type) type {
return struct {
io: Io,
@@ -1160,6 +1228,8 @@ pub fn Select(comptime U: type) type {
/// Equivalent to `wait` but requests cancellation on all remaining
/// tasks owned by the select.
///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
+ ///
/// It is illegal to call `wait` after this.
///
/// Idempotent. Not threadsafe.
@@ -1193,7 +1263,9 @@ pub fn futexWaitTimeout(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) con
const expected_raw: *align(1) const u32 = @ptrCast(&expected);
return io.vtable.futexWait(io.userdata, @ptrCast(ptr), expected_raw.*, timeout);
}
-/// Same as `futexWait`, except is not affected by task cancelation.
+/// Same as `futexWait`, except does not introduce a cancelation point.
+///
+/// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn futexWaitUncancelable(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) void {
comptime assert(@sizeOf(T) == @sizeOf(u32));
const expected_raw: *align(1) const u32 = @ptrCast(&expected);
@@ -1247,6 +1319,9 @@ pub const Mutex = struct {
}
}
+ /// Same as `lock`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn lockUncancelable(m: *Mutex, io: Io) void {
const initial_state = m.state.cmpxchgWeak(
.unlocked,
@@ -1296,6 +1371,9 @@ pub const Condition = struct {
try waitInner(cond, io, mutex, false);
}
+ /// Same as `wait`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
waitInner(cond, io, mutex, true) catch |err| switch (err) {
error.Canceled => unreachable,
@@ -1424,7 +1502,9 @@ pub const Event = enum(u32) {
}
}
- /// Same as `wait` except uninterruptible.
+ /// Same as `wait`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn waitUncancelable(event: *Event, io: Io) void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
@@ -1531,7 +1611,9 @@ pub const TypeErasedQueue = struct {
return q.putLocked(io, elements, min, false);
}
- /// Same as `put` but cannot be canceled.
+ /// Same as `put`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
assert(elements.len >= min);
if (elements.len == 0) return 0;
@@ -1602,7 +1684,10 @@ pub const TypeErasedQueue = struct {
return q.getLocked(io, buffer, min, false);
}
- pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
+ /// Same as `get`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
+ pub fn getUncancelable(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize) usize {
assert(buffer.len >= min);
if (buffer.len == 0) return 0;
q.mutex.lockUncancelable(io);
@@ -1722,7 +1807,9 @@ pub fn Queue(Elem: type) type {
assert(try q.put(io, elements, elements.len) == elements.len);
}
- /// Same as `put` but cannot be interrupted.
+ /// Same as `put`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
}
@@ -1731,6 +1818,9 @@ pub fn Queue(Elem: type) type {
assert(try q.put(io, &.{item}, 1) == 1);
}
+ /// Same as `putOne`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
assert(q.putUncancelable(io, &.{item}, 1) == 1);
}
@@ -1746,8 +1836,11 @@ pub fn Queue(Elem: type) type {
return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
+ /// Same as `get`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
- return @divExact(try q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
+ return @divExact(q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
@@ -1756,6 +1849,9 @@ pub fn Queue(Elem: type) type {
return buf[0];
}
+ /// Same as `getOne`, except does not introduce a cancelation point.
+ ///
+ /// For a description of cancelation and cancelation points, see `Future.cancel`.
pub fn getOneUncancelable(q: *@This(), io: Io) Elem {
var buf: [1]Elem = undefined;
assert(q.getUncancelable(io, &buf, 1) == 1);
@@ -1846,10 +1942,6 @@ pub fn concurrent(
return future;
}
-pub fn cancelRequested(io: Io) bool {
- return io.vtable.cancelRequested(io.userdata);
-}
-
pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable;
pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void {
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 8d326c70ab..65d7f1dec2 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -86,7 +86,9 @@ const Thread = struct {
/// The value that needs to be passed to pthread_kill or tgkill in order to
/// send a signal.
signal_id: SignaleeId,
- current_closure: ?*Closure = null,
+ current_closure: ?*Closure,
+ /// Only populated if `current_closure != null`. Indicates the current cancel protection mode.
+ cancel_protection: Io.CancelProtection,
const SignaleeId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id;
@@ -98,6 +100,12 @@ const Thread = struct {
fn checkCancel(thread: *Thread) error{Canceled}!void {
const closure = thread.current_closure orelse return;
+
+ switch (thread.cancel_protection) {
+ .unblocked => {},
+ .blocked => return,
+ }
+
switch (@cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
@@ -115,6 +123,11 @@ const Thread = struct {
fn beginSyscall(thread: *Thread) error{Canceled}!void {
const closure = thread.current_closure orelse return;
+ switch (thread.cancel_protection) {
+ .unblocked => {},
+ .blocked => return,
+ }
+
switch (@cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
@@ -135,6 +148,12 @@ const Thread = struct {
fn endSyscall(thread: *Thread) void {
const closure = thread.current_closure orelse return;
+
+ switch (thread.cancel_protection) {
+ .unblocked => {},
+ .blocked => return,
+ }
+
_ = @cmpxchgStrong(
CancelStatus,
&closure.cancel_status,
@@ -512,6 +531,8 @@ pub fn init(
.have_signal_handler = false,
.main_thread = .{
.signal_id = Thread.currentSignalId(),
+ .current_closure = null,
+ .cancel_protection = undefined,
},
};
@@ -546,7 +567,11 @@ pub const init_single_threaded: Threaded = .{
.old_sig_io = undefined,
.old_sig_pipe = undefined,
.have_signal_handler = false,
- .main_thread = .{ .signal_id = undefined },
+ .main_thread = .{
+ .signal_id = undefined,
+ .current_closure = null,
+ .cancel_protection = undefined,
+ },
};
pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
@@ -581,6 +606,8 @@ fn join(t: *Threaded) void {
fn worker(t: *Threaded) void {
var thread: Thread = .{
.signal_id = Thread.currentSignalId(),
+ .current_closure = null,
+ .cancel_protection = undefined,
};
Thread.current = &thread;
@@ -617,6 +644,10 @@ pub fn io(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
+ .recancel = recancel,
+ .swapCancelProtection = swapCancelProtection,
+ .checkCancel = checkCancel,
+
.futexWait = futexWait,
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
@@ -709,6 +740,10 @@ pub fn ioBasic(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
+ .recancel = recancel,
+ .swapCancelProtection = swapCancelProtection,
+ .checkCancel = checkCancel,
+
.futexWait = futexWait,
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
@@ -794,9 +829,14 @@ const AsyncClosure = struct {
fn start(closure: *Closure, t: *Threaded) void {
const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
const current_thread = Thread.getCurrent(t);
+
current_thread.current_closure = closure;
+ current_thread.cancel_protection = .unblocked;
+
ac.func(ac.contextPointer(), ac.resultPointer());
+
current_thread.current_closure = null;
+ current_thread.cancel_protection = undefined;
if (@atomicRmw(?*Io.Event, &ac.select_condition, .Xchg, done_event, .release)) |select_event| {
assert(select_event != done_event);
@@ -978,9 +1018,14 @@ const GroupClosure = struct {
const group = gc.group;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const event: *Io.Event = @ptrCast(&group.context);
+
current_thread.current_closure = closure;
+ current_thread.cancel_protection = .unblocked;
+
gc.func(group, gc.contextPointer());
+
current_thread.current_closure = null;
+ current_thread.cancel_protection = undefined;
const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel);
assert((prev_state / sync_one_pending) > 0);
@@ -1201,6 +1246,32 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
}
}
+fn recancel(userdata: ?*anyopaque) void {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const current_thread: *Thread = .getCurrent(t);
+ const cancel_status = &current_thread.current_closure.?.cancel_status;
+ switch (@atomicLoad(CancelStatus, cancel_status, .monotonic)) {
+ .none => unreachable, // called `recancel` when not canceled
+ .requested => unreachable, // called `recancel` when cancelation was already outstanding
+ .acknowledged => {},
+ _ => unreachable, // invalid state: not in a syscall
+ }
+ @atomicStore(CancelStatus, cancel_status, .requested, .monotonic);
+}
+
+fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.CancelProtection {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const current_thread: *Thread = .getCurrent(t);
+ const old = current_thread.cancel_protection;
+ current_thread.cancel_protection = new;
+ return old;
+}
+
+fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ return Thread.getCurrent(t).checkCancel();
+}
+
fn await(
userdata: ?*anyopaque,
any_future: *Io.AnyFuture,
diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig
index c0a4445c1e..94f280b358 100644
--- a/lib/std/Io/test.zig
+++ b/lib/std/Io/test.zig
@@ -291,3 +291,78 @@ test "Event" {
try std.testing.expectError(error.Canceled, future.cancel(io));
}
}
+
+test "recancel" {
+ const global = struct {
+ fn worker(io: Io) Io.Cancelable!void {
+ var dummy_event: Io.Event = .unset;
+
+ if (dummy_event.wait(io)) {
+ return;
+ } else |err| switch (err) {
+ error.Canceled => io.recancel(),
+ }
+
+ // Now we expect to see `error.Canceled` again.
+ return dummy_event.wait(io);
+ }
+ };
+
+ const io = std.testing.io;
+ var future = io.concurrent(global.worker, .{io}) catch |err| switch (err) {
+ error.ConcurrencyUnavailable => return error.SkipZigTest,
+ };
+ if (future.cancel(io)) {
+ return error.UnexpectedSuccess; // both `wait` calls should have returned `error.Canceled`
+ } else |err| switch (err) {
+ error.Canceled => {},
+ }
+}
+
+test "swapCancelProtection" {
+ const global = struct {
+ fn waitTwice(
+ io: Io,
+ event: *Io.Event,
+ ) error{ Canceled, CanceledWhileProtected }!void {
+ // Wait for `event` while protected from cancelation.
+ {
+ const old_prot = io.swapCancelProtection(.blocked);
+ defer _ = io.swapCancelProtection(old_prot);
+ event.wait(io) catch |err| switch (err) {
+ error.Canceled => return error.CanceledWhileProtected,
+ };
+ }
+ // Reset the event (it will never be set again), and this time wait for it without protection.
+ event.reset();
+ _ = try event.wait(io);
+ }
+ fn sleepThenSet(io: Io, event: *Io.Event) !void {
+ // Give `waitTwice` a chance to get canceled.
+ try io.sleep(.fromMilliseconds(200), .awake);
+ event.set(io);
+ }
+ };
+
+ const io = std.testing.io;
+
+ var event: Io.Event = .unset;
+
+ var wait_future = io.concurrent(global.waitTwice, .{ io, &event }) catch |err| switch (err) {
+ error.ConcurrencyUnavailable => return error.SkipZigTest,
+ };
+ defer wait_future.cancel(io) catch {};
+
+ var set_future = try io.concurrent(global.sleepThenSet, .{ io, &event });
+ defer set_future.cancel(io) catch {};
+
+ if (wait_future.cancel(io)) {
+ return error.UnexpectedSuccess; // there was no `set` call to unblock the second `wait`
+ } else |err| switch (err) {
+ error.Canceled => {},
+ error.CanceledWhileProtected => |e| return e,
+ }
+
+ // Because it reached the `set`, it should be too late for `sleepThenSet` to see `error.Canceled`.
+ try set_future.cancel(io);
+}