diff options
| author | Jonathan Marler <johnnymarler@gmail.com> | 2021-06-17 17:36:42 -0600 |
|---|---|---|
| committer | Jonathan Marler <johnnymarler@gmail.com> | 2021-06-18 08:26:22 -0600 |
| commit | 9e0338b82e45f672975bf7daa1ade5f4b2de4c01 (patch) | |
| tree | 3fbe11f9e77dabc5dbe089a93f238d15a23a07df | |
| parent | 1e0d68e6fbfe6d5d759ad773b656c5a5acba7a5e (diff) | |
| download | zig-9e0338b82e45f672975bf7daa1ade5f4b2de4c01.tar.gz zig-9e0338b82e45f672975bf7daa1ade5f4b2de4c01.zip | |
finish ChildProcess collectOutputWindows
This finishes LemonBoy's Draft PR ziglang#6750. It updates ChildProcess to collect the output from stdout/stderr asynchronously using Overlapped IO and named pipes.
| -rw-r--r-- | lib/std/child_process.zig | 192 | ||||
| -rw-r--r-- | lib/std/os/windows/kernel32.zig | 47 |
2 files changed, 118 insertions, 121 deletions
diff --git a/lib/std/child_process.zig b/lib/std/child_process.zig index 644a17c1dc..b63153e904 100644 --- a/lib/std/child_process.zig +++ b/lib/std/child_process.zig @@ -13,6 +13,7 @@ const process = std.process; const File = std.fs.File; const windows = os.windows; const mem = std.mem; +const math = std.math; const debug = std.debug; const BufMap = std.BufMap; const builtin = std.builtin; @@ -257,58 +258,76 @@ pub const ChildProcess = struct { } } - fn collectOutputWindows(child: *const ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void { - var wait_objects = [_]windows.kernel32.HANDLE{ - child.stdout.?.handle, child.stderr.?.handle, + fn collectOutputWindows(child: *const ChildProcess, outs: [2]*std.ArrayList(u8), max_output_bytes: usize) !void { + const bump_amt = 512; + const handles = [_]windows.HANDLE{ + child.stdout.?.handle, + child.stderr.?.handle, }; - var waiting_objects: u32 = wait_objects.len; - // XXX: Calling zeroes([2]windows.OVERLAPPED) causes the stage1 compiler - // to crash and burn. var overlapped = [_]windows.OVERLAPPED{ mem.zeroes(windows.OVERLAPPED), mem.zeroes(windows.OVERLAPPED), }; - var temp_buf: [2][4096]u8 = undefined; - - // Kickstart the loop by issuing two async reads. - // ReadFile returns false and GetLastError returns ERROR_IO_PENDING if - // everything is ok. - _ = windows.kernel32.ReadFile(wait_objects[0], &temp_buf[0], temp_buf[0].len, null, &overlapped[0]); - _ = windows.kernel32.ReadFile(wait_objects[1], &temp_buf[1], temp_buf[1].len, null, &overlapped[1]); - - poll: while (waiting_objects > 0) { - const status = windows.kernel32.WaitForMultipleObjects(waiting_objects, &wait_objects, 0, windows.INFINITE); - switch (status) { - windows.WAIT_OBJECT_0 + 0...windows.WAIT_OBJECT_0 + 1 => { - // stdout (or stderr) is ready. - const object = status - windows.WAIT_OBJECT_0; - - var read_bytes: u32 = undefined; - if (windows.kernel32.GetOverlappedResult(wait_objects[object], &overlapped[object], &read_bytes, 0) == 0) { - switch (windows.kernel32.GetLastError()) { - .BROKEN_PIPE => { - // Move it to the end to remove it. - if (object != waiting_objects - 1) - mem.swap(windows.kernel32.HANDLE, &wait_objects[object], &wait_objects[waiting_objects - 1]); - waiting_objects -= 1; - continue :poll; - }, - else => |err| return windows.unexpectedError(err), - } - } - try stdout.appendSlice(temp_buf[object][0..read_bytes]); - _ = windows.kernel32.ReadFile(wait_objects[object], &temp_buf[object], temp_buf[object].len, null, &overlapped[object]); - }, - windows.WAIT_FAILED => { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } - }, - // We're waiting with an infinite timeout - windows.WAIT_TIMEOUT => unreachable, - else => unreachable, + + var wait_objects: [2]windows.HANDLE = undefined; + var wait_object_count: u2 = 0; + + // we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope + defer for (wait_objects[0..wait_object_count]) |o| { + _ = windows.kernel32.CancelIo(o); + }; + + // Windows Async IO requires an initial call to ReadFile before waiting on the handle + for ([_]u1{ 0, 1 }) |i| { + try outs[i].ensureCapacity(bump_amt); + const buf = outs[i].unusedCapacitySlice(); + _ = windows.kernel32.ReadFile(handles[i], buf.ptr, math.cast(u32, buf.len) catch maxInt(u32), null, &overlapped[i]); + wait_objects[wait_object_count] = handles[i]; + wait_object_count += 1; + } + + while (true) { + const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE); + if (status == windows.WAIT_FAILED) { + switch (windows.kernel32.GetLastError()) { + else => |err| return windows.unexpectedError(err), + } } + if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1) + unreachable; + + const wait_idx = status - windows.WAIT_OBJECT_0; + + // this extra `i` index is needed to map the wait handle back to the stdout or stderr + // values since the wait_idx can change which handle it corresponds with + const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1; + + // remove completed event from the wait list + wait_object_count -= 1; + if (wait_idx == 0) + wait_objects[0] = wait_objects[1]; + + var read_bytes: u32 = undefined; + if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) { + switch (windows.kernel32.GetLastError()) { + .BROKEN_PIPE => { + if (wait_object_count == 0) + break; + continue; + }, + else => |err| return windows.unexpectedError(err), + } + } + + outs[i].items.len += read_bytes; + const new_capacity = std.math.min(outs[i].items.len + bump_amt, max_output_bytes); + try outs[i].ensureCapacity(new_capacity); + const buf = outs[i].unusedCapacitySlice(); + if (buf.len == 0) return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong; + _ = windows.kernel32.ReadFile(handles[i], buf.ptr, math.cast(u32, buf.len) catch maxInt(u32), null, &overlapped[i]); + wait_objects[wait_object_count] = handles[i]; + wait_object_count += 1; } } @@ -361,12 +380,8 @@ pub const ChildProcess = struct { stderr.deinit(); } - try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes); - - // XXX: Respect max_output_bytes - // XXX: Smarter reading logic, read directly into the ArrayList if (builtin.os.tag == .windows) { - try collectOutputWindows(child, &stdout, &stderr, args.max_output_bytes); + try collectOutputWindows(child, [_]*std.ArrayList(u8){ &stdout, &stderr }, args.max_output_bytes); } else { try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes); } @@ -707,7 +722,7 @@ pub const ChildProcess = struct { var g_hChildStd_OUT_Wr: ?windows.HANDLE = null; switch (self.stdout_behavior) { StdIo.Pipe => { - try windowsMakePipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr); + try windowsMakeAsyncPipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr); }, StdIo.Ignore => { g_hChildStd_OUT_Wr = nul_handle; @@ -727,7 +742,7 @@ pub const ChildProcess = struct { var g_hChildStd_ERR_Wr: ?windows.HANDLE = null; switch (self.stderr_behavior) { StdIo.Pipe => { - try windowsMakePipe(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr); + try windowsMakeAsyncPipe(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr); }, StdIo.Ignore => { g_hChildStd_ERR_Wr = nul_handle; @@ -960,25 +975,43 @@ fn windowsDestroyPipe(rd: ?windows.HANDLE, wr: ?windows.HANDLE) void { if (wr) |h| os.close(h); } -var pipe_name_counter = std.atomic.Int(u32).init(1); +fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { + var rd_h: windows.HANDLE = undefined; + var wr_h: windows.HANDLE = undefined; + try windows.CreatePipe(&rd_h, &wr_h, sattr); + errdefer windowsDestroyPipe(rd_h, wr_h); + try windows.SetHandleInformation(wr_h, windows.HANDLE_FLAG_INHERIT, 0); + rd.* = rd_h; + wr.* = wr_h; +} -fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { - var tmp_buf: [128]u8 = undefined; - // Forge a random path for the pipe. - const pipe_path = std.fmt.bufPrintZ( - &tmp_buf, - "\\\\.\\pipe\\zig-childprocess-{d}-{d}", - .{ windows.kernel32.GetCurrentProcessId(), pipe_name_counter.fetchAdd(1) }, - ) catch unreachable; +var pipe_name_counter = std.atomic.Atomic(u32).init(1); + +fn windowsMakeAsyncPipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { + var tmp_bufw: [128]u16 = undefined; + + // We must make a named pipe on windows because anonymous pipes do not support async IO + const pipe_path = blk: { + var tmp_buf: [128]u8 = undefined; + // Forge a random path for the pipe. + const pipe_path = std.fmt.bufPrintZ( + &tmp_buf, + "\\\\.\\pipe\\zig-childprocess-{d}-{d}", + .{ windows.kernel32.GetCurrentProcessId(), pipe_name_counter.fetchAdd(1, .Monotonic) }, + ) catch unreachable; + const len = std.unicode.utf8ToUtf16Le(&tmp_bufw, pipe_path) catch unreachable; + tmp_bufw[len] = 0; + break :blk tmp_bufw[0..len :0]; + }; // Create the read handle that can be used with overlapped IO ops. - const read_handle = windows.kernel32.CreateNamedPipeA( - pipe_path, + const read_handle = windows.kernel32.CreateNamedPipeW( + pipe_path.ptr, windows.PIPE_ACCESS_INBOUND | windows.FILE_FLAG_OVERLAPPED, windows.PIPE_TYPE_BYTE, 1, - 0x1000, - 0x1000, + 4096, + 4096, 0, sattr, ); @@ -987,12 +1020,14 @@ fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const win else => |err| return windows.unexpectedError(err), } } + errdefer os.close(read_handle); - const write_handle = windows.kernel32.CreateFileA( - pipe_path, + var sattr_copy = sattr.*; + const write_handle = windows.kernel32.CreateFileW( + pipe_path.ptr, windows.GENERIC_WRITE, 0, - sattr, + &sattr_copy, windows.OPEN_EXISTING, windows.FILE_ATTRIBUTE_NORMAL, null, @@ -1002,6 +1037,7 @@ fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const win else => |err| return windows.unexpectedError(err), } } + errdefer os.close(write_handle); try windows.SetHandleInformation(read_handle, windows.HANDLE_FLAG_INHERIT, 0); @@ -1009,26 +1045,6 @@ fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const win wr.* = write_handle; } -fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { - var rd_h: windows.HANDLE = undefined; - var wr_h: windows.HANDLE = undefined; - try windows.CreatePipe(&rd_h, &wr_h, sattr); - errdefer windowsDestroyPipe(rd_h, wr_h); - try windows.SetHandleInformation(wr_h, windows.HANDLE_FLAG_INHERIT, 0); - rd.* = rd_h; - wr.* = wr_h; -} - -fn windowsMakePipeOut(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void { - var rd_h: windows.HANDLE = undefined; - var wr_h: windows.HANDLE = undefined; - try windows.CreatePipe(&rd_h, &wr_h, sattr); - errdefer windowsDestroyPipe(rd_h, wr_h); - try windows.SetHandleInformation(rd_h, windows.HANDLE_FLAG_INHERIT, 0); - rd.* = rd_h; - wr.* = wr_h; -} - fn destroyPipe(pipe: [2]os.fd_t) void { os.close(pipe[0]); if (pipe[0] != pipe[1]) os.close(pipe[1]); diff --git a/lib/std/os/windows/kernel32.zig b/lib/std/os/windows/kernel32.zig index f847a34162..971273ef3a 100644 --- a/lib/std/os/windows/kernel32.zig +++ b/lib/std/os/windows/kernel32.zig @@ -8,6 +8,7 @@ usingnamespace @import("bits.zig"); pub extern "kernel32" fn AddVectoredExceptionHandler(First: c_ulong, Handler: ?VECTORED_EXCEPTION_HANDLER) callconv(WINAPI) ?*c_void; pub extern "kernel32" fn RemoveVectoredExceptionHandler(Handle: HANDLE) callconv(WINAPI) c_ulong; +pub extern "kernel32" fn CancelIo(hFile: HANDLE) callconv(WINAPI) BOOL; pub extern "kernel32" fn CancelIoEx(hFile: HANDLE, lpOverlapped: ?LPOVERLAPPED) callconv(WINAPI) BOOL; pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL; @@ -15,29 +16,6 @@ pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL; pub extern "kernel32" fn CreateDirectoryW(lpPathName: [*:0]const u16, lpSecurityAttributes: ?*SECURITY_ATTRIBUTES) callconv(WINAPI) BOOL; pub extern "kernel32" fn SetEndOfFile(hFile: HANDLE) callconv(WINAPI) BOOL; -pub extern "kernel32" fn GetCurrentProcessId() callconv(WINAPI) DWORD; - -pub extern "kernel32" fn CreateNamedPipeA( - lpName: [*:0]const u8, - dwOpenMode: DWORD, - dwPipeMode: DWORD, - nMaxInstances: DWORD, - nOutBufferSize: DWORD, - nInBufferSize: DWORD, - nDefaultTimeOut: DWORD, - lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, -) callconv(WINAPI) HANDLE; -pub extern "kernel32" fn CreateNamedPipeW( - lpName: LPCWSTR, - dwOpenMode: DWORD, - dwPipeMode: DWORD, - nMaxInstances: DWORD, - nOutBufferSize: DWORD, - nInBufferSize: DWORD, - nDefaultTimeOut: DWORD, - lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, -) callconv(WINAPI) HANDLE; - pub extern "kernel32" fn CreateEventExW( lpEventAttributes: ?*SECURITY_ATTRIBUTES, lpName: [*:0]const u16, @@ -55,16 +33,6 @@ pub extern "kernel32" fn CreateFileW( hTemplateFile: ?HANDLE, ) callconv(WINAPI) HANDLE; -pub extern "kernel32" fn CreateFileA( - lpFileName: [*:0]const u8, - dwDesiredAccess: DWORD, - dwShareMode: DWORD, - lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, - dwCreationDisposition: DWORD, - dwFlagsAndAttributes: DWORD, - hTemplateFile: ?HANDLE, -) callconv(WINAPI) HANDLE; - pub extern "kernel32" fn CreatePipe( hReadPipe: *HANDLE, hWritePipe: *HANDLE, @@ -72,6 +40,17 @@ pub extern "kernel32" fn CreatePipe( nSize: DWORD, ) callconv(WINAPI) BOOL; +pub extern "kernel32" fn CreateNamedPipeW( + lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES, +) callconv(WINAPI) HANDLE; + pub extern "kernel32" fn CreateProcessW( lpApplicationName: ?LPWSTR, lpCommandLine: LPWSTR, @@ -132,6 +111,8 @@ pub extern "kernel32" fn GetCurrentDirectoryW(nBufferLength: DWORD, lpBuffer: ?[ pub extern "kernel32" fn GetCurrentThread() callconv(WINAPI) HANDLE; pub extern "kernel32" fn GetCurrentThreadId() callconv(WINAPI) DWORD; +pub extern "kernel32" fn GetCurrentProcessId() callconv(WINAPI) DWORD; + pub extern "kernel32" fn GetCurrentProcess() callconv(WINAPI) HANDLE; pub extern "kernel32" fn GetEnvironmentStringsW() callconv(WINAPI) ?[*:0]u16; |
