diff options
| author | Jonathan Marler <johnnymarler@gmail.com> | 2023-02-28 14:10:44 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2023-03-01 12:21:53 -0500 |
| commit | 138e8b162aeecc69cc62f0822e73b1862b7d07f6 (patch) | |
| tree | 3aed1af1481222e9a4406dad9da039960e345084 | |
| parent | 4f58a80735b47e6b98ed6f73cc9a0a772cdc6fcd (diff) | |
| download | zig-138e8b162aeecc69cc62f0822e73b1862b7d07f6.tar.gz zig-138e8b162aeecc69cc62f0822e73b1862b7d07f6.zip | |
std.child_process: use std.io.poll for collectOutput
| -rw-r--r-- | lib/std/child_process.zig | 210 |
1 files changed, 31 insertions, 179 deletions
diff --git a/lib/std/child_process.zig b/lib/std/child_process.zig index 55e375b93b..440b512fd2 100644 --- a/lib/std/child_process.zig +++ b/lib/std/child_process.zig @@ -197,6 +197,19 @@ pub const ChildProcess = struct { stderr: []u8, }; + fn fifoToOwnedArrayList(fifo: *std.io.PollFifo) std.ArrayList(u8) { + if (fifo.head > 0) { + std.mem.copy(u8, fifo.buf[0..fifo.count], fifo.buf[fifo.head .. fifo.head + fifo.count]); + } + const result = std.ArrayList(u8){ + .items = fifo.buf[0..fifo.count], + .capacity = fifo.buf.len, + .allocator = fifo.allocator, + }; + fifo.* = std.io.PollFifo.init(fifo.allocator); + return result; + } + /// Collect the output from the process's stdout and stderr. Will return once all output /// has been collected. This does not mean that the process has ended. `wait` should still /// be called to wait for and clean up the process. @@ -210,189 +223,28 @@ pub const ChildProcess = struct { ) !void { debug.assert(child.stdout_behavior == .Pipe); debug.assert(child.stderr_behavior == .Pipe); - if (builtin.os.tag == .windows) { - try collectOutputWindows(child, stdout, stderr, max_output_bytes); - } else { - try collectOutputPosix(child, stdout, stderr, max_output_bytes); - } - } - fn collectOutputPosix( - child: ChildProcess, - stdout: *std.ArrayList(u8), - stderr: *std.ArrayList(u8), - max_output_bytes: usize, - ) !void { - var poll_fds = [_]os.pollfd{ - .{ .fd = child.stdout.?.handle, .events = os.POLL.IN, .revents = undefined }, - .{ .fd = child.stderr.?.handle, .events = os.POLL.IN, .revents = undefined }, - }; + // we could make this work with multiple allocators but YAGNI + if (stdout.allocator.ptr != stderr.allocator.ptr or + stdout.allocator.vtable != stderr.allocator.vtable) + @panic("ChildProcess.collectOutput only supports 1 allocator"); - var dead_fds: usize = 0; - // We ask for ensureTotalCapacity with this much extra space. This has more of an - // effect on small reads because once the reads start to get larger the amount - // of space an ArrayList will allocate grows exponentially. - const bump_amt = 512; - - const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP; - - while (dead_fds < poll_fds.len) { - const events = try os.poll(&poll_fds, std.math.maxInt(i32)); - if (events == 0) continue; - - var remove_stdout = false; - var remove_stderr = false; - // Try reading whatever is available before checking the error - // conditions. - // It's still possible to read after a POLL.HUP is received, always - // check if there's some data waiting to be read first. - if (poll_fds[0].revents & os.POLL.IN != 0) { - // stdout is ready. - const new_capacity = std.math.min(stdout.items.len + bump_amt, max_output_bytes); - try stdout.ensureTotalCapacity(new_capacity); - const buf = stdout.unusedCapacitySlice(); - if (buf.len == 0) return error.StdoutStreamTooLong; - const nread = try os.read(poll_fds[0].fd, buf); - stdout.items.len += nread; - - // Remove the fd when the EOF condition is met. - remove_stdout = nread == 0; - } else { - remove_stdout = poll_fds[0].revents & err_mask != 0; - } - - if (poll_fds[1].revents & os.POLL.IN != 0) { - // stderr is ready. - const new_capacity = std.math.min(stderr.items.len + bump_amt, max_output_bytes); - try stderr.ensureTotalCapacity(new_capacity); - const buf = stderr.unusedCapacitySlice(); - if (buf.len == 0) return error.StderrStreamTooLong; - const nread = try os.read(poll_fds[1].fd, buf); - stderr.items.len += nread; - - // Remove the fd when the EOF condition is met. - remove_stderr = nread == 0; - } else { - remove_stderr = poll_fds[1].revents & err_mask != 0; - } - - // Exclude the fds that signaled an error. - if (remove_stdout) { - poll_fds[0].fd = -1; - dead_fds += 1; - } - if (remove_stderr) { - poll_fds[1].fd = -1; - dead_fds += 1; - } - } - } - - const WindowsAsyncReadResult = enum { - pending, - closed, - full, - }; - - fn windowsAsyncRead( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - buf: *std.ArrayList(u8), - bump_amt: usize, - max_output_bytes: usize, - ) !WindowsAsyncReadResult { - while (true) { - const new_capacity = std.math.min(buf.items.len + bump_amt, max_output_bytes); - try buf.ensureTotalCapacity(new_capacity); - const next_buf = buf.unusedCapacitySlice(); - if (next_buf.len == 0) return .full; - var read_bytes: u32 = undefined; - const read_result = windows.kernel32.ReadFile(handle, next_buf.ptr, math.cast(u32, next_buf.len) orelse maxInt(u32), &read_bytes, overlapped); - if (read_result == 0) return switch (windows.kernel32.GetLastError()) { - .IO_PENDING => .pending, - .BROKEN_PIPE => .closed, - else => |err| windows.unexpectedError(err), - }; - buf.items.len += read_bytes; - } - } - - fn collectOutputWindows(child: ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void { - const bump_amt = 512; - const outs = [_]*std.ArrayList(u8){ - stdout, - stderr, - }; - const handles = [_]windows.HANDLE{ - child.stdout.?.handle, - child.stderr.?.handle, - }; - - var overlapped = [_]windows.OVERLAPPED{ - mem.zeroes(windows.OVERLAPPED), - mem.zeroes(windows.OVERLAPPED), - }; - - var wait_objects: [2]windows.HANDLE = undefined; - var wait_object_count: u2 = 0; - - // we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope - defer for (wait_objects[0..wait_object_count]) |o| { - _ = windows.kernel32.CancelIo(o); - }; - - // Windows Async IO requires an initial call to ReadFile before waiting on the handle - for ([_]u1{ 0, 1 }) |i| { - switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) { - .pending => { - wait_objects[wait_object_count] = handles[i]; - wait_object_count += 1; - }, - .closed => {}, // don't add to the wait_objects list - .full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong, - } + var poller = std.io.poll(stdout.allocator, enum { stdout, stderr }, .{ + .stdout = child.stdout.?, + .stderr = child.stderr.?, + }); + defer poller.deinit(); + + while (!poller.done()) { + try poller.poll(); + if (poller.fifo(.stdout).count > max_output_bytes) + return error.StdoutStreamTooLong; + if (poller.fifo(.stderr).count > max_output_bytes) + return error.StderrStreamTooLong; } - while (wait_object_count > 0) { - const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE); - if (status == windows.WAIT_FAILED) { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } - } - if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1) - unreachable; - - const wait_idx = status - windows.WAIT_OBJECT_0; - - // this extra `i` index is needed to map the wait handle back to the stdout or stderr - // values since the wait_idx can change which handle it corresponds with - const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1; - - // remove completed event from the wait list - wait_object_count -= 1; - if (wait_idx == 0) - wait_objects[0] = wait_objects[1]; - - var read_bytes: u32 = undefined; - if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) { - switch (windows.kernel32.GetLastError()) { - .BROKEN_PIPE => continue, - else => |err| return windows.unexpectedError(err), - } - } - - outs[i].items.len += read_bytes; - - switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) { - .pending => { - wait_objects[wait_object_count] = handles[i]; - wait_object_count += 1; - }, - .closed => {}, // don't add to the wait_objects list - .full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong, - } - } + stdout.* = fifoToOwnedArrayList(poller.fifo(.stdout)); + stderr.* = fifoToOwnedArrayList(poller.fifo(.stderr)); } /// Spawns a child process, waits for it, collecting stdout and stderr, and then returns. |
