diff options
| author | hryx <codroid@gmail.com> | 2019-05-27 17:24:21 -0700 |
|---|---|---|
| committer | hryx <codroid@gmail.com> | 2019-05-27 17:24:21 -0700 |
| commit | e1f3eec9cc05535b3f3b81f2fb7cd65dd4d1e841 (patch) | |
| tree | 5f408ed68a686491eaf759f9cbba02beac829b38 /std/event | |
| parent | 2aa1c5da5dded6b1b346c3a1b57443f2c459ebe9 (diff) | |
| parent | 3fccc0747903f0726d6cc8ee73832cb62f1304bb (diff) | |
| download | zig-e1f3eec9cc05535b3f3b81f2fb7cd65dd4d1e841.tar.gz zig-e1f3eec9cc05535b3f3b81f2fb7cd65dd4d1e841.zip | |
Merge branch 'master' into translate-c-userland
Diffstat (limited to 'std/event')
| -rw-r--r-- | std/event/fs.zig | 224 | ||||
| -rw-r--r-- | std/event/group.zig | 2 | ||||
| -rw-r--r-- | std/event/loop.zig | 268 | ||||
| -rw-r--r-- | std/event/net.zig | 168 | ||||
| -rw-r--r-- | std/event/rwlock.zig | 4 |
5 files changed, 328 insertions, 338 deletions
diff --git a/std/event/fs.zig b/std/event/fs.zig index 9756d15a76..55731a607c 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -5,9 +5,10 @@ const assert = std.debug.assert; const testing = std.testing; const os = std.os; const mem = std.mem; -const posix = os.posix; const windows = os.windows; const Loop = event.Loop; +const fd_t = os.fd_t; +const File = std.fs.File; pub const RequestNode = std.atomic.Queue(Request).Node; @@ -30,53 +31,53 @@ pub const Request = struct { End, // special - means the fs thread should exit pub const PWriteV = struct { - fd: os.FileHandle, - iov: []const os.posix.iovec_const, + fd: fd_t, + iov: []const os.iovec_const, offset: usize, result: Error!void, - pub const Error = os.PosixWriteError; + pub const Error = os.WriteError; }; pub const PReadV = struct { - fd: os.FileHandle, - iov: []const os.posix.iovec, + fd: fd_t, + iov: []const os.iovec, offset: usize, result: Error!usize, - pub const Error = os.PosixReadError; + pub const Error = os.ReadError; }; pub const Open = struct { /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 path: []const u8, flags: u32, - mode: os.File.Mode, - result: Error!os.FileHandle, + mode: File.Mode, + result: Error!fd_t, - pub const Error = os.File.OpenError; + pub const Error = File.OpenError; }; pub const WriteFile = struct { /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 path: []const u8, contents: []const u8, - mode: os.File.Mode, + mode: File.Mode, result: Error!void, - pub const Error = os.File.OpenError || os.File.WriteError; + pub const Error = File.OpenError || File.WriteError; }; pub const Close = struct { - fd: os.FileHandle, + fd: fd_t, }; }; }; -pub const PWriteVError = error{OutOfMemory} || os.File.WriteError; +pub const PWriteVError = error{OutOfMemory} || File.WriteError; /// data - just the inner references - must live until pwritev promise completes. -pub async fn pwritev(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) PWriteVError!void { +pub async fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -87,11 +88,11 @@ pub async fn pwritev(loop: *Loop, fd: os.FileHandle, data: []const []const u8, o builtin.Os.freebsd, builtin.Os.netbsd, => { - const iovecs = try loop.allocator.alloc(os.posix.iovec_const, data.len); + const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); defer loop.allocator.free(iovecs); for (data) |buf, i| { - iovecs[i] = os.posix.iovec_const{ + iovecs[i] = os.iovec_const{ .iov_base = buf.ptr, .iov_len = buf.len, }; @@ -109,7 +110,7 @@ pub async fn pwritev(loop: *Loop, fd: os.FileHandle, data: []const []const u8, o } /// data must outlive the returned promise -pub async fn pwritevWindows(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) os.WindowsWriteError!void { +pub async fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void { if (data.len == 0) return; if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable); @@ -121,7 +122,7 @@ pub async fn pwritevWindows(loop: *Loop, fd: os.FileHandle, data: []const []cons } } -pub async fn pwriteWindows(loop: *Loop, fd: os.FileHandle, data: []const u8, offset: u64) os.WindowsWriteError!void { +pub async fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -146,33 +147,32 @@ pub async fn pwriteWindows(loop: *Loop, fd: os.FileHandle, data: []const u8, off errdefer loop.finishOneEvent(); errdefer { - _ = windows.CancelIoEx(fd, &resume_node.base.overlapped); + _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); } suspend { - _ = windows.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); + _ = windows.kernel32.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); } var bytes_transferred: windows.DWORD = undefined; - if (windows.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - const err = windows.GetLastError(); - return switch (err) { + if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + switch (windows.kernel32.GetLastError()) { windows.ERROR.IO_PENDING => unreachable, - windows.ERROR.INVALID_USER_BUFFER => error.SystemResources, - windows.ERROR.NOT_ENOUGH_MEMORY => error.SystemResources, - windows.ERROR.OPERATION_ABORTED => error.OperationAborted, - windows.ERROR.NOT_ENOUGH_QUOTA => error.SystemResources, - windows.ERROR.BROKEN_PIPE => error.BrokenPipe, - else => os.unexpectedErrorWindows(err), - }; + windows.ERROR.INVALID_USER_BUFFER => return error.SystemResources, + windows.ERROR.NOT_ENOUGH_MEMORY => return error.SystemResources, + windows.ERROR.OPERATION_ABORTED => return error.OperationAborted, + windows.ERROR.NOT_ENOUGH_QUOTA => return error.SystemResources, + windows.ERROR.BROKEN_PIPE => return error.BrokenPipe, + else => |err| return windows.unexpectedError(err), + } } } /// iovecs must live until pwritev promise completes. pub async fn pwritevPosix( loop: *Loop, - fd: os.FileHandle, - iovecs: []const posix.iovec_const, + fd: fd_t, + iovecs: []const os.iovec_const, offset: usize, -) os.PosixWriteError!void { +) os.WriteError!void { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -209,10 +209,10 @@ pub async fn pwritevPosix( return req_node.data.msg.PWriteV.result; } -pub const PReadVError = error{OutOfMemory} || os.File.ReadError; +pub const PReadVError = error{OutOfMemory} || File.ReadError; /// data - just the inner references - must live until preadv promise completes. -pub async fn preadv(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: usize) PReadVError!usize { +pub async fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -225,11 +225,11 @@ pub async fn preadv(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: builtin.Os.freebsd, builtin.Os.netbsd, => { - const iovecs = try loop.allocator.alloc(os.posix.iovec, data.len); + const iovecs = try loop.allocator.alloc(os.iovec, data.len); defer loop.allocator.free(iovecs); for (data) |buf, i| { - iovecs[i] = os.posix.iovec{ + iovecs[i] = os.iovec{ .iov_base = buf.ptr, .iov_len = buf.len, }; @@ -247,7 +247,7 @@ pub async fn preadv(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: } /// data must outlive the returned promise -pub async fn preadvWindows(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: u64) os.WindowsReadError!usize { +pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize { assert(data.len != 0); if (data.len == 1) return await (async preadWindows(loop, fd, data[0], offset) catch unreachable); @@ -271,7 +271,7 @@ pub async fn preadvWindows(loop: *Loop, fd: os.FileHandle, data: []const []u8, o } } -pub async fn preadWindows(loop: *Loop, fd: os.FileHandle, data: []u8, offset: u64) !usize { +pub async fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -291,25 +291,24 @@ pub async fn preadWindows(loop: *Loop, fd: os.FileHandle, data: []u8, offset: u6 }, }; // TODO only call create io completion port once per fd - _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined); + _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined; loop.beginOneEvent(); errdefer loop.finishOneEvent(); errdefer { - _ = windows.CancelIoEx(fd, &resume_node.base.overlapped); + _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); } suspend { - _ = windows.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); + _ = windows.kernel32.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); } var bytes_transferred: windows.DWORD = undefined; - if (windows.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - const err = windows.GetLastError(); - switch (err) { + if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + switch (windows.kernel32.GetLastError()) { windows.ERROR.IO_PENDING => unreachable, windows.ERROR.OPERATION_ABORTED => return error.OperationAborted, windows.ERROR.BROKEN_PIPE => return error.BrokenPipe, windows.ERROR.HANDLE_EOF => return usize(bytes_transferred), - else => return os.unexpectedErrorWindows(err), + else => |err| return windows.unexpectedError(err), } } return usize(bytes_transferred); @@ -318,10 +317,10 @@ pub async fn preadWindows(loop: *Loop, fd: os.FileHandle, data: []u8, offset: u6 /// iovecs must live until preadv promise completes pub async fn preadvPosix( loop: *Loop, - fd: os.FileHandle, - iovecs: []const posix.iovec, + fd: fd_t, + iovecs: []const os.iovec, offset: usize, -) os.PosixReadError!usize { +) os.ReadError!usize { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -362,8 +361,8 @@ pub async fn openPosix( loop: *Loop, path: []const u8, flags: u32, - mode: os.File.Mode, -) os.File.OpenError!os.FileHandle { + mode: File.Mode, +) File.OpenError!fd_t { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -402,19 +401,21 @@ pub async fn openPosix( return req_node.data.msg.Open.result; } -pub async fn openRead(loop: *Loop, path: []const u8) os.File.OpenError!os.FileHandle { +pub async fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t { switch (builtin.os) { builtin.Os.macosx, builtin.Os.linux, builtin.Os.freebsd, builtin.Os.netbsd => { - const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; - return await (async openPosix(loop, path, flags, os.File.default_mode) catch unreachable); + const flags = os.O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC; + return await (async openPosix(loop, path, flags, File.default_mode) catch unreachable); }, - builtin.Os.windows => return os.windowsOpen( + builtin.Os.windows => return windows.CreateFile( path, windows.GENERIC_READ, windows.FILE_SHARE_READ, + null, windows.OPEN_EXISTING, windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, ), else => @compileError("Unsupported OS"), @@ -423,27 +424,29 @@ pub async fn openRead(loop: *Loop, path: []const u8) os.File.OpenError!os.FileHa /// Creates if does not exist. Truncates the file if it exists. /// Uses the default mode. -pub async fn openWrite(loop: *Loop, path: []const u8) os.File.OpenError!os.FileHandle { - return await (async openWriteMode(loop, path, os.File.default_mode) catch unreachable); +pub async fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t { + return await (async openWriteMode(loop, path, File.default_mode) catch unreachable); } /// Creates if does not exist. Truncates the file if it exists. -pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: os.File.Mode) os.File.OpenError!os.FileHandle { +pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t { switch (builtin.os) { builtin.Os.macosx, builtin.Os.linux, builtin.Os.freebsd, builtin.Os.netbsd, => { - const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | posix.O_CLOEXEC | posix.O_TRUNC; - return await (async openPosix(loop, path, flags, os.File.default_mode) catch unreachable); + const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; + return await (async openPosix(loop, path, flags, File.default_mode) catch unreachable); }, - builtin.Os.windows => return os.windowsOpen( + builtin.Os.windows => return windows.CreateFile( path, windows.GENERIC_WRITE, windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + null, windows.CREATE_ALWAYS, windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, ), else => @compileError("Unsupported OS"), } @@ -453,20 +456,22 @@ pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: os.File.Mode) os pub async fn openReadWrite( loop: *Loop, path: []const u8, - mode: os.File.Mode, -) os.File.OpenError!os.FileHandle { + mode: File.Mode, +) File.OpenError!fd_t { switch (builtin.os) { builtin.Os.macosx, builtin.Os.linux, builtin.Os.freebsd, builtin.Os.netbsd => { - const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; + const flags = os.O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC; return await (async openPosix(loop, path, flags, mode) catch unreachable); }, - builtin.Os.windows => return os.windowsOpen( + builtin.Os.windows => return windows.CreateFile( path, windows.GENERIC_WRITE | windows.GENERIC_READ, windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + null, windows.OPEN_ALWAYS, windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, ), else => @compileError("Unsupported OS"), @@ -487,7 +492,7 @@ pub const CloseOperation = struct { builtin.Os.linux, builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => OsDataPosix, builtin.Os.windows => struct { - handle: ?os.FileHandle, + handle: ?fd_t, }, else => @compileError("Unsupported OS"), @@ -551,7 +556,7 @@ pub const CloseOperation = struct { } } - pub fn setHandle(self: *CloseOperation, handle: os.FileHandle) void { + pub fn setHandle(self: *CloseOperation, handle: fd_t) void { switch (builtin.os) { builtin.Os.linux, builtin.Os.macosx, @@ -585,7 +590,7 @@ pub const CloseOperation = struct { } } - pub fn getHandle(self: *CloseOperation) os.FileHandle { + pub fn getHandle(self: *CloseOperation) fd_t { switch (builtin.os) { builtin.Os.linux, builtin.Os.macosx, @@ -606,11 +611,11 @@ pub const CloseOperation = struct { /// contents must remain alive until writeFile completes. /// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate pub async fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void { - return await (async writeFileMode(loop, path, contents, os.File.default_mode) catch unreachable); + return await (async writeFileMode(loop, path, contents, File.default_mode) catch unreachable); } /// contents must remain alive until writeFile completes. -pub async fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void { +pub async fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void { switch (builtin.os) { builtin.Os.linux, builtin.Os.macosx, @@ -623,19 +628,21 @@ pub async fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, } async fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void { - const handle = try os.windowsOpen( + const handle = try windows.CreateFile( path, windows.GENERIC_WRITE, windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + null, windows.CREATE_ALWAYS, windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, ); defer os.close(handle); try await (async pwriteWindows(loop, handle, contents, 0) catch unreachable); } -async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void { +async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void { // workaround for https://github.com/ziglang/zig/issues/1194 suspend { resume @handle(); @@ -689,7 +696,7 @@ pub async fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 defer list.deinit(); while (true) { - try list.ensureCapacity(list.len + os.page_size); + try list.ensureCapacity(list.len + mem.page_size); const buf = list.items[list.len..]; const buf_array = [][]u8{buf}; const amt = try await (async preadv(loop, fd, buf_array, list.len) catch unreachable); @@ -787,7 +794,7 @@ pub fn Watch(comptime V: type) type { switch (builtin.os) { builtin.Os.linux => { - const inotify_fd = try os.linuxINotifyInit1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); + const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); errdefer os.close(inotify_fd); var result: *Self = undefined; @@ -880,7 +887,7 @@ pub fn Watch(comptime V: type) type { } async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { - const resolved_path = try os.path.resolve(self.channel.loop.allocator, [][]const u8{file_path}); + const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [][]const u8{file_path}); var resolved_path_consumed = false; defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); @@ -888,10 +895,7 @@ pub fn Watch(comptime V: type) type { var close_op_consumed = false; defer if (!close_op_consumed) close_op.finish(); - const flags = switch (builtin.os) { - builtin.Os.macosx => posix.O_SYMLINK | posix.O_EVTONLY, - else => 0, - }; + const flags = if (os.darwin.is_the_target) os.O_SYMLINK | os.O_EVTONLY else 0; const mode = 0; const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); close_op.setHandle(fd); @@ -943,16 +947,16 @@ pub fn Watch(comptime V: type) type { while (true) { if (await (async self.channel.loop.bsdWaitKev( @intCast(usize, close_op.getHandle()), - posix.EVFILT_VNODE, - posix.NOTE_WRITE | posix.NOTE_DELETE, + os.EVFILT_VNODE, + os.NOTE_WRITE | os.NOTE_DELETE, ) catch unreachable)) |kev| { // TODO handle EV_ERROR - if (kev.fflags & posix.NOTE_DELETE != 0) { + if (kev.fflags & os.NOTE_DELETE != 0) { await (async self.channel.put(Self.Event{ .id = Event.Id.Delete, .data = value_copy, }) catch unreachable); - } else if (kev.fflags & posix.NOTE_WRITE != 0) { + } else if (kev.fflags & os.NOTE_WRITE != 0) { await (async self.channel.put(Self.Event{ .id = Event.Id.CloseWrite, .data = value_copy, @@ -961,6 +965,7 @@ pub fn Watch(comptime V: type) type { } else |err| switch (err) { error.EventNotFound => unreachable, error.ProcessNotFound => unreachable, + error.Overflow => unreachable, error.AccessDenied, error.SystemResources => |casted_err| { await (async self.channel.put(casted_err) catch unreachable); }, @@ -971,17 +976,17 @@ pub fn Watch(comptime V: type) type { async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { const value_copy = value; - const dirname = os.path.dirname(file_path) orelse "."; + const dirname = std.fs.path.dirname(file_path) orelse "."; const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); var dirname_with_null_consumed = false; defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null); - const basename = os.path.basename(file_path); + const basename = std.fs.path.basename(file_path); const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename); var basename_with_null_consumed = false; defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); - const wd = try os.linuxINotifyAddWatchC( + const wd = try os.inotify_add_watchC( self.os_data.inotify_fd, dirname_with_null.ptr, os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, @@ -1017,7 +1022,7 @@ pub fn Watch(comptime V: type) type { const value_copy = value; // TODO we might need to convert dirname and basename to canonical file paths ("short"?) - const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, os.path.dirname(file_path) orelse "."); + const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse "."); var dirname_consumed = false; defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname); @@ -1025,13 +1030,13 @@ pub fn Watch(comptime V: type) type { defer self.channel.loop.allocator.free(dirname_utf16le); // TODO https://github.com/ziglang/zig/issues/265 - const basename = os.path.basename(file_path); + const basename = std.fs.path.basename(file_path); const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename); var basename_utf16le_null_consumed = false; defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null); const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; - const dir_handle = windows.CreateFileW( + const dir_handle = try windows.CreateFileW( dirname_utf16le.ptr, windows.FILE_LIST_DIRECTORY, windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, @@ -1040,16 +1045,8 @@ pub fn Watch(comptime V: type) type { windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, null, ); - if (dir_handle == windows.INVALID_HANDLE_VALUE) { - const err = windows.GetLastError(); - switch (err) { - windows.ERROR.FILE_NOT_FOUND => return error.FileNotFound, - windows.ERROR.PATH_NOT_FOUND => return error.FileNotFound, - else => return os.unexpectedErrorWindows(err), - } - } var dir_handle_consumed = false; - defer if (!dir_handle_consumed) os.close(dir_handle); + defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); const held = await (async self.os_data.table_lock.acquire() catch unreachable); defer held.release(); @@ -1128,7 +1125,7 @@ pub fn Watch(comptime V: type) type { var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; // TODO handle this error not in the channel but in the setup - _ = os.windowsCreateIoCompletionPort( + _ = windows.CreateIoCompletionPort( dir_handle, self.channel.loop.os_data.io_port, undefined, @@ -1144,10 +1141,10 @@ pub fn Watch(comptime V: type) type { self.channel.loop.beginOneEvent(); errdefer self.channel.loop.finishOneEvent(); errdefer { - _ = windows.CancelIoEx(dir_handle, &resume_node.base.overlapped); + _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); } suspend { - _ = windows.ReadDirectoryChangesW( + _ = windows.kernel32.ReadDirectoryChangesW( dir_handle, &event_buf, @intCast(windows.DWORD, event_buf.len), @@ -1163,10 +1160,9 @@ pub fn Watch(comptime V: type) type { } } var bytes_transferred: windows.DWORD = undefined; - if (windows.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { - const errno = windows.GetLastError(); - const err = switch (errno) { - else => os.unexpectedErrorWindows(errno), + if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + const err = switch (windows.kernel32.GetLastError()) { + else => |err| windows.unexpectedError(err), }; await (async self.channel.put(err) catch unreachable); } else { @@ -1261,7 +1257,7 @@ pub fn Watch(comptime V: type) type { ev = @ptrCast(*os.linux.inotify_event, ptr); if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); - const basename_with_null = basename_ptr[0 .. std.cstr.len(basename_ptr) + 1]; + const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1]; const user_value = blk: { const held = await (async watch.os_data.table_lock.acquire() catch unreachable); defer held.release(); @@ -1340,7 +1336,7 @@ async fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void { } async fn testFsWatch(loop: *Loop) !void { - const file_path = try os.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" }); + const file_path = try std.fs.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" }); defer loop.allocator.free(file_path); const contents = @@ -1366,7 +1362,7 @@ async fn testFsWatch(loop: *Loop) !void { defer if (!ev_consumed) cancel ev; // overwrite line 2 - const fd = try await try async openReadWrite(loop, file_path, os.File.default_mode); + const fd = try await try async openReadWrite(loop, file_path, File.default_mode); { defer os.close(fd); @@ -1388,15 +1384,15 @@ async fn testFsWatch(loop: *Loop) !void { } pub const OutStream = struct { - fd: os.FileHandle, + fd: fd_t, stream: Stream, loop: *Loop, offset: usize, - pub const Error = os.File.WriteError; + pub const Error = File.WriteError; pub const Stream = event.io.OutStream(Error); - pub fn init(loop: *Loop, fd: os.FileHandle, offset: usize) OutStream { + pub fn init(loop: *Loop, fd: fd_t, offset: usize) OutStream { return OutStream{ .fd = fd, .loop = loop, @@ -1414,7 +1410,7 @@ pub const OutStream = struct { }; pub const InStream = struct { - fd: os.FileHandle, + fd: fd_t, stream: Stream, loop: *Loop, offset: usize, @@ -1422,7 +1418,7 @@ pub const InStream = struct { pub const Error = PReadVError; // TODO make this not have OutOfMemory pub const Stream = event.io.InStream(Error); - pub fn init(loop: *Loop, fd: os.FileHandle, offset: usize) InStream { + pub fn init(loop: *Loop, fd: fd_t, offset: usize) InStream { return InStream{ .fd = fd, .loop = loop, diff --git a/std/event/group.zig b/std/event/group.zig index 455d1bd60c..143efd76c3 100644 --- a/std/event/group.zig +++ b/std/event/group.zig @@ -155,7 +155,7 @@ async fn testGroup(loop: *Loop) void { } async fn sleepALittle(count: *usize) void { - std.os.time.sleep(1 * std.os.time.millisecond); + std.time.sleep(1 * std.time.millisecond); _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); } diff --git a/std/event/loop.zig b/std/event/loop.zig index 76b1f6455b..61732d78f5 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -7,9 +7,9 @@ const AtomicRmwOp = builtin.AtomicRmwOp; const AtomicOrder = builtin.AtomicOrder; const fs = std.event.fs; const os = std.os; -const posix = os.posix; const windows = os.windows; const maxInt = std.math.maxInt; +const Thread = std.Thread; pub const Loop = struct { allocator: *mem.Allocator, @@ -17,7 +17,7 @@ pub const Loop = struct { os_data: OsData, final_resume_node: ResumeNode, pending_event_count: usize, - extra_threads: []*os.Thread, + extra_threads: []*Thread, // pre-allocated eventfds. all permanently active. // this is how we send promises to be resumed on other threads. @@ -32,7 +32,7 @@ pub const Loop = struct { overlapped: Overlapped, pub const overlapped_init = switch (builtin.os) { - builtin.Os.windows => windows.OVERLAPPED{ + .windows => windows.OVERLAPPED{ .Internal = 0, .InternalHigh = 0, .Offset = 0, @@ -50,13 +50,13 @@ pub const Loop = struct { }; pub const EventFd = switch (builtin.os) { - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventFd, - builtin.Os.linux => struct { + .macosx, .freebsd, .netbsd => KEventFd, + .linux => struct { base: ResumeNode, epoll_op: u32, eventfd: i32, }, - builtin.Os.windows => struct { + .windows => struct { base: ResumeNode, completion_key: usize, }, @@ -65,15 +65,15 @@ pub const Loop = struct { const KEventFd = struct { base: ResumeNode, - kevent: posix.Kevent, + kevent: os.Kevent, }; pub const Basic = switch (builtin.os) { - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventBasic, - builtin.Os.linux => struct { + .macosx, .freebsd, .netbsd => KEventBasic, + .linux => struct { base: ResumeNode, }, - builtin.Os.windows => struct { + .windows => struct { base: ResumeNode, }, else => @compileError("unsupported OS"), @@ -81,7 +81,7 @@ pub const Loop = struct { const KEventBasic = struct { base: ResumeNode, - kev: posix.Kevent, + kev: os.Kevent, }; }; @@ -99,7 +99,7 @@ pub const Loop = struct { /// have the correct pointer value. pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode"); - const core_count = try os.cpuCount(allocator); + const core_count = try Thread.cpuCount(); return self.initInternal(allocator, core_count); } @@ -127,7 +127,7 @@ pub const Loop = struct { ); errdefer self.allocator.free(self.eventfd_resume_nodes); - self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count); + self.extra_threads = try self.allocator.alloc(*Thread, extra_thread_count); errdefer self.allocator.free(self.extra_threads); try self.initOsData(extra_thread_count); @@ -139,15 +139,15 @@ pub const Loop = struct { self.allocator.free(self.extra_threads); } - const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError || - os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError || - os.WindowsCreateIoCompletionPortError; + const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError || + Thread.SpawnError || os.EpollCtlError || os.KEventError || + windows.CreateIoCompletionPortError; const wakeup_bytes = []u8{0x1} ** 8; fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { switch (builtin.os) { - builtin.Os.linux => { + .linux => { self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); self.os_data.fs_queue_item = 0; // we need another thread for the file system because Linux does not have an async @@ -172,32 +172,32 @@ pub const Loop = struct { .handle = undefined, .overlapped = ResumeNode.overlapped_init, }, - .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), - .epoll_op = posix.EPOLL_CTL_ADD, + .eventfd = try os.eventfd(1, os.EFD_CLOEXEC | os.EFD_NONBLOCK), + .epoll_op = os.EPOLL_CTL_ADD, }, .next = undefined, }; self.available_eventfd_resume_nodes.push(eventfd_node); } - self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC); + self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC); errdefer os.close(self.os_data.epollfd); - self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK); errdefer os.close(self.os_data.final_eventfd); - self.os_data.final_eventfd_event = posix.epoll_event{ - .events = posix.EPOLLIN, - .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + self.os_data.final_eventfd_event = os.epoll_event{ + .events = os.EPOLLIN, + .data = os.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, }; - try os.linuxEpollCtl( + try os.epoll_ctl( self.os_data.epollfd, - posix.EPOLL_CTL_ADD, + os.EPOLL_CTL_ADD, self.os_data.final_eventfd, &self.os_data.final_eventfd_event, ); - self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); errdefer { self.posixFsRequest(&self.os_data.fs_end_request); self.os_data.fs_thread.wait(); @@ -211,21 +211,21 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { // writing 8 bytes to an eventfd cannot fail - os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); } }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - self.os_data.kqfd = try os.bsdKQueue(); + .macosx, .freebsd, .netbsd => { + self.os_data.kqfd = try os.kqueue(); errdefer os.close(self.os_data.kqfd); - self.os_data.fs_kqfd = try os.bsdKQueue(); + self.os_data.fs_kqfd = try os.kqueue(); errdefer os.close(self.os_data.fs_kqfd); self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); @@ -240,7 +240,7 @@ pub const Loop = struct { }, }; - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; for (self.eventfd_resume_nodes) |*eventfd_node, i| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -251,10 +251,10 @@ pub const Loop = struct { .overlapped = ResumeNode.overlapped_init, }, // this one is for sending events - .kevent = posix.Kevent{ + .kevent = os.Kevent{ .ident = i, - .filter = posix.EVFILT_USER, - .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE, + .filter = os.EVFILT_USER, + .flags = os.EV_CLEAR | os.EV_ADD | os.EV_DISABLE, .fflags = 0, .data = 0, .udata = @ptrToInt(&eventfd_node.data.base), @@ -263,46 +263,46 @@ pub const Loop = struct { .next = undefined, }; self.available_eventfd_resume_nodes.push(eventfd_node); - const kevent_array = (*const [1]posix.Kevent)(&eventfd_node.data.kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); - eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; - eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; + const kevent_array = (*const [1]os.Kevent)(&eventfd_node.data.kevent); + _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null); + eventfd_node.data.kevent.flags = os.EV_CLEAR | os.EV_ENABLE; + eventfd_node.data.kevent.fflags = os.NOTE_TRIGGER; } // Pre-add so that we cannot get error.SystemResources // later when we try to activate it. - self.os_data.final_kevent = posix.Kevent{ + self.os_data.final_kevent = os.Kevent{ .ident = extra_thread_count, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD | posix.EV_DISABLE, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_DISABLE, .fflags = 0, .data = 0, .udata = @ptrToInt(&self.final_resume_node), }; - const final_kev_arr = (*const [1]posix.Kevent)(&self.os_data.final_kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); - self.os_data.final_kevent.flags = posix.EV_ENABLE; - self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; + const final_kev_arr = (*const [1]os.Kevent)(&self.os_data.final_kevent); + _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); + self.os_data.final_kevent.flags = os.EV_ENABLE; + self.os_data.final_kevent.fflags = os.NOTE_TRIGGER; - self.os_data.fs_kevent_wake = posix.Kevent{ + self.os_data.fs_kevent_wake = os.Kevent{ .ident = 0, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD | posix.EV_ENABLE, - .fflags = posix.NOTE_TRIGGER, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_ENABLE, + .fflags = os.NOTE_TRIGGER, .data = 0, .udata = undefined, }; - self.os_data.fs_kevent_wait = posix.Kevent{ + self.os_data.fs_kevent_wait = os.Kevent{ .ident = 0, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD | posix.EV_CLEAR, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_CLEAR, .fflags = 0, .data = 0, .udata = undefined, }; - self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); errdefer { self.posixFsRequest(&self.os_data.fs_end_request); self.os_data.fs_thread.wait(); @@ -315,24 +315,24 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { - _ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; + _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); } }, - builtin.Os.windows => { - self.os_data.io_port = try os.windowsCreateIoCompletionPort( + .windows => { + self.os_data.io_port = try windows.CreateIoCompletionPort( windows.INVALID_HANDLE_VALUE, null, undefined, maxInt(windows.DWORD), ); - errdefer os.close(self.os_data.io_port); + errdefer windows.CloseHandle(self.os_data.io_port); for (self.eventfd_resume_nodes) |*eventfd_node, i| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -361,7 +361,7 @@ pub const Loop = struct { while (i < extra_thread_index) : (i += 1) { while (true) { const overlapped = &self.final_resume_node.overlapped; - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; + windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; break; } } @@ -371,7 +371,7 @@ pub const Loop = struct { } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); } }, else => {}, @@ -380,18 +380,18 @@ pub const Loop = struct { fn deinitOsData(self: *Loop) void { switch (builtin.os) { - builtin.Os.linux => { + .linux => { os.close(self.os_data.final_eventfd); while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); os.close(self.os_data.epollfd); self.allocator.free(self.eventfd_resume_nodes); }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { + .macosx, .freebsd, .netbsd => { os.close(self.os_data.kqfd); os.close(self.os_data.fs_kqfd); }, - builtin.Os.windows => { - os.close(self.os_data.io_port); + .windows => { + windows.CloseHandle(self.os_data.io_port); }, else => {}, } @@ -400,28 +400,28 @@ pub const Loop = struct { /// resume_node must live longer than the promise that it holds a reference to. /// flags must contain EPOLLET pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void { - assert(flags & posix.EPOLLET == posix.EPOLLET); + assert(flags & os.EPOLLET == os.EPOLLET); self.beginOneEvent(); errdefer self.finishOneEvent(); try self.linuxModFd( fd, - posix.EPOLL_CTL_ADD, + os.EPOLL_CTL_ADD, flags, resume_node, ); } pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void { - assert(flags & posix.EPOLLET == posix.EPOLLET); + assert(flags & os.EPOLLET == os.EPOLLET); var ev = os.linux.epoll_event{ .events = flags, .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, }; - try os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev); + try os.epoll_ctl(self.os_data.epollfd, op, fd, &ev); } pub fn linuxRemoveFd(self: *Loop, fd: i32) void { - os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; + os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; self.finishOneEvent(); } @@ -440,7 +440,7 @@ pub const Loop = struct { } } - pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !posix.Kevent { + pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent { // TODO #1194 suspend { resume @handle(); @@ -464,31 +464,31 @@ pub const Loop = struct { pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, fflags: u32) !void { self.beginOneEvent(); errdefer self.finishOneEvent(); - var kev = posix.Kevent{ + var kev = os.Kevent{ .ident = ident, .filter = filter, - .flags = posix.EV_ADD | posix.EV_ENABLE | posix.EV_CLEAR, + .flags = os.EV_ADD | os.EV_ENABLE | os.EV_CLEAR, .fflags = fflags, .data = 0, .udata = @ptrToInt(&resume_node.base), }; - const kevent_array = (*const [1]posix.Kevent)(&kev); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); + const kevent_array = (*const [1]os.Kevent)(&kev); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null); } pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void { - var kev = posix.Kevent{ + var kev = os.Kevent{ .ident = ident, .filter = filter, - .flags = posix.EV_DELETE, + .flags = os.EV_DELETE, .fflags = 0, .data = 0, .udata = 0, }; - const kevent_array = (*const [1]posix.Kevent)(&kev); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined; + const kevent_array = (*const [1]os.Kevent)(&kev); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined; self.finishOneEvent(); } @@ -501,18 +501,18 @@ pub const Loop = struct { const eventfd_node = &resume_stack_node.data; eventfd_node.base.handle = next_tick_node.data; switch (builtin.os) { - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - const kevent_array = (*const [1]posix.Kevent)(&eventfd_node.kevent); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { + .macosx, .freebsd, .netbsd => { + const kevent_array = (*const [1]os.Kevent)(&eventfd_node.kevent); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { self.next_tick_queue.unget(next_tick_node); self.available_eventfd_resume_nodes.push(resume_stack_node); return; }; }, - builtin.Os.linux => { + .linux => { // the pending count is already accounted for - const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | + const epoll_events = os.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | os.linux.EPOLLET; self.linuxModFd( eventfd_node.eventfd, @@ -525,8 +525,8 @@ pub const Loop = struct { return; }; }, - builtin.Os.windows => { - os.windowsPostQueuedCompletionStatus( + .windows => { + windows.PostQueuedCompletionStatus( self.os_data.io_port, undefined, undefined, @@ -623,26 +623,26 @@ pub const Loop = struct { if (prev == 1) { // cause all the threads to stop switch (builtin.os) { - builtin.Os.linux => { + .linux => { self.posixFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail - os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { + .macosx, .freebsd, .netbsd => { self.posixFsRequest(&self.os_data.fs_end_request); - const final_kevent = (*const [1]posix.Kevent)(&self.os_data.final_kevent); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + const final_kevent = (*const [1]os.Kevent)(&self.os_data.final_kevent); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; // cannot fail because we already added it and this just enables it - _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; + _ = os.kevent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; return; }, - builtin.Os.windows => { + .windows => { var i: usize = 0; while (i < self.extra_threads.len + 1) : (i += 1) { while (true) { const overlapped = &self.final_resume_node.overlapped; - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; + windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; break; } } @@ -663,10 +663,10 @@ pub const Loop = struct { } switch (builtin.os) { - builtin.Os.linux => { + .linux => { // only process 1 event so we don't steal from other threads var events: [1]os.linux.epoll_event = undefined; - const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + const count = os.epoll_wait(self.os_data.epollfd, events[0..], -1); for (events[0..count]) |ev| { const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); const handle = resume_node.handle; @@ -676,7 +676,7 @@ pub const Loop = struct { ResumeNode.Id.Stop => return, ResumeNode.Id.EventFd => { const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); - event_fd_node.epoll_op = posix.EPOLL_CTL_MOD; + event_fd_node.epoll_op = os.EPOLL_CTL_MOD; const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); self.available_eventfd_resume_nodes.push(stack_node); }, @@ -687,10 +687,10 @@ pub const Loop = struct { } } }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - var eventlist: [1]posix.Kevent = undefined; - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - const count = os.bsdKEvent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable; + .macosx, .freebsd, .netbsd => { + var eventlist: [1]os.Kevent = undefined; + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable; for (eventlist[0..count]) |ev| { const resume_node = @intToPtr(*ResumeNode, ev.udata); const handle = resume_node.handle; @@ -713,16 +713,16 @@ pub const Loop = struct { } } }, - builtin.Os.windows => { + .windows => { var completion_key: usize = undefined; const overlapped = while (true) { var nbytes: windows.DWORD = undefined; var overlapped: ?*windows.OVERLAPPED = undefined; - switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { - os.WindowsWaitResult.Aborted => return, - os.WindowsWaitResult.Normal => {}, - os.WindowsWaitResult.EOF => {}, - os.WindowsWaitResult.Cancelled => continue, + switch (windows.GetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { + .Aborted => return, + .Normal => {}, + .EOF => {}, + .Cancelled => continue, } if (overlapped) |o| break o; } else unreachable; // TODO else unreachable should not be necessary @@ -751,16 +751,16 @@ pub const Loop = struct { self.os_data.fs_queue.put(request_node); switch (builtin.os) { builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - const fs_kevs = (*const [1]posix.Kevent)(&self.os_data.fs_kevent_wake); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; + const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wake); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; }, builtin.Os.linux => { _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1); switch (os.linux.getErrno(rc)) { 0 => {}, - posix.EINVAL => unreachable, + os.EINVAL => unreachable, else => unreachable, } }, @@ -783,24 +783,24 @@ pub const Loop = struct { switch (node.data.msg) { @TagType(fs.Request.Msg).End => return, @TagType(fs.Request.Msg).PWriteV => |*msg| { - msg.result = os.posix_pwritev(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + msg.result = os.pwritev(msg.fd, msg.iov, msg.offset); }, @TagType(fs.Request.Msg).PReadV => |*msg| { - msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + msg.result = os.preadv(msg.fd, msg.iov, msg.offset); }, @TagType(fs.Request.Msg).Open => |*msg| { - msg.result = os.posixOpenC(msg.path.ptr, msg.flags, msg.mode); + msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode); }, @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { - const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | - posix.O_CLOEXEC | posix.O_TRUNC; - const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| { + const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | + os.O_CLOEXEC | os.O_TRUNC; + const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| { msg.result = err; break :blk; }; defer os.close(fd); - msg.result = os.posixWrite(fd, msg.contents); + msg.result = os.write(fd, msg.contents); }, } switch (node.data.finish) { @@ -816,14 +816,14 @@ pub const Loop = struct { builtin.Os.linux => { const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null); switch (os.linux.getErrno(rc)) { - 0, posix.EINTR, posix.EAGAIN => continue, + 0, os.EINTR, os.EAGAIN => continue, else => unreachable, } }, builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - const fs_kevs = (*const [1]posix.Kevent)(&self.os_data.fs_kevent_wait); - var out_kevs: [1]posix.Kevent = undefined; - _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; + const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wait); + var out_kevs: [1]os.Kevent = undefined; + _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; }, else => @compileError("Unsupported OS"), } @@ -831,9 +831,9 @@ pub const Loop = struct { } const OsData = switch (builtin.os) { - builtin.Os.linux => LinuxOsData, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventData, - builtin.Os.windows => struct { + .linux => LinuxOsData, + .macosx, .freebsd, .netbsd => KEventData, + .windows => struct { io_port: windows.HANDLE, extra_thread_count: usize, }, @@ -842,10 +842,10 @@ pub const Loop = struct { const KEventData = struct { kqfd: i32, - final_kevent: posix.Kevent, - fs_kevent_wake: posix.Kevent, - fs_kevent_wait: posix.Kevent, - fs_thread: *os.Thread, + final_kevent: os.Kevent, + fs_kevent_wake: os.Kevent, + fs_kevent_wait: os.Kevent, + fs_thread: *Thread, fs_kqfd: i32, fs_queue: std.atomic.Queue(fs.Request), fs_end_request: fs.RequestNode, @@ -855,7 +855,7 @@ pub const Loop = struct { epollfd: i32, final_eventfd: i32, final_eventfd_event: os.linux.epoll_event, - fs_thread: *os.Thread, + fs_thread: *Thread, fs_queue_item: i32, fs_queue: std.atomic.Queue(fs.Request), fs_end_request: fs.RequestNode, diff --git a/std/event/net.zig b/std/event/net.zig index 687c119920..f4398196e3 100644 --- a/std/event/net.zig +++ b/std/event/net.zig @@ -4,11 +4,12 @@ const testing = std.testing; const event = std.event; const mem = std.mem; const os = std.os; -const posix = os.posix; const Loop = std.event.Loop; +const File = std.fs.File; +const fd_t = os.fd_t; pub const Server = struct { - handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, os.File) void, + handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, File) void, loop: *Loop, sockfd: ?i32, @@ -40,30 +41,33 @@ pub const Server = struct { pub fn listen( self: *Server, address: *const std.net.Address, - handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, os.File) void, + handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, File) void, ) !void { self.handleRequestFn = handleRequestFn; - const sockfd = try os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); + const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); errdefer os.close(sockfd); self.sockfd = sockfd; - try os.posixBind(sockfd, &address.os_addr); - try os.posixListen(sockfd, posix.SOMAXCONN); - self.listen_address = std.net.Address.initPosix(try os.posixGetSockName(sockfd)); + try os.bind(sockfd, &address.os_addr); + try os.listen(sockfd, os.SOMAXCONN); + self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd)); self.accept_coro = try async<self.loop.allocator> Server.handler(self); errdefer cancel self.accept_coro.?; self.listen_resume_node.handle = self.accept_coro.?; - try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, posix.EPOLLIN | posix.EPOLLOUT | posix.EPOLLET); + try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); errdefer self.loop.removeFd(sockfd); } /// Stop listening pub fn close(self: *Server) void { self.loop.linuxRemoveFd(self.sockfd.?); - os.close(self.sockfd.?); + if (self.sockfd) |fd| { + os.close(fd); + self.sockfd = null; + } } pub fn deinit(self: *Server) void { @@ -75,13 +79,13 @@ pub const Server = struct { while (true) { var accepted_addr: std.net.Address = undefined; // TODO just inline the following function here and don't expose it as posixAsyncAccept - if (os.posixAsyncAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { + if (os.accept4_async(self.sockfd.?, &accepted_addr.os_addr, os.SOCK_NONBLOCK | os.SOCK_CLOEXEC)) |accepted_fd| { if (accepted_fd == -1) { // would block suspend; // we will get resumed by epoll_wait in the event loop continue; } - var socket = os.File.openHandle(accepted_fd); + var socket = File.openHandle(accepted_fd); _ = async<self.loop.allocator> self.handleRequestFn(self, &accepted_addr, socket) catch |err| switch (err) { error.OutOfMemory => { socket.close(); @@ -89,14 +93,7 @@ pub const Server = struct { }, }; } else |err| switch (err) { - error.ProcessFdQuotaExceeded => { - errdefer os.emfile_promise_queue.remove(&self.waiting_for_emfile_node); - suspend { - self.waiting_for_emfile_node = PromiseNode.init(@handle()); - os.emfile_promise_queue.append(&self.waiting_for_emfile_node); - } - continue; - }, + error.ProcessFdQuotaExceeded => @panic("TODO handle this error"), error.ConnectionAborted => continue, error.FileDescriptorNotASocket => unreachable, @@ -111,24 +108,24 @@ pub const Server = struct { }; pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 { - const sockfd = try os.posixSocket( - posix.AF_UNIX, - posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, + const sockfd = try os.socket( + os.AF_UNIX, + os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, 0, ); errdefer os.close(sockfd); - var sock_addr = posix.sockaddr_un{ - .family = posix.AF_UNIX, + var sock_addr = os.sockaddr_un{ + .family = os.AF_UNIX, .path = undefined, }; if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong; mem.copy(u8, sock_addr.path[0..], path); - const size = @intCast(u32, @sizeOf(posix.sa_family_t) + path.len); - try os.posixConnectAsync(sockfd, &sock_addr, size); - try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT | posix.EPOLLET); - try os.posixGetSockOptConnectError(sockfd); + const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len); + try os.connect_async(sockfd, &sock_addr, size); + try await try async loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); + try os.getsockoptError(sockfd); return sockfd; } @@ -146,51 +143,49 @@ pub const ReadError = error{ }; /// returns number of bytes read. 0 means EOF. -pub async fn read(loop: *std.event.Loop, fd: os.FileHandle, buffer: []u8) ReadError!usize { - const iov = posix.iovec{ +pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize { + const iov = os.iovec{ .iov_base = buffer.ptr, .iov_len = buffer.len, }; - const iovs: *const [1]posix.iovec = &iov; + const iovs: *const [1]os.iovec = &iov; return await (async readvPosix(loop, fd, iovs, 1) catch unreachable); } pub const WriteError = error{}; -pub async fn write(loop: *std.event.Loop, fd: os.FileHandle, buffer: []const u8) WriteError!void { - const iov = posix.iovec_const{ +pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteError!void { + const iov = os.iovec_const{ .iov_base = buffer.ptr, .iov_len = buffer.len, }; - const iovs: *const [1]posix.iovec_const = &iov; + const iovs: *const [1]os.iovec_const = &iov; return await (async writevPosix(loop, fd, iovs, 1) catch unreachable); } -pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const posix.iovec_const, count: usize) !void { +pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void { while (true) { switch (builtin.os) { - builtin.Os.macosx, builtin.Os.linux => { - const rc = posix.writev(fd, iov, count); - const err = posix.getErrno(rc); - switch (err) { + .macosx, .linux => { + switch (os.errno(os.system.writev(fd, iov, count))) { 0 => return, - posix.EINTR => continue, - posix.ESPIPE => unreachable, - posix.EINVAL => unreachable, - posix.EFAULT => unreachable, - posix.EAGAIN => { - try await (async loop.linuxWaitFd(fd, posix.EPOLLET | posix.EPOLLOUT) catch unreachable); + os.EINTR => continue, + os.ESPIPE => unreachable, + os.EINVAL => unreachable, + os.EFAULT => unreachable, + os.EAGAIN => { + try await (async loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT) catch unreachable); continue; }, - posix.EBADF => unreachable, // always a race condition - posix.EDESTADDRREQ => unreachable, // connect was never called - posix.EDQUOT => unreachable, - posix.EFBIG => unreachable, - posix.EIO => return error.InputOutput, - posix.ENOSPC => unreachable, - posix.EPERM => return error.AccessDenied, - posix.EPIPE => unreachable, - else => return os.unexpectedErrorPosix(err), + os.EBADF => unreachable, // always a race condition + os.EDESTADDRREQ => unreachable, // connect was never called + os.EDQUOT => unreachable, + os.EFBIG => unreachable, + os.EIO => return error.InputOutput, + os.ENOSPC => unreachable, + os.EPERM => return error.AccessDenied, + os.EPIPE => unreachable, + else => |err| return os.unexpectedErrno(err), } }, else => @compileError("Unsupported OS"), @@ -199,27 +194,26 @@ pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const posix.iovec_const, } /// returns number of bytes read. 0 means EOF. -pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]posix.iovec, count: usize) !usize { +pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: usize) !usize { while (true) { switch (builtin.os) { builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => { - const rc = posix.readv(fd, iov, count); - const err = posix.getErrno(rc); - switch (err) { + const rc = os.system.readv(fd, iov, count); + switch (os.errno(rc)) { 0 => return rc, - posix.EINTR => continue, - posix.EINVAL => unreachable, - posix.EFAULT => unreachable, - posix.EAGAIN => { - try await (async loop.linuxWaitFd(fd, posix.EPOLLET | posix.EPOLLIN) catch unreachable); + os.EINTR => continue, + os.EINVAL => unreachable, + os.EFAULT => unreachable, + os.EAGAIN => { + try await (async loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN) catch unreachable); continue; }, - posix.EBADF => unreachable, // always a race condition - posix.EIO => return error.InputOutput, - posix.EISDIR => unreachable, - posix.ENOBUFS => return error.SystemResources, - posix.ENOMEM => return error.SystemResources, - else => return os.unexpectedErrorPosix(err), + os.EBADF => unreachable, // always a race condition + os.EIO => return error.InputOutput, + os.EISDIR => unreachable, + os.ENOBUFS => return error.SystemResources, + os.ENOMEM => return error.SystemResources, + else => |err| return os.unexpectedErrno(err), } }, else => @compileError("Unsupported OS"), @@ -227,12 +221,12 @@ pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]posix.iovec, cou } } -pub async fn writev(loop: *Loop, fd: os.FileHandle, data: []const []const u8) !void { - const iovecs = try loop.allocator.alloc(os.posix.iovec_const, data.len); +pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void { + const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); defer loop.allocator.free(iovecs); for (data) |buf, i| { - iovecs[i] = os.posix.iovec_const{ + iovecs[i] = os.iovec_const{ .iov_base = buf.ptr, .iov_len = buf.len, }; @@ -241,12 +235,12 @@ pub async fn writev(loop: *Loop, fd: os.FileHandle, data: []const []const u8) !v return await (async writevPosix(loop, fd, iovecs.ptr, data.len) catch unreachable); } -pub async fn readv(loop: *Loop, fd: os.FileHandle, data: []const []u8) !usize { - const iovecs = try loop.allocator.alloc(os.posix.iovec, data.len); +pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize { + const iovecs = try loop.allocator.alloc(os.iovec, data.len); defer loop.allocator.free(iovecs); for (data) |buf, i| { - iovecs[i] = os.posix.iovec{ + iovecs[i] = os.iovec{ .iov_base = buf.ptr, .iov_len = buf.len, }; @@ -255,17 +249,17 @@ pub async fn readv(loop: *Loop, fd: os.FileHandle, data: []const []u8) !usize { return await (async readvPosix(loop, fd, iovecs.ptr, data.len) catch unreachable); } -pub async fn connect(loop: *Loop, _address: *const std.net.Address) !os.File { +pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File { var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592 - const sockfd = try os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); + const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); errdefer os.close(sockfd); - try os.posixConnectAsync(sockfd, &address.os_addr, @sizeOf(posix.sockaddr_in)); - try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT | posix.EPOLLET); - try os.posixGetSockOptConnectError(sockfd); + try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in)); + try await try async loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); + try os.getsockoptError(sockfd); - return os.File.openHandle(sockfd); + return File.openHandle(sockfd); } test "listen on a port, send bytes, receive bytes" { @@ -281,7 +275,7 @@ test "listen on a port, send bytes, receive bytes" { tcp_server: Server, const Self = @This(); - async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: os.File) void { + async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void { const self = @fieldParentPtr(Self, "tcp_server", tcp_server); var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 defer socket.close(); @@ -294,7 +288,7 @@ test "listen on a port, send bytes, receive bytes" { cancel @handle(); } } - async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: os.File) !void { + async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void { const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592 var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 @@ -331,14 +325,14 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Serv } pub const OutStream = struct { - fd: os.FileHandle, + fd: fd_t, stream: Stream, loop: *Loop, pub const Error = WriteError; pub const Stream = event.io.OutStream(Error); - pub fn init(loop: *Loop, fd: os.FileHandle) OutStream { + pub fn init(loop: *Loop, fd: fd_t) OutStream { return OutStream{ .fd = fd, .loop = loop, @@ -353,14 +347,14 @@ pub const OutStream = struct { }; pub const InStream = struct { - fd: os.FileHandle, + fd: fd_t, stream: Stream, loop: *Loop, pub const Error = ReadError; pub const Stream = event.io.InStream(Error); - pub fn init(loop: *Loop, fd: os.FileHandle) InStream { + pub fn init(loop: *Loop, fd: fd_t) InStream { return InStream{ .fd = fd, .loop = loop, diff --git a/std/event/rwlock.zig b/std/event/rwlock.zig index 76b364fedc..00f3c0bc60 100644 --- a/std/event/rwlock.zig +++ b/std/event/rwlock.zig @@ -271,7 +271,7 @@ async fn writeRunner(lock: *RwLock) void { var i: usize = 0; while (i < shared_test_data.len) : (i += 1) { - std.os.time.sleep(100 * std.os.time.microsecond); + std.time.sleep(100 * std.time.microsecond); const lock_promise = async lock.acquireWrite() catch @panic("out of memory"); const handle = await lock_promise; defer handle.release(); @@ -286,7 +286,7 @@ async fn writeRunner(lock: *RwLock) void { async fn readRunner(lock: *RwLock) void { suspend; // resumed by onNextTick - std.os.time.sleep(1); + std.time.sleep(1); var i: usize = 0; while (i < shared_test_data.len) : (i += 1) { |
