diff options
| author | Jacob Young <jacobly0@users.noreply.github.com> | 2025-03-30 15:13:41 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2025-10-29 06:20:48 -0700 |
| commit | 08b609a79ff151e747c6b860c90b634daaa68f1c (patch) | |
| tree | 2ac9a3ac308bd53125ef1770e9002c8274f2e734 /lib/std/Thread | |
| parent | 5041c9ad9cbf479e62416cd06ef8a178f3467127 (diff) | |
| download | zig-08b609a79ff151e747c6b860c90b634daaa68f1c.tar.gz zig-08b609a79ff151e747c6b860c90b634daaa68f1c.zip | |
Io: implement sleep and fix cancel bugs
Diffstat (limited to 'lib/std/Thread')
| -rw-r--r-- | lib/std/Thread/Pool.zig | 108 |
1 files changed, 98 insertions, 10 deletions
diff --git a/lib/std/Thread/Pool.zig b/lib/std/Thread/Pool.zig index 0cddadf40d..6ec6c89040 100644 --- a/lib/std/Thread/Pool.zig +++ b/lib/std/Thread/Pool.zig @@ -332,13 +332,18 @@ pub fn io(pool: *Pool) Io { .vtable = &.{ .@"async" = @"async", .@"await" = @"await", + .cancel = cancel, .cancelRequested = cancelRequested, + .createFile = createFile, .openFile = openFile, .closeFile = closeFile, - .read = read, - .write = write, + .pread = pread, + .pwrite = pwrite, + + .now = now, + .sleep = sleep, }, }; } @@ -347,15 +352,44 @@ const AsyncClosure = struct { func: *const fn (context: *anyopaque, result: *anyopaque) void, runnable: Runnable = .{ .runFn = runFn }, reset_event: std.Thread.ResetEvent, - cancel_flag: bool, + cancel_tid: std.Thread.Id, context_offset: usize, result_offset: usize, + const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) { + .int => |int_info| switch (int_info.signedness) { + .signed => -1, + .unsigned => std.math.maxInt(std.Thread.Id), + }, + .pointer => @ptrFromInt(std.math.maxInt(usize)), + else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)), + }; + fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void { const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable)); + const tid = std.Thread.getCurrentId(); + if (@cmpxchgStrong( + std.Thread.Id, + &closure.cancel_tid, + 0, + tid, + .acq_rel, + .acquire, + )) |cancel_tid| { + assert(cancel_tid == canceling_tid); + return; + } current_closure = closure; closure.func(closure.contextPointer(), closure.resultPointer()); current_closure = null; + if (@cmpxchgStrong( + std.Thread.Id, + &closure.cancel_tid, + tid, + 0, + .acq_rel, + .acquire, + )) |cancel_tid| assert(cancel_tid == canceling_tid); closure.reset_event.set(); } @@ -414,7 +448,7 @@ fn @"async"( .context_offset = context_offset, .result_offset = result_offset, .reset_event = .{}, - .cancel_flag = false, + .cancel_tid = 0, }; @memcpy(closure.contextPointer()[0..context.len], context); pool.run_queue.prepend(&closure.runnable.node); @@ -456,7 +490,23 @@ fn cancel( _ = result_alignment; const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); const closure: *AsyncClosure = @ptrCast(@alignCast(any_future)); - @atomicStore(bool, &closure.cancel_flag, true, .seq_cst); + switch (@atomicRmw( + std.Thread.Id, + &closure.cancel_tid, + .Xchg, + AsyncClosure.canceling_tid, + .acq_rel, + )) { + 0, AsyncClosure.canceling_tid => {}, + else => |cancel_tid| switch (builtin.os.tag) { + .linux => _ = std.os.linux.tgkill( + std.os.linux.getpid(), + @bitCast(cancel_tid), + std.posix.SIG.IO, + ), + else => {}, + }, + } closure.waitAndFree(pool.allocator, result); } @@ -464,7 +514,7 @@ fn cancelRequested(userdata: ?*anyopaque) bool { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); _ = pool; const closure = current_closure orelse return false; - return @atomicLoad(bool, &closure.cancel_flag, .unordered); + return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid; } fn checkCancel(pool: *Pool) error{AsyncCancel}!void { @@ -499,14 +549,52 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void { return file.close(); } -pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize { +pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); try pool.checkCancel(); - return file.read(buffer); + return switch (offset) { + -1 => file.read(buffer), + else => file.pread(buffer, @bitCast(offset)), + }; } -pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize { +pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize { const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); try pool.checkCancel(); - return file.write(buffer); + return switch (offset) { + -1 => file.write(buffer), + else => file.pwrite(buffer, @bitCast(offset)), + }; +} + +pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp { + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + try pool.checkCancel(); + const timespec = try std.posix.clock_gettime(clockid); + return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec); +} + +pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void { + const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata)); + const deadline_nanoseconds: i96 = switch (deadline) { + .nanoseconds => |nanoseconds| nanoseconds, + .timestamp => |timestamp| @intFromEnum(timestamp), + }; + var timespec: std.posix.timespec = .{ + .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)), + .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)), + }; + while (true) { + try pool.checkCancel(); + switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (deadline) { + .nanoseconds => false, + .timestamp => true, + } }, ×pec, ×pec))) { + .SUCCESS => return, + .FAULT => unreachable, + .INTR => {}, + .INVAL => return error.UnsupportedClock, + else => |err| return std.posix.unexpectedErrno(err), + } + } } |
