path: root/lib/std/Progress.zig
author    Andrew Kelley <andrewrk@noreply.codeberg.org>  2025-12-27 14:10:46 +0100
committer Andrew Kelley <andrewrk@noreply.codeberg.org>  2025-12-27 14:10:46 +0100
commit    e55e6b5528bb2f01de242fcf32b172e244e98e74 (patch)
tree      3a5eb3193d3d192c54ab0c2b7295a7f21861c27e /lib/std/Progress.zig
parent    c3f2de5e519926eb0029062fe8e782a6f9df9c05 (diff)
parent    60a1ba0a8f3517356fa2941462f002a7f580545b (diff)
Merge pull request 'std: migrate all `fs` APIs to `Io`' (#30232) from std.Io-fs into master
Reviewed-on: https://codeberg.org/ziglang/zig/pulls/30232
Diffstat (limited to 'lib/std/Progress.zig')
-rw-r--r--  lib/std/Progress.zig  288
1 file changed, 131 insertions, 157 deletions
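
For downstream callers of this module, the visible change in this diff is that `std.Progress.start` now takes an `Io` parameter and the `refresh_rate_ns`/`initial_delay_ns` options are `Io.Duration` values instead of raw nanosecond counts. A minimal sketch of an updated call site follows; the `buildAll` wrapper, its option values, and the way the caller obtained its `Io` are illustrative assumptions, not part of this commit.

const std = @import("std");

// Hypothetical caller: only the `start(io, ...)` signature and the
// `Io.Duration` option values are taken from this diff; everything else
// here is an assumption for illustration.
fn buildAll(io: std.Io) void {
    const root_node = std.Progress.start(io, .{
        .root_name = "building",
        .refresh_rate_ns = .fromMilliseconds(80), // previously: 80 * std.time.ns_per_ms
        .initial_delay_ns = .fromMilliseconds(200), // previously: 200 * std.time.ns_per_ms
        .estimated_total_items = 10,
    });
    defer root_node.end();

    for (0..10) |_| root_node.completeOne();
}
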
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig
index d1ab503661..6baa24d246 100644
--- a/lib/std/Progress.zig
+++ b/lib/std/Progress.zig
@@ -1,26 +1,30 @@
//! This API is non-allocating, non-fallible, thread-safe, and lock-free.
+const Progress = @This();
-const std = @import("std");
const builtin = @import("builtin");
+const is_big_endian = builtin.cpu.arch.endian() == .big;
+const is_windows = builtin.os.tag == .windows;
+
+const std = @import("std");
+const Io = std.Io;
const windows = std.os.windows;
const testing = std.testing;
const assert = std.debug.assert;
-const Progress = @This();
const posix = std.posix;
-const is_big_endian = builtin.cpu.arch.endian() == .big;
-const is_windows = builtin.os.tag == .windows;
const Writer = std.Io.Writer;
-/// `null` if the current node (and its children) should
-/// not print on update()
-terminal: std.fs.File,
+/// Currently this API only supports this value being set to stderr, which
+/// happens automatically inside `start`.
+terminal: Io.File,
+
+io: Io,
terminal_mode: TerminalMode,
-update_thread: ?std.Thread,
+update_worker: ?Io.Future(void),
/// Atomically set by SIGWINCH as well as the root done() function.
-redraw_event: std.Thread.ResetEvent,
+redraw_event: Io.Event,
/// Indicates a request to shut down and reset global state.
/// Accessed atomically.
done: bool,
@@ -48,6 +52,8 @@ node_freelist: Freelist,
/// value may at times temporarily exceed the node count.
node_end_index: u32,
+start_failure: StartFailure,
+
pub const Status = enum {
/// Indicates the application is progressing towards completion of a task.
/// Unless the application is interactive, this is the only status the
@@ -93,9 +99,9 @@ pub const Options = struct {
/// Must be at least 200 bytes.
draw_buffer: []u8 = &default_draw_buffer,
/// How many nanoseconds between writing updates to the terminal.
- refresh_rate_ns: u64 = 80 * std.time.ns_per_ms,
+ refresh_rate_ns: Io.Duration = .fromMilliseconds(80),
/// How many nanoseconds to keep the output hidden
- initial_delay_ns: u64 = 200 * std.time.ns_per_ms,
+ initial_delay_ns: Io.Duration = .fromMilliseconds(200),
/// If provided, causes the progress item to have a denominator.
/// 0 means unknown.
estimated_total_items: usize = 0,
@@ -121,20 +127,20 @@ pub const Node = struct {
name: [max_name_len]u8 align(@alignOf(usize)),
/// Not thread-safe.
- fn getIpcFd(s: Storage) ?posix.fd_t {
- return if (s.estimated_total_count == std.math.maxInt(u32)) switch (@typeInfo(posix.fd_t)) {
+ fn getIpcFd(s: Storage) ?Io.File.Handle {
+ return if (s.estimated_total_count == std.math.maxInt(u32)) switch (@typeInfo(Io.File.Handle)) {
.int => @bitCast(s.completed_count),
.pointer => @ptrFromInt(s.completed_count),
- else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)),
+ else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
} else null;
}
/// Thread-safe.
- fn setIpcFd(s: *Storage, fd: posix.fd_t) void {
- const integer: u32 = switch (@typeInfo(posix.fd_t)) {
+ fn setIpcFd(s: *Storage, fd: Io.File.Handle) void {
+ const integer: u32 = switch (@typeInfo(Io.File.Handle)) {
.int => @bitCast(fd),
.pointer => @intFromPtr(fd),
- else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)),
+ else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
};
// `estimated_total_count` max int indicates the special state that
// causes `completed_count` to be treated as a file descriptor, so
@@ -327,13 +333,14 @@ pub const Node = struct {
}
} else {
@atomicStore(bool, &global_progress.done, true, .monotonic);
- global_progress.redraw_event.set();
- if (global_progress.update_thread) |thread| thread.join();
+ const io = global_progress.io;
+ global_progress.redraw_event.set(io);
+ if (global_progress.update_worker) |*worker| worker.await(io);
}
}
/// Posix-only. Used by `std.process.Child`. Thread-safe.
- pub fn setIpcFd(node: Node, fd: posix.fd_t) void {
+ pub fn setIpcFd(node: Node, fd: Io.File.Handle) void {
const index = node.index.unwrap() orelse return;
assert(fd >= 0);
assert(fd != posix.STDOUT_FILENO);
@@ -344,14 +351,14 @@ pub const Node = struct {
/// Posix-only. Thread-safe. Assumes the node is storing an IPC file
/// descriptor.
- pub fn getIpcFd(node: Node) ?posix.fd_t {
+ pub fn getIpcFd(node: Node) ?Io.File.Handle {
const index = node.index.unwrap() orelse return null;
const storage = storageByIndex(index);
const int = @atomicLoad(u32, &storage.completed_count, .monotonic);
- return switch (@typeInfo(posix.fd_t)) {
+ return switch (@typeInfo(Io.File.Handle)) {
.int => @bitCast(int),
.pointer => @ptrFromInt(int),
- else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)),
+ else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
};
}
@@ -389,9 +396,10 @@ pub const Node = struct {
};
var global_progress: Progress = .{
+ .io = undefined,
.terminal = undefined,
.terminal_mode = .off,
- .update_thread = null,
+ .update_worker = null,
.redraw_event = .unset,
.refresh_rate_ns = undefined,
.initial_delay_ns = undefined,
@@ -401,6 +409,7 @@ var global_progress: Progress = .{
.done = false,
.need_clear = false,
.status = .working,
+ .start_failure = .unstarted,
.node_parents = &node_parents_buffer,
.node_storage = &node_storage_buffer,
@@ -409,6 +418,13 @@ var global_progress: Progress = .{
.node_end_index = 0,
};
+pub const StartFailure = union(enum) {
+ unstarted,
+ spawn_ipc_worker: error{ConcurrencyUnavailable},
+ spawn_update_worker: error{ConcurrencyUnavailable},
+ parse_env_var: error{ InvalidCharacter, Overflow },
+};
+
const node_storage_buffer_len = 83;
var node_parents_buffer: [node_storage_buffer_len]Node.Parent = undefined;
var node_storage_buffer: [node_storage_buffer_len]Node.Storage = undefined;
@@ -435,7 +451,9 @@ const noop_impl = builtin.single_threaded or switch (builtin.os.tag) {
/// Asserts there is only one global Progress instance.
///
/// Call `Node.end` when done.
-pub fn start(options: Options) Node {
+///
+/// If an error occurs, `start_failure` will be populated.
+pub fn start(io: Io, options: Options) Node {
// Ensure there is only 1 global Progress object.
if (global_progress.node_end_index != 0) {
debug_start_trace.dump();
@@ -450,21 +468,24 @@ pub fn start(options: Options) Node {
assert(options.draw_buffer.len >= 200);
global_progress.draw_buffer = options.draw_buffer;
- global_progress.refresh_rate_ns = options.refresh_rate_ns;
- global_progress.initial_delay_ns = options.initial_delay_ns;
+ global_progress.refresh_rate_ns = @intCast(options.refresh_rate_ns.toNanoseconds());
+ global_progress.initial_delay_ns = @intCast(options.initial_delay_ns.toNanoseconds());
if (noop_impl)
return Node.none;
+ global_progress.io = io;
+
if (std.process.parseEnvVarInt("ZIG_PROGRESS", u31, 10)) |ipc_fd| {
- global_progress.update_thread = std.Thread.spawn(.{}, ipcThreadRun, .{
- @as(posix.fd_t, switch (@typeInfo(posix.fd_t)) {
+ global_progress.update_worker = io.concurrent(ipcThreadRun, .{
+ io,
+ @as(Io.File, .{ .handle = switch (@typeInfo(Io.File.Handle)) {
.int => ipc_fd,
.pointer => @ptrFromInt(ipc_fd),
- else => @compileError("unsupported fd_t of " ++ @typeName(posix.fd_t)),
- }),
+ else => @compileError("unsupported fd_t of " ++ @typeName(Io.File.Handle)),
+ } }),
}) catch |err| {
- std.log.warn("failed to spawn IPC thread for communicating progress to parent: {s}", .{@errorName(err)});
+ global_progress.start_failure = .{ .spawn_ipc_worker = err };
return Node.none;
};
} else |env_err| switch (env_err) {
@@ -472,14 +493,21 @@ pub fn start(options: Options) Node {
if (options.disable_printing) {
return Node.none;
}
- const stderr: std.fs.File = .stderr();
+ const stderr: Io.File = .stderr();
global_progress.terminal = stderr;
- if (stderr.getOrEnableAnsiEscapeSupport()) {
+ if (stderr.enableAnsiEscapeCodes(io)) |_| {
global_progress.terminal_mode = .ansi_escape_codes;
- } else if (is_windows and stderr.isTty()) {
- global_progress.terminal_mode = TerminalMode{ .windows_api = .{
- .code_page = windows.kernel32.GetConsoleOutputCP(),
- } };
+ } else |_| if (is_windows) {
+ if (stderr.isTty(io)) |is_tty| {
+ if (is_tty) global_progress.terminal_mode = TerminalMode{ .windows_api = .{
+ .code_page = windows.kernel32.GetConsoleOutputCP(),
+ } };
+ } else |err| switch (err) {
+ error.Canceled => {
+ io.recancel();
+ return Node.none;
+ },
+ }
}
if (global_progress.terminal_mode == .off) {
@@ -497,17 +525,17 @@ pub fn start(options: Options) Node {
if (switch (global_progress.terminal_mode) {
.off => unreachable, // handled a few lines above
- .ansi_escape_codes => std.Thread.spawn(.{}, updateThreadRun, .{}),
- .windows_api => if (is_windows) std.Thread.spawn(.{}, windowsApiUpdateThreadRun, .{}) else unreachable,
- }) |thread| {
- global_progress.update_thread = thread;
+ .ansi_escape_codes => io.concurrent(updateThreadRun, .{io}),
+ .windows_api => if (is_windows) io.concurrent(windowsApiUpdateThreadRun, .{io}) else unreachable,
+ }) |future| {
+ global_progress.update_worker = future;
} else |err| {
- std.log.warn("unable to spawn thread for printing progress to terminal: {s}", .{@errorName(err)});
+ global_progress.start_failure = .{ .spawn_update_worker = err };
return Node.none;
}
},
else => |e| {
- std.log.warn("invalid ZIG_PROGRESS file descriptor integer: {s}", .{@errorName(e)});
+ global_progress.start_failure = .{ .parse_env_var = e };
return Node.none;
},
}
@@ -521,48 +549,52 @@ pub fn setStatus(new_status: Status) void {
}
/// Returns whether a resize is needed to learn the terminal size.
-fn wait(timeout_ns: u64) bool {
- const resize_flag = if (global_progress.redraw_event.timedWait(timeout_ns)) |_| true else |err| switch (err) {
- error.Timeout => false,
+fn wait(io: Io, timeout_ns: u64) bool {
+ const timeout: Io.Timeout = .{ .duration = .{
+ .clock = .awake,
+ .raw = .fromNanoseconds(timeout_ns),
+ } };
+ const resize_flag = if (global_progress.redraw_event.waitTimeout(io, timeout)) |_| true else |err| switch (err) {
+ error.Timeout, error.Canceled => false,
};
global_progress.redraw_event.reset();
return resize_flag or (global_progress.cols == 0);
}
-fn updateThreadRun() void {
+fn updateThreadRun(io: Io) void {
// Store this data in the thread so that it does not need to be part of the
// linker data of the main executable.
var serialized_buffer: Serialized.Buffer = undefined;
{
- const resize_flag = wait(global_progress.initial_delay_ns);
+ const resize_flag = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
maybeUpdateSize(resize_flag);
const buffer, _ = computeRedraw(&serialized_buffer);
- if (stderr_mutex.tryLock()) {
- defer stderr_mutex.unlock();
- write(buffer) catch return;
+ if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ defer io.unlockStderr();
global_progress.need_clear = true;
+ locked_stderr.file_writer.interface.writeAll(buffer) catch return;
}
}
while (true) {
- const resize_flag = wait(global_progress.refresh_rate_ns);
+ const resize_flag = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
- stderr_mutex.lock();
- defer stderr_mutex.unlock();
- return clearWrittenWithEscapeCodes() catch {};
+ const stderr = io.lockStderr(&.{}, null) catch return;
+ defer io.unlockStderr();
+ return clearWrittenWithEscapeCodes(stderr.file_writer) catch {};
}
maybeUpdateSize(resize_flag);
const buffer, _ = computeRedraw(&serialized_buffer);
- if (stderr_mutex.tryLock()) {
- defer stderr_mutex.unlock();
- write(buffer) catch return;
+ if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ defer io.unlockStderr();
global_progress.need_clear = true;
+ locked_stderr.file_writer.interface.writeAll(buffer) catch return;
}
}
}
@@ -575,117 +607,72 @@ fn windowsApiWriteMarker() void {
_ = windows.kernel32.WriteConsoleW(handle, &[_]u16{windows_api_start_marker}, 1, &num_chars_written, null);
}
-fn windowsApiUpdateThreadRun() void {
+fn windowsApiUpdateThreadRun(io: Io) void {
var serialized_buffer: Serialized.Buffer = undefined;
{
- const resize_flag = wait(global_progress.initial_delay_ns);
+ const resize_flag = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) return;
maybeUpdateSize(resize_flag);
const buffer, const nl_n = computeRedraw(&serialized_buffer);
- if (stderr_mutex.tryLock()) {
- defer stderr_mutex.unlock();
+ if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ defer io.unlockStderr();
windowsApiWriteMarker();
- write(buffer) catch return;
global_progress.need_clear = true;
+ locked_stderr.file_writer.interface.writeAll(buffer) catch return;
windowsApiMoveToMarker(nl_n) catch return;
}
}
while (true) {
- const resize_flag = wait(global_progress.refresh_rate_ns);
+ const resize_flag = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic)) {
- stderr_mutex.lock();
- defer stderr_mutex.unlock();
+ _ = io.lockStderr(&.{}, null) catch return;
+ defer io.unlockStderr();
return clearWrittenWindowsApi() catch {};
}
maybeUpdateSize(resize_flag);
const buffer, const nl_n = computeRedraw(&serialized_buffer);
- if (stderr_mutex.tryLock()) {
- defer stderr_mutex.unlock();
+ if (io.tryLockStderr(&.{}, null) catch return) |locked_stderr| {
+ defer io.unlockStderr();
clearWrittenWindowsApi() catch return;
windowsApiWriteMarker();
- write(buffer) catch return;
global_progress.need_clear = true;
+ locked_stderr.file_writer.interface.writeAll(buffer) catch return;
windowsApiMoveToMarker(nl_n) catch return;
}
}
}
-/// Allows the caller to freely write to stderr until `unlockStdErr` is called.
-///
-/// During the lock, any `std.Progress` information is cleared from the terminal.
-///
-/// The lock is recursive; the same thread may hold the lock multiple times.
-pub fn lockStdErr() void {
- stderr_mutex.lock();
- clearWrittenWithEscapeCodes() catch {};
-}
-
-pub fn unlockStdErr() void {
- stderr_mutex.unlock();
-}
-
-/// Protected by `stderr_mutex`.
-const stderr_writer: *Writer = &stderr_file_writer.interface;
-/// Protected by `stderr_mutex`.
-var stderr_file_writer: std.fs.File.Writer = .{
- .interface = std.fs.File.Writer.initInterface(&.{}),
- .file = if (is_windows) undefined else .stderr(),
- .mode = .streaming,
-};
-
-/// Allows the caller to freely write to the returned `Writer`,
-/// initialized with `buffer`, until `unlockStderrWriter` is called.
-///
-/// During the lock, any `std.Progress` information is cleared from the terminal.
-///
-/// The lock is recursive; the same thread may hold the lock multiple times.
-pub fn lockStderrWriter(buffer: []u8) *Writer {
- stderr_mutex.lock();
- clearWrittenWithEscapeCodes() catch {};
- if (is_windows) stderr_file_writer.file = .stderr();
- stderr_writer.flush() catch {};
- stderr_writer.buffer = buffer;
- return stderr_writer;
-}
-
-pub fn unlockStderrWriter() void {
- stderr_writer.flush() catch {};
- stderr_writer.end = 0;
- stderr_writer.buffer = &.{};
- stderr_mutex.unlock();
-}
-
-fn ipcThreadRun(fd: posix.fd_t) anyerror!void {
+fn ipcThreadRun(io: Io, file: Io.File) void {
// Store this data in the thread so that it does not need to be part of the
// linker data of the main executable.
var serialized_buffer: Serialized.Buffer = undefined;
{
- _ = wait(global_progress.initial_delay_ns);
+ _ = wait(io, global_progress.initial_delay_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic))
return;
const serialized = serialize(&serialized_buffer);
- writeIpc(fd, serialized) catch |err| switch (err) {
+ writeIpc(io, file, serialized) catch |err| switch (err) {
error.BrokenPipe => return,
};
}
while (true) {
- _ = wait(global_progress.refresh_rate_ns);
+ _ = wait(io, global_progress.refresh_rate_ns);
if (@atomicLoad(bool, &global_progress.done, .monotonic))
return;
const serialized = serialize(&serialized_buffer);
- writeIpc(fd, serialized) catch |err| switch (err) {
+ writeIpc(io, file, serialized) catch |err| switch (err) {
error.BrokenPipe => return,
};
}
@@ -784,11 +771,10 @@ fn appendTreeSymbol(symbol: TreeSymbol, buf: []u8, start_i: usize) usize {
}
}
-fn clearWrittenWithEscapeCodes() anyerror!void {
+pub fn clearWrittenWithEscapeCodes(file_writer: *Io.File.Writer) Io.Writer.Error!void {
if (noop_impl or !global_progress.need_clear) return;
-
+ try file_writer.interface.writeAll(clear ++ progress_remove);
global_progress.need_clear = false;
- try write(clear ++ progress_remove);
}
/// U+25BA or ►
@@ -948,11 +934,11 @@ const SavedMetadata = struct {
const Fd = enum(i32) {
_,
- fn init(fd: posix.fd_t) Fd {
+ fn init(fd: Io.File.Handle) Fd {
return @enumFromInt(if (is_windows) @as(isize, @bitCast(@intFromPtr(fd))) else fd);
}
- fn get(fd: Fd) posix.fd_t {
+ fn get(fd: Fd) Io.File.Handle {
return if (is_windows)
@ptrFromInt(@as(usize, @bitCast(@as(isize, @intFromEnum(fd)))))
else
@@ -963,6 +949,7 @@ const Fd = enum(i32) {
var ipc_metadata_len: u8 = 0;
fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buffer) usize {
+ const io = global_progress.io;
const ipc_metadata_fds_copy = &serialized_buffer.ipc_metadata_fds_copy;
const ipc_metadata_copy = &serialized_buffer.ipc_metadata_copy;
const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds;
@@ -981,14 +968,14 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
0..,
) |main_parent, *main_storage, main_index| {
if (main_parent == .unused) continue;
- const fd = main_storage.getIpcFd() orelse continue;
- const opt_saved_metadata = findOld(fd, old_ipc_metadata_fds, old_ipc_metadata);
+ const file: Io.File = .{ .handle = main_storage.getIpcFd() orelse continue };
+ const opt_saved_metadata = findOld(file.handle, old_ipc_metadata_fds, old_ipc_metadata);
var bytes_read: usize = 0;
while (true) {
- const n = posix.read(fd, pipe_buf[bytes_read..]) catch |err| switch (err) {
+ const n = file.readStreaming(io, &.{pipe_buf[bytes_read..]}) catch |err| switch (err) {
error.WouldBlock => break,
else => |e| {
- std.log.debug("failed to read child progress data: {s}", .{@errorName(e)});
+ std.log.debug("failed to read child progress data: {t}", .{e});
main_storage.completed_count = 0;
main_storage.estimated_total_count = 0;
continue :main_loop;
@@ -1014,7 +1001,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
// Ignore all but the last message on the pipe.
var input: []u8 = pipe_buf[0..bytes_read];
if (input.len == 0) {
- serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, fd);
+ serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, file.handle);
continue;
}
@@ -1024,7 +1011,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
if (input.len < expected_bytes) {
// Ignore short reads. We'll handle the next full message when it comes instead.
const remaining_read_trash_bytes: u16 = @intCast(expected_bytes - input.len);
- serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, fd);
+ serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, file.handle);
continue :main_loop;
}
if (input.len > expected_bytes) {
@@ -1042,7 +1029,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
const nodes_len: u8 = @intCast(@min(parents.len - 1, serialized_buffer.storage.len - serialized_len));
// Remember in case the pipe is empty on next update.
- ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd);
+ ipc_metadata_fds[ipc_metadata_len] = Fd.init(file.handle);
ipc_metadata[ipc_metadata_len] = .{
.remaining_read_trash_bytes = 0,
.start_index = @intCast(serialized_len),
@@ -1100,7 +1087,7 @@ fn copyRoot(dest: *Node.Storage, src: *align(1) Node.Storage) void {
}
fn findOld(
- ipc_fd: posix.fd_t,
+ ipc_fd: Io.File.Handle,
old_metadata_fds: []Fd,
old_metadata: []SavedMetadata,
) ?*SavedMetadata {
@@ -1118,7 +1105,7 @@ fn useSavedIpcData(
main_index: usize,
opt_saved_metadata: ?*SavedMetadata,
remaining_read_trash_bytes: u16,
- fd: posix.fd_t,
+ fd: Io.File.Handle,
) usize {
const parents_copy = &serialized_buffer.parents_copy;
const storage_copy = &serialized_buffer.storage_copy;
@@ -1415,13 +1402,9 @@ fn withinRowLimit(p: *Progress, nl_n: usize) bool {
return nl_n + 2 < p.rows;
}
-fn write(buf: []const u8) anyerror!void {
- try global_progress.terminal.writeAll(buf);
-}
-
var remaining_write_trash_bytes: usize = 0;
-fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
+fn writeIpc(io: Io, file: Io.File, serialized: Serialized) error{BrokenPipe}!void {
// Byteswap if necessary to ensure little endian over the pipe. This is
// needed because the parent or child process might be running in qemu.
if (is_big_endian) for (serialized.storage) |*s| s.byteSwap();
@@ -1432,11 +1415,7 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
const storage = std.mem.sliceAsBytes(serialized.storage);
const parents = std.mem.sliceAsBytes(serialized.parents);
- var vecs: [3]posix.iovec_const = .{
- .{ .base = header.ptr, .len = header.len },
- .{ .base = storage.ptr, .len = storage.len },
- .{ .base = parents.ptr, .len = parents.len },
- };
+ var vecs: [3][]const u8 = .{ header, storage, parents };
// Ensures the packet can fit in the pipe buffer.
const upper_bound_msg_len = 1 + node_storage_buffer_len * @sizeOf(Node.Storage) +
@@ -1447,14 +1426,14 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
// We do this in a separate write call to give a better chance for the
// writev below to be in a single packet.
const n = @min(parents.len, remaining_write_trash_bytes);
- if (posix.write(fd, parents[0..n])) |written| {
+ if (io.vtable.fileWriteStreaming(io.userdata, file, &.{}, &.{parents[0..n]}, 1)) |written| {
remaining_write_trash_bytes -= written;
continue;
} else |err| switch (err) {
error.WouldBlock => return,
error.BrokenPipe => return error.BrokenPipe,
else => |e| {
- std.log.debug("failed to send progress to parent process: {s}", .{@errorName(e)});
+ std.log.debug("failed to send progress to parent process: {t}", .{e});
return error.BrokenPipe;
},
}
@@ -1462,7 +1441,7 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
// If this write would block we do not want to keep trying, but we need to
// know if a partial message was written.
- if (writevNonblock(fd, &vecs)) |written| {
+ if (writevNonblock(io, file, &vecs)) |written| {
const total = header.len + storage.len + parents.len;
if (written < total) {
remaining_write_trash_bytes = total - written;
@@ -1471,13 +1450,13 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
error.WouldBlock => {},
error.BrokenPipe => return error.BrokenPipe,
else => |e| {
- std.log.debug("failed to send progress to parent process: {s}", .{@errorName(e)});
+ std.log.debug("failed to send progress to parent process: {t}", .{e});
return error.BrokenPipe;
},
}
}
-fn writevNonblock(fd: posix.fd_t, iov: []posix.iovec_const) posix.WriteError!usize {
+fn writevNonblock(io: Io, file: Io.File, iov: [][]const u8) Io.File.Writer.Error!usize {
var iov_index: usize = 0;
var written: usize = 0;
var total_written: usize = 0;
@@ -1486,9 +1465,9 @@ fn writevNonblock(fd: posix.fd_t, iov: []posix.iovec_const) posix.WriteError!usi
written >= iov[iov_index].len
else
return total_written) : (iov_index += 1) written -= iov[iov_index].len;
- iov[iov_index].base += written;
+ iov[iov_index].ptr += written;
iov[iov_index].len -= written;
- written = try posix.writev(fd, iov[iov_index..]);
+ written = try io.vtable.fileWriteStreaming(io.userdata, file, &.{}, iov, 1);
if (written == 0) return total_written;
total_written += written;
}
@@ -1538,7 +1517,7 @@ fn handleSigWinch(sig: posix.SIG, info: *const posix.siginfo_t, ctx_ptr: ?*anyop
_ = info;
_ = ctx_ptr;
assert(sig == .WINCH);
- global_progress.redraw_event.set();
+ global_progress.redraw_event.set(global_progress.io);
}
const have_sigwinch = switch (builtin.os.tag) {
@@ -1563,11 +1542,6 @@ const have_sigwinch = switch (builtin.os.tag) {
else => false,
};
-/// The primary motivation for recursive mutex here is so that a panic while
-/// stderr mutex is held still dumps the stack trace and other debug
-/// information.
-var stderr_mutex = std.Thread.Mutex.Recursive.init;
-
fn copyAtomicStore(dest: []align(@alignOf(usize)) u8, src: []const u8) void {
assert(dest.len == src.len);
const chunked_len = dest.len / @sizeOf(usize);