diff options
Diffstat (limited to 'lib/std/Progress.zig')
| -rw-r--r-- | lib/std/Progress.zig | 288 |
1 files changed, 131 insertions, 157 deletions
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index d1ab503661..6baa24d246 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -1,26 +1,30 @@ //! This API is non-allocating, non-fallible, thread-safe, and lock-free. +const Progress = @This(); -const std = @import("std"); const builtin = @import("builtin"); +const is_big_endian = builtin.cpu.arch.endian() == .big; +const is_windows = builtin.os.tag == .windows; + +const std = @import("std"); +const Io = std.Io; const windows = std.os.windows; const testing = std.testing; const assert = std.debug.assert; -const Progress = @This(); const posix = std.posix; -const is_big_endian = builtin.cpu.arch.endian() == .big; -const is_windows = builtin.os.tag == .windows; const Writer = std.Io.Writer; -/// `null` if the current node (and its children) should -/// not print on update() -terminal: std.fs.File, +/// Currently this API only supports this value being set to stderr, which +/// happens automatically inside `start`. +terminal: Io.File, + +io: Io, terminal_mode: TerminalMode, -update_thread: ?std.Thread, +update_worker: ?Io.Future(void), /// Atomically set by SIGWINCH as well as the root done() function. -redraw_event: std.Thread.ResetEvent, +redraw_event: Io.Event, /// Indicates a request to shut down and reset global state. /// Accessed atomically. done: bool, @@ -48,6 +52,8 @@ node_freelist: Freelist, /// value may at times temporarily exceed the node count. node_end_index: u32, +start_failure: StartFailure, + pub const Status = enum { /// Indicates the application is progressing towards completion of a task. /// Unless the application is interactive, this is the only status the @@ -93,9 +99,9 @@ pub const Options = struct { /// Must be at least 200 bytes. draw_buffer: []u8 = &default_draw_buffer, /// How many nanoseconds between writing updates to the terminal. - refresh_rate_ns: u64 = 80 * std.time.ns_per_ms, + refresh_rate_ns: Io.Duration = .fromMilliseconds(80), /// How many nanoseconds to keep the output hidden - initial_delay_ns: u64 = 200 * std.time.ns_per_ms, + initial_delay_ns: Io.Duration = .fromMilliseconds(200), /// If provided, causes the progress item to have a denominator. /// 0 means unknown. estimated_total_items: usize = 0, @@ -121,20 +127,20 @@ pub const Node = struct { name: [max_name_len]u8 align(@alignOf(usize)), /// Not thread-safe. - fn getIpcFd(s: Storage) ?posix.fd_t { - return if (s.estimated_total_count == std.math.maxInt(u32)) switch (@typeInfo(posix.fd_t)) { + fn getIpcFd(s: Storage) ?Io.File.Handle { + return if (s.estimated_total_count == std.math.maxInt(u32)) switch (@typeInfo(Io.File.Handle)) { .int => @bitCast(s.completed_count), .pointer => @ptrFromInt(s.completed_count), - else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)), + else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), } else null; } /// Thread-safe. - fn setIpcFd(s: *Storage, fd: posix.fd_t) void { - const integer: u32 = switch (@typeInfo(posix.fd_t)) { + fn setIpcFd(s: *Storage, fd: Io.File.Handle) void { + const integer: u32 = switch (@typeInfo(Io.File.Handle)) { .int => @bitCast(fd), .pointer => @intFromPtr(fd), - else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)), + else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), }; // `estimated_total_count` max int indicates the special state that // causes `completed_count` to be treated as a file descriptor, so @@ -327,13 +333,14 @@ pub const Node = struct { } } else { @atomicStore(bool, &global_progress.done, true, .monotonic); - global_progress.redraw_event.set(); - if (global_progress.update_thread) |thread| thread.join(); + const io = global_progress.io; + global_progress.redraw_event.set(io); + if (global_progress.update_worker) |*worker| worker.await(io); } } /// Posix-only. Used by `std.process.Child`. Thread-safe. - pub fn setIpcFd(node: Node, fd: posix.fd_t) void { + pub fn setIpcFd(node: Node, fd: Io.File.Handle) void { const index = node.index.unwrap() orelse return; assert(fd >= 0); assert(fd != posix.STDOUT_FILENO); @@ -344,14 +351,14 @@ pub const Node = struct { /// Posix-only. Thread-safe. Assumes the node is storing an IPC file /// descriptor. - pub fn getIpcFd(node: Node) ?posix.fd_t { + pub fn getIpcFd(node: Node) ?Io.File.Handle { const index = node.index.unwrap() orelse return null; const storage = storageByIndex(index); const int = @atomicLoad(u32, &storage.completed_count, .monotonic); - return switch (@typeInfo(posix.fd_t)) { + return switch (@typeInfo(Io.File.Handle)) { .int => @bitCast(int), .pointer => @ptrFromInt(int), - else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)), + else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), }; } @@ -389,9 +396,10 @@ pub const Node = struct { }; var global_progress: Progress = .{ + .io = undefined, .terminal = undefined, .terminal_mode = .off, - .update_thread = null, + .update_worker = null, .redraw_event = .unset, .refresh_rate_ns = undefined, .initial_delay_ns = undefined, @@ -401,6 +409,7 @@ var global_progress: Progress = .{ .done = false, .need_clear = false, .status = .working, + .start_failure = .unstarted, .node_parents = &node_parents_buffer, .node_storage = &node_storage_buffer, @@ -409,6 +418,13 @@ var global_progress: Progress = .{ .node_end_index = 0, }; +pub const StartFailure = union(enum) { + unstarted, + spawn_ipc_worker: error{ConcurrencyUnavailable}, + spawn_update_worker: error{ConcurrencyUnavailable}, + parse_env_var: error{ InvalidCharacter, Overflow }, +}; + const node_storage_buffer_len = 83; var node_parents_buffer: [node_storage_buffer_len]Node.Parent = undefined; var node_storage_buffer: [node_storage_buffer_len]Node.Storage = undefined; @@ -435,7 +451,9 @@ const noop_impl = builtin.single_threaded or switch (builtin.os.tag) { /// Asserts there is only one global Progress instance. /// /// Call `Node.end` when done. -pub fn start(options: Options) Node { +/// +/// If an error occurs, `start_failure` will be populated. +pub fn start(io: Io, options: Options) Node { // Ensure there is only 1 global Progress object. if (global_progress.node_end_index != 0) { debug_start_trace.dump(); @@ -450,21 +468,24 @@ pub fn start(options: Options) Node { assert(options.draw_buffer.len >= 200); global_progress.draw_buffer = options.draw_buffer; - global_progress.refresh_rate_ns = options.refresh_rate_ns; - global_progress.initial_delay_ns = options.initial_delay_ns; + global_progress.refresh_rate_ns = @intCast(options.refresh_rate_ns.toNanoseconds()); + global_progress.initial_delay_ns = @intCast(options.initial_delay_ns.toNanoseconds()); if (noop_impl) return Node.none; + global_progress.io = io; + if (std.process.parseEnvVarInt("ZIG_PROGRESS", u31, 10)) |ipc_fd| { - global_progress.update_thread = std.Thread.spawn(.{}, ipcThreadRun, .{ - @as(posix.fd_t, switch (@typeInfo(posix.fd_t)) { + global_progress.update_worker = io.concurrent(ipcThreadRun, .{ + io, + @as(Io.File, .{ .handle = switch (@typeInfo(Io.File.Handle)) { .int => ipc_fd, .pointer => @ptrFromInt(ipc_fd), - else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)), - }), + else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)), + } }), }) catch |err| { - std.log.warn("failed to spawn IPC thread for communicating progress to parent: {s}", .{@errorName(err)}); + global_progress.start_failure = .{ .spawn_ipc_worker = err }; return Node.none; }; } else |env_err| switch (env_err) { @@ -472,14 +493,21 @@ pub fn start(options: Options) Node { if (options.disable_printing) { return Node.none; } - const stderr: std.fs.File = .stderr(); + const stderr: Io.File = .stderr(); global_progress.terminal = stderr; - if (stderr.getOrEnableAnsiEscapeSupport()) { + if (stderr.enableAnsiEscapeCodes(io)) |_| { global_progress.terminal_mode = .ansi_escape_codes; - } else if (is_windows and stderr.isTty()) { - global_progress.terminal_mode = TerminalMode{ .windows_api = .{ - .code_page = windows.kernel32.GetConsoleOutputCP(), - } }; + } else |_| if (is_windows) { + if (stderr.isTty(io)) |is_tty| { + if (is_tty) global_progress.terminal_mode = TerminalMode{ .windows_api = .{ + .code_page = windows.kernel32.GetConsoleOutputCP(), + } }; + } else |err| switch (err) { + error.Canceled => { + io.recancel(); + return Node.none; + }, + } } if (global_progress.terminal_mode == .off) { @@ -497,17 +525,17 @@ pub fn start(options: Options) Node { if (switch (global_progress.terminal_mode) { .off => unreachable, // handled a few lines above - .ansi_escape_codes => std.Thread.spawn(.{}, updateThreadRun, .{}), - .windows_api => if (is_windows) std.Thread.spawn(.{}, windowsApiUpdateThreadRun, .{}) else unreachable, - }) |thread| { - global_progress.update_thread = thread; + .ansi_escape_codes => io.concurrent(updateThreadRun, .{io}), + .windows_api => if (is_windows) io.concurrent(windowsApiUpdateThreadRun, .{io}) else unreachable, + }) |future| { + global_progress.update_worker = future; } else |err| { - std.log.warn("unable to spawn thread for printing progress to terminal: {s}", .{@errorName(err)}); + global_progress.start_failure = .{ .spawn_update_worker = err }; return Node.none; } }, else => |e| { - std.log.warn("invalid ZIG_PROGRESS file descriptor integer: {s}", .{@errorName(e)}); + global_progress.start_failure = .{ .parse_env_var = e }; return Node.none; }, } @@ -521,48 +549,52 @@ pub fn setStatus(new_status: Status) void { } /// Returns whether a resize is needed to learn the terminal size. -fn wait(timeout_ns: u64) bool { - const resize_flag = if (global_progress.redraw_event.timedWait(timeout_ns)) |_| true else |err| switch (err) { - error.Timeout => false, +fn wait(io: Io, timeout_ns: u64) bool { + const timeout: Io.Timeout = .{ .duration = .{ + .clock = .awake, + .raw = .fromNanoseconds(timeout_ns), + } }; + const resize_flag = if (global_progress.redraw_event.waitTimeout(io, timeout)) |_| true else |err| switch (err) { + error.Timeout, error.Canceled => false, }; global_progress.redraw_event.reset(); return resize_flag or (global_progress.cols == 0); } -fn updateThreadRun() void { +fn updateThreadRun(io: Io) void { // Store this data in the thread so that it does not need to be part of the // linker data of the main executable. var serialized_buffer: Serialized.Buffer = undefined; { - const resize_flag = wait(global_progress.initial_delay_ns); + const resize_flag = wait(io, global_progress.initial_delay_ns); if (@atomicLoad(bool, &global_progress.done, .monotonic)) return; maybeUpdateSize(resize_flag); const buffer, _ = computeRedraw(&serialized_buffer); - if (stderr_mutex.tryLock()) { - defer stderr_mutex.unlock(); - write(buffer) catch return; + if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| { + defer io.unlockStderr(); global_progress.need_clear = true; + locked_stderr.file_writer.interface.writeAll(buffer) catch return; } } while (true) { - const resize_flag = wait(global_progress.refresh_rate_ns); + const resize_flag = wait(io, global_progress.refresh_rate_ns); if (@atomicLoad(bool, &global_progress.done, .monotonic)) { - stderr_mutex.lock(); - defer stderr_mutex.unlock(); - return clearWrittenWithEscapeCodes() catch {}; + const stderr = io.lockStderr(&.{}, null) catch return; + defer io.unlockStderr(); + return clearWrittenWithEscapeCodes(stderr.file_writer) catch {}; } maybeUpdateSize(resize_flag); const buffer, _ = computeRedraw(&serialized_buffer); - if (stderr_mutex.tryLock()) { - defer stderr_mutex.unlock(); - write(buffer) catch return; + if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| { + defer io.unlockStderr(); global_progress.need_clear = true; + locked_stderr.file_writer.interface.writeAll(buffer) catch return; } } } @@ -575,117 +607,72 @@ fn windowsApiWriteMarker() void { _ = windows.kernel32.WriteConsoleW(handle, &[_]u16{windows_api_start_marker}, 1, &num_chars_written, null); } -fn windowsApiUpdateThreadRun() void { +fn windowsApiUpdateThreadRun(io: Io) void { var serialized_buffer: Serialized.Buffer = undefined; { - const resize_flag = wait(global_progress.initial_delay_ns); + const resize_flag = wait(io, global_progress.initial_delay_ns); if (@atomicLoad(bool, &global_progress.done, .monotonic)) return; maybeUpdateSize(resize_flag); const buffer, const nl_n = computeRedraw(&serialized_buffer); - if (stderr_mutex.tryLock()) { - defer stderr_mutex.unlock(); + if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| { + defer io.unlockStderr(); windowsApiWriteMarker(); - write(buffer) catch return; global_progress.need_clear = true; + locked_stderr.file_writer.interface.writeAll(buffer) catch return; windowsApiMoveToMarker(nl_n) catch return; } } while (true) { - const resize_flag = wait(global_progress.refresh_rate_ns); + const resize_flag = wait(io, global_progress.refresh_rate_ns); if (@atomicLoad(bool, &global_progress.done, .monotonic)) { - stderr_mutex.lock(); - defer stderr_mutex.unlock(); + _ = io.lockStderr(&.{}, null) catch return; + defer io.unlockStderr(); return clearWrittenWindowsApi() catch {}; } maybeUpdateSize(resize_flag); const buffer, const nl_n = computeRedraw(&serialized_buffer); - if (stderr_mutex.tryLock()) { - defer stderr_mutex.unlock(); + if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| { + defer io.unlockStderr(); clearWrittenWindowsApi() catch return; windowsApiWriteMarker(); - write(buffer) catch return; global_progress.need_clear = true; + locked_stderr.file_writer.interface.writeAll(buffer) catch return; windowsApiMoveToMarker(nl_n) catch return; } } } -/// Allows the caller to freely write to stderr until `unlockStdErr` is called. -/// -/// During the lock, any `std.Progress` information is cleared from the terminal. -/// -/// The lock is recursive; the same thread may hold the lock multiple times. -pub fn lockStdErr() void { - stderr_mutex.lock(); - clearWrittenWithEscapeCodes() catch {}; -} - -pub fn unlockStdErr() void { - stderr_mutex.unlock(); -} - -/// Protected by `stderr_mutex`. -const stderr_writer: *Writer = &stderr_file_writer.interface; -/// Protected by `stderr_mutex`. -var stderr_file_writer: std.fs.File.Writer = .{ - .interface = std.fs.File.Writer.initInterface(&.{}), - .file = if (is_windows) undefined else .stderr(), - .mode = .streaming, -}; - -/// Allows the caller to freely write to the returned `Writer`, -/// initialized with `buffer`, until `unlockStderrWriter` is called. -/// -/// During the lock, any `std.Progress` information is cleared from the terminal. -/// -/// The lock is recursive; the same thread may hold the lock multiple times. -pub fn lockStderrWriter(buffer: []u8) *Writer { - stderr_mutex.lock(); - clearWrittenWithEscapeCodes() catch {}; - if (is_windows) stderr_file_writer.file = .stderr(); - stderr_writer.flush() catch {}; - stderr_writer.buffer = buffer; - return stderr_writer; -} - -pub fn unlockStderrWriter() void { - stderr_writer.flush() catch {}; - stderr_writer.end = 0; - stderr_writer.buffer = &.{}; - stderr_mutex.unlock(); -} - -fn ipcThreadRun(fd: posix.fd_t) anyerror!void { +fn ipcThreadRun(io: Io, file: Io.File) void { // Store this data in the thread so that it does not need to be part of the // linker data of the main executable. var serialized_buffer: Serialized.Buffer = undefined; { - _ = wait(global_progress.initial_delay_ns); + _ = wait(io, global_progress.initial_delay_ns); if (@atomicLoad(bool, &global_progress.done, .monotonic)) return; const serialized = serialize(&serialized_buffer); - writeIpc(fd, serialized) catch |err| switch (err) { + writeIpc(io, file, serialized) catch |err| switch (err) { error.BrokenPipe => return, }; } while (true) { - _ = wait(global_progress.refresh_rate_ns); + _ = wait(io, global_progress.refresh_rate_ns); if (@atomicLoad(bool, &global_progress.done, .monotonic)) return; const serialized = serialize(&serialized_buffer); - writeIpc(fd, serialized) catch |err| switch (err) { + writeIpc(io, file, serialized) catch |err| switch (err) { error.BrokenPipe => return, }; } @@ -784,11 +771,10 @@ fn appendTreeSymbol(symbol: TreeSymbol, buf: []u8, start_i: usize) usize { } } -fn clearWrittenWithEscapeCodes() anyerror!void { +pub fn clearWrittenWithEscapeCodes(file_writer: *Io.File.Writer) Io.Writer.Error!void { if (noop_impl or !global_progress.need_clear) return; - + try file_writer.interface.writeAll(clear ++ progress_remove); global_progress.need_clear = false; - try write(clear ++ progress_remove); } /// U+25BA or ► @@ -948,11 +934,11 @@ const SavedMetadata = struct { const Fd = enum(i32) { _, - fn init(fd: posix.fd_t) Fd { + fn init(fd: Io.File.Handle) Fd { return @enumFromInt(if (is_windows) @as(isize, @bitCast(@intFromPtr(fd))) else fd); } - fn get(fd: Fd) posix.fd_t { + fn get(fd: Fd) Io.File.Handle { return if (is_windows) @ptrFromInt(@as(usize, @bitCast(@as(isize, @intFromEnum(fd))))) else @@ -963,6 +949,7 @@ const Fd = enum(i32) { var ipc_metadata_len: u8 = 0; fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buffer) usize { + const io = global_progress.io; const ipc_metadata_fds_copy = &serialized_buffer.ipc_metadata_fds_copy; const ipc_metadata_copy = &serialized_buffer.ipc_metadata_copy; const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds; @@ -981,14 +968,14 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff 0.., ) |main_parent, *main_storage, main_index| { if (main_parent == .unused) continue; - const fd = main_storage.getIpcFd() orelse continue; - const opt_saved_metadata = findOld(fd, old_ipc_metadata_fds, old_ipc_metadata); + const file: Io.File = .{ .handle = main_storage.getIpcFd() orelse continue }; + const opt_saved_metadata = findOld(file.handle, old_ipc_metadata_fds, old_ipc_metadata); var bytes_read: usize = 0; while (true) { - const n = posix.read(fd, pipe_buf[bytes_read..]) catch |err| switch (err) { + const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) { error.WouldBlock => break, else => |e| { - std.log.debug("failed to read child progress data: {s}", .{@errorName(e)}); + std.log.debug("failed to read child progress data: {t}", .{e}); main_storage.completed_count = 0; main_storage.estimated_total_count = 0; continue :main_loop; @@ -1014,7 +1001,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff // Ignore all but the last message on the pipe. var input: []u8 = pipe_buf[0..bytes_read]; if (input.len == 0) { - serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, fd); + serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, file.handle); continue; } @@ -1024,7 +1011,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff if (input.len < expected_bytes) { // Ignore short reads. We'll handle the next full message when it comes instead. const remaining_read_trash_bytes: u16 = @intCast(expected_bytes - input.len); - serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, fd); + serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, file.handle); continue :main_loop; } if (input.len > expected_bytes) { @@ -1042,7 +1029,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff const nodes_len: u8 = @intCast(@min(parents.len - 1, serialized_buffer.storage.len - serialized_len)); // Remember in case the pipe is empty on next update. - ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd); + ipc_metadata_fds[ipc_metadata_len] = Fd.init(file.handle); ipc_metadata[ipc_metadata_len] = .{ .remaining_read_trash_bytes = 0, .start_index = @intCast(serialized_len), @@ -1100,7 +1087,7 @@ fn copyRoot(dest: *Node.Storage, src: *align(1) Node.Storage) void { } fn findOld( - ipc_fd: posix.fd_t, + ipc_fd: Io.File.Handle, old_metadata_fds: []Fd, old_metadata: []SavedMetadata, ) ?*SavedMetadata { @@ -1118,7 +1105,7 @@ fn useSavedIpcData( main_index: usize, opt_saved_metadata: ?*SavedMetadata, remaining_read_trash_bytes: u16, - fd: posix.fd_t, + fd: Io.File.Handle, ) usize { const parents_copy = &serialized_buffer.parents_copy; const storage_copy = &serialized_buffer.storage_copy; @@ -1415,13 +1402,9 @@ fn withinRowLimit(p: *Progress, nl_n: usize) bool { return nl_n + 2 < p.rows; } -fn write(buf: []const u8) anyerror!void { - try global_progress.terminal.writeAll(buf); -} - var remaining_write_trash_bytes: usize = 0; -fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { +fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!void { // Byteswap if necessary to ensure little endian over the pipe. This is // needed because the parent or child process might be running in qemu. if (is_big_endian) for (serialized.storage) |*s| s.byteSwap(); @@ -1432,11 +1415,7 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { const storage = std.mem.sliceAsBytes(serialized.storage); const parents = std.mem.sliceAsBytes(serialized.parents); - var vecs: [3]posix.iovec_const = .{ - .{ .base = header.ptr, .len = header.len }, - .{ .base = storage.ptr, .len = storage.len }, - .{ .base = parents.ptr, .len = parents.len }, - }; + var vecs: [3][]const u8 = .{ header, storage, parents }; // Ensures the packet can fit in the pipe buffer. const upper_bound_msg_len = 1 + node_storage_buffer_len * @sizeOf(Node.Storage) + @@ -1447,14 +1426,14 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { // We do this in a separate write call to give a better chance for the // writev below to be in a single packet. const n = @min(parents.len, remaining_write_trash_bytes); - if (posix.write(fd, parents[0..n])) |written| { + if (io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{parents[0..n]}, 1)) |written| { remaining_write_trash_bytes -= written; continue; } else |err| switch (err) { error.WouldBlock => return, error.BrokenPipe => return error.BrokenPipe, else => |e| { - std.log.debug("failed to send progress to parent process: {s}", .{@errorName(e)}); + std.log.debug("failed to send progress to parent process: {t}", .{e}); return error.BrokenPipe; }, } @@ -1462,7 +1441,7 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { // If this write would block we do not want to keep trying, but we need to // know if a partial message was written. - if (writevNonblock(fd, &vecs)) |written| { + if (writevNonblock(io, file, &vecs)) |written| { const total = header.len + storage.len + parents.len; if (written < total) { remaining_write_trash_bytes = total - written; @@ -1471,13 +1450,13 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void { error.WouldBlock => {}, error.BrokenPipe => return error.BrokenPipe, else => |e| { - std.log.debug("failed to send progress to parent process: {s}", .{@errorName(e)}); + std.log.debug("failed to send progress to parent process: {t}", .{e}); return error.BrokenPipe; }, } } -fn writevNonblock(fd: posix.fd_t, iov: []posix.iovec_const) posix.WriteError!usize { +fn writevNonblock(io: Io, file: Io.File, iov: [][]const u8) Io.File.Writer.Error!usize { var iov_index: usize = 0; var written: usize = 0; var total_written: usize = 0; @@ -1486,9 +1465,9 @@ fn writevNonblock(fd: posix.fd_t, iov: []posix.iovec_const) posix.WriteError!usi written >= iov[iov_index].len else return total_written) : (iov_index += 1) written -= iov[iov_index].len; - iov[iov_index].base += written; + iov[iov_index].ptr += written; iov[iov_index].len -= written; - written = try posix.writev(fd, iov[iov_index..]); + written = try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, iov, 1); if (written == 0) return total_written; total_written += written; } @@ -1538,7 +1517,7 @@ fn handleSigWinch(sig: posix.SIG, info: *const posix.siginfo_t, ctx_ptr: ?*anyop _ = info; _ = ctx_ptr; assert(sig == .WINCH); - global_progress.redraw_event.set(); + global_progress.redraw_event.set(global_progress.io); } const have_sigwinch = switch (builtin.os.tag) { @@ -1563,11 +1542,6 @@ const have_sigwinch = switch (builtin.os.tag) { else => false, }; -/// The primary motivation for recursive mutex here is so that a panic while -/// stderr mutex is held still dumps the stack trace and other debug -/// information. -var stderr_mutex = std.Thread.Mutex.Recursive.init; - fn copyAtomicStore(dest: []align(@alignOf(usize)) u8, src: []const u8) void { assert(dest.len == src.len); const chunked_len = dest.len / @sizeOf(usize); |
