author    Jacob Young <jacobly0@users.noreply.github.com>  2025-03-30 15:13:41 -0400
committer Andrew Kelley <andrew@ziglang.org>               2025-10-29 06:20:48 -0700
commit    08b609a79ff151e747c6b860c90b634daaa68f1c (patch)
tree      2ac9a3ac308bd53125ef1770e9002c8274f2e734 /lib/std/Thread
parent    5041c9ad9cbf479e62416cd06ef8a178f3467127 (diff)
Io: implement sleep and fix cancel bugs
Diffstat (limited to 'lib/std/Thread')
-rw-r--r--  lib/std/Thread/Pool.zig | 108
1 file changed, 98 insertions, 10 deletions
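
The heart of the cancel fix below is replacing the boolean cancel_flag with a cancel_tid word that acts as a small state machine: 0 means the closure is idle or finished, a worker's thread id means it is currently running, and a sentinel (the maximum value of std.Thread.Id) means cancellation was requested. A minimal standalone reduction of that handshake, using usize in place of std.Thread.Id purely for illustration (not the commit's code):

    const std = @import("std");
    const assert = std.debug.assert;

    // Sentinel meaning "cancellation requested", mirroring canceling_tid below.
    const canceling: usize = std.math.maxInt(usize);

    var cancel_tid: usize = 0;

    // Worker side: claim the closure before running it. If a cancel already
    // landed (sentinel present), the closure is skipped entirely.
    fn workerBegin(tid: usize) bool {
        if (@cmpxchgStrong(usize, &cancel_tid, 0, tid, .acq_rel, .acquire)) |prev| {
            assert(prev == canceling);
            return false;
        }
        return true;
    }

    // Worker side: release the claim after running. A cancel that raced in
    // leaves the sentinel behind, which is the only expected failure value.
    fn workerEnd(tid: usize) void {
        if (@cmpxchgStrong(usize, &cancel_tid, tid, 0, .acq_rel, .acquire)) |prev|
            assert(prev == canceling);
    }

    // Canceller side: unconditionally install the sentinel. If the old value
    // was a live thread id, that thread is mid-run and (on Linux) gets poked
    // with SIG.IO via tgkill so a blocking syscall returns EINTR.
    fn requestCancel() ?usize {
        return switch (@atomicRmw(usize, &cancel_tid, .Xchg, canceling, .acq_rel)) {
            0, canceling => null,
            else => |tid| tid,
        };
    }
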
diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig
index 0cddadf40d..6ec6c89040 100644
--- a/lib/std/Thread/Pool.zig
+++ b/lib/std/Thread/Pool.zig
@@ -332,13 +332,18 @@ pub fn io(pool: *Pool) Io {
.vtable = &.{
.@"async" = @"async",
.@"await" = @"await",
+
.cancel = cancel,
.cancelRequested = cancelRequested,
+
.createFile = createFile,
.openFile = openFile,
.closeFile = closeFile,
- .read = read,
- .write = write,
+ .pread = pread,
+ .pwrite = pwrite,
+
+ .now = now,
+ .sleep = sleep,
},
};
}
@@ -347,15 +352,44 @@ const AsyncClosure = struct {
func: *const fn (context: *anyopaque, result: *anyopaque) void,
runnable: Runnable = .{ .runFn = runFn },
reset_event: std.Thread.ResetEvent,
- cancel_flag: bool,
+ cancel_tid: std.Thread.Id,
context_offset: usize,
result_offset: usize,
+ const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
+ .int => |int_info| switch (int_info.signedness) {
+ .signed => -1,
+ .unsigned => std.math.maxInt(std.Thread.Id),
+ },
+ .pointer => @ptrFromInt(std.math.maxInt(usize)),
+ else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
+ };
+
fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void {
const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable));
+ const tid = std.Thread.getCurrentId();
+ if (@cmpxchgStrong(
+ std.Thread.Id,
+ &closure.cancel_tid,
+ 0,
+ tid,
+ .acq_rel,
+ .acquire,
+ )) |cancel_tid| {
+ assert(cancel_tid == canceling_tid);
+ return;
+ }
current_closure = closure;
closure.func(closure.contextPointer(), closure.resultPointer());
current_closure = null;
+ if (@cmpxchgStrong(
+ std.Thread.Id,
+ &closure.cancel_tid,
+ tid,
+ 0,
+ .acq_rel,
+ .acquire,
+ )) |cancel_tid| assert(cancel_tid == canceling_tid);
closure.reset_event.set();
}
@@ -414,7 +448,7 @@ fn @"async"(
.context_offset = context_offset,
.result_offset = result_offset,
.reset_event = .{},
- .cancel_flag = false,
+ .cancel_tid = 0,
};
@memcpy(closure.contextPointer()[0..context.len], context);
pool.run_queue.prepend(&closure.runnable.node);
@@ -456,7 +490,23 @@ fn cancel(
_ = result_alignment;
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
- @atomicStore(bool, &closure.cancel_flag, true, .seq_cst);
+ switch (@atomicRmw(
+ std.Thread.Id,
+ &closure.cancel_tid,
+ .Xchg,
+ AsyncClosure.canceling_tid,
+ .acq_rel,
+ )) {
+ 0, AsyncClosure.canceling_tid => {},
+ else => |cancel_tid| switch (builtin.os.tag) {
+ .linux => _ = std.os.linux.tgkill(
+ std.os.linux.getpid(),
+ @bitCast(cancel_tid),
+ std.posix.SIG.IO,
+ ),
+ else => {},
+ },
+ }
closure.waitAndFree(pool.allocator, result);
}
@@ -464,7 +514,7 @@ fn cancelRequested(userdata: ?*anyopaque) bool {
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
_ = pool;
const closure = current_closure orelse return false;
- return @atomicLoad(bool, &closure.cancel_flag, .unordered);
+ return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid;
}
fn checkCancel(pool: *Pool) error{AsyncCancel}!void {
@@ -499,14 +549,52 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
return file.close();
}
-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
try pool.checkCancel();
- return file.read(buffer);
+ return switch (offset) {
+ -1 => file.read(buffer),
+ else => file.pread(buffer, @bitCast(offset)),
+ };
}
-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
try pool.checkCancel();
- return file.write(buffer);
+ return switch (offset) {
+ -1 => file.write(buffer),
+ else => file.pwrite(buffer, @bitCast(offset)),
+ };
+}
+
+pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+ const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+ try pool.checkCancel();
+ const timespec = try std.posix.clock_gettime(clockid);
+ return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
+}
+
+pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+ const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+ const deadline_nanoseconds: i96 = switch (deadline) {
+ .nanoseconds => |nanoseconds| nanoseconds,
+ .timestamp => |timestamp| @intFromEnum(timestamp),
+ };
+ var timespec: std.posix.timespec = .{
+ .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
+ .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
+ };
+ while (true) {
+ try pool.checkCancel();
+ switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (deadline) {
+ .nanoseconds => false,
+ .timestamp => true,
+ } }, &timespec, &timespec))) {
+ .SUCCESS => return,
+ .FAULT => unreachable,
+ .INTR => {},
+ .INVAL => return error.UnsupportedClock,
+ else => |err| return std.posix.unexpectedErrno(err),
+ }
+ }
}