aboutsummaryrefslogtreecommitdiff
path: root/lib/std/event/fs.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2019-09-26 01:54:45 -0400
committerGitHub <noreply@github.com>2019-09-26 01:54:45 -0400
commit68bb3945708c43109c48bda3664176307d45b62c (patch)
treeafb9731e10cef9d192560b52cd9ae2cf179775c4 /lib/std/event/fs.zig
parent6128bc728d1e1024a178c16c2149f5b1a167a013 (diff)
parent4637e8f9699af9c3c6cf4df50ef5bb67c7a318a4 (diff)
downloadzig-68bb3945708c43109c48bda3664176307d45b62c.tar.gz
zig-68bb3945708c43109c48bda3664176307d45b62c.zip
Merge pull request #3315 from ziglang/mv-std-lib
Move std/ to lib/std/
Diffstat (limited to 'lib/std/event/fs.zig')
-rw-r--r--lib/std/event/fs.zig1431
1 files changed, 1431 insertions, 0 deletions
diff --git a/lib/std/event/fs.zig b/lib/std/event/fs.zig
new file mode 100644
index 0000000000..4490e1deae
--- /dev/null
+++ b/lib/std/event/fs.zig
@@ -0,0 +1,1431 @@
+const builtin = @import("builtin");
+const std = @import("../std.zig");
+const event = std.event;
+const assert = std.debug.assert;
+const testing = std.testing;
+const os = std.os;
+const mem = std.mem;
+const windows = os.windows;
+const Loop = event.Loop;
+const fd_t = os.fd_t;
+const File = std.fs.File;
+
+pub const RequestNode = std.atomic.Queue(Request).Node;
+
+pub const Request = struct {
+ msg: Msg,
+ finish: Finish,
+
+ pub const Finish = union(enum) {
+ TickNode: Loop.NextTickNode,
+ DeallocCloseOperation: *CloseOperation,
+ NoAction,
+ };
+
+ pub const Msg = union(enum) {
+ WriteV: WriteV,
+ PWriteV: PWriteV,
+ PReadV: PReadV,
+ Open: Open,
+ Close: Close,
+ WriteFile: WriteFile,
+ End, // special - means the fs thread should exit
+
+ pub const WriteV = struct {
+ fd: fd_t,
+ iov: []const os.iovec_const,
+ result: Error!void,
+
+ pub const Error = os.WriteError;
+ };
+
+ pub const PWriteV = struct {
+ fd: fd_t,
+ iov: []const os.iovec_const,
+ offset: usize,
+ result: Error!void,
+
+ pub const Error = os.WriteError;
+ };
+
+ pub const PReadV = struct {
+ fd: fd_t,
+ iov: []const os.iovec,
+ offset: usize,
+ result: Error!usize,
+
+ pub const Error = os.ReadError;
+ };
+
+ pub const Open = struct {
+ /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
+ path: []const u8,
+ flags: u32,
+ mode: File.Mode,
+ result: Error!fd_t,
+
+ pub const Error = File.OpenError;
+ };
+
+ pub const WriteFile = struct {
+ /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
+ path: []const u8,
+ contents: []const u8,
+ mode: File.Mode,
+ result: Error!void,
+
+ pub const Error = File.OpenError || File.WriteError;
+ };
+
+ pub const Close = struct {
+ fd: fd_t,
+ };
+ };
+};
+
+pub const PWriteVError = error{OutOfMemory} || File.WriteError;
+
+/// data - just the inner references - must live until pwritev frame completes.
+pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void {
+ switch (builtin.os) {
+ .macosx,
+ .linux,
+ .freebsd,
+ .netbsd,
+ => {
+ const iovecs = try loop.allocator.alloc(os.iovec_const, data.len);
+ defer loop.allocator.free(iovecs);
+
+ for (data) |buf, i| {
+ iovecs[i] = os.iovec_const{
+ .iov_base = buf.ptr,
+ .iov_len = buf.len,
+ };
+ }
+
+ return pwritevPosix(loop, fd, iovecs, offset);
+ },
+ .windows => {
+ const data_copy = try std.mem.dupe(loop.allocator, []const u8, data);
+ defer loop.allocator.free(data_copy);
+ return pwritevWindows(loop, fd, data, offset);
+ },
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// data must outlive the returned frame
+pub fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
+ if (data.len == 0) return;
+ if (data.len == 1) return pwriteWindows(loop, fd, data[0], offset);
+
+ // TODO do these in parallel
+ var off = offset;
+ for (data) |buf| {
+ try pwriteWindows(loop, fd, buf, off);
+ off += buf.len;
+ }
+}
+
+pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void {
+ var resume_node = Loop.ResumeNode.Basic{
+ .base = Loop.ResumeNode{
+ .id = Loop.ResumeNode.Id.Basic,
+ .handle = @frame(),
+ .overlapped = windows.OVERLAPPED{
+ .Internal = 0,
+ .InternalHigh = 0,
+ .Offset = @truncate(u32, offset),
+ .OffsetHigh = @truncate(u32, offset >> 32),
+ .hEvent = null,
+ },
+ },
+ };
+ // TODO only call create io completion port once per fd
+ _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined);
+ loop.beginOneEvent();
+ errdefer loop.finishOneEvent();
+
+ errdefer {
+ _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped);
+ }
+ suspend {
+ _ = windows.kernel32.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped);
+ }
+ var bytes_transferred: windows.DWORD = undefined;
+ if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
+ switch (windows.kernel32.GetLastError()) {
+ windows.ERROR.IO_PENDING => unreachable,
+ windows.ERROR.INVALID_USER_BUFFER => return error.SystemResources,
+ windows.ERROR.NOT_ENOUGH_MEMORY => return error.SystemResources,
+ windows.ERROR.OPERATION_ABORTED => return error.OperationAborted,
+ windows.ERROR.NOT_ENOUGH_QUOTA => return error.SystemResources,
+ windows.ERROR.BROKEN_PIPE => return error.BrokenPipe,
+ else => |err| return windows.unexpectedError(err),
+ }
+ }
+}
+
+/// iovecs must live until pwritev frame completes.
+pub fn pwritevPosix(
+ loop: *Loop,
+ fd: fd_t,
+ iovecs: []const os.iovec_const,
+ offset: usize,
+) os.WriteError!void {
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .PWriteV = Request.Msg.PWriteV{
+ .fd = fd,
+ .iov = iovecs,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.PWriteV.result;
+}
+
+/// iovecs must live until pwritev frame completes.
+pub fn writevPosix(
+ loop: *Loop,
+ fd: fd_t,
+ iovecs: []const os.iovec_const,
+) os.WriteError!void {
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .WriteV = Request.Msg.WriteV{
+ .fd = fd,
+ .iov = iovecs,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.WriteV.result;
+}
+
+pub const PReadVError = error{OutOfMemory} || File.ReadError;
+
+/// data - just the inner references - must live until preadv frame completes.
+pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize {
+ assert(data.len != 0);
+ switch (builtin.os) {
+ .macosx,
+ .linux,
+ .freebsd,
+ .netbsd,
+ => {
+ const iovecs = try loop.allocator.alloc(os.iovec, data.len);
+ defer loop.allocator.free(iovecs);
+
+ for (data) |buf, i| {
+ iovecs[i] = os.iovec{
+ .iov_base = buf.ptr,
+ .iov_len = buf.len,
+ };
+ }
+
+ return preadvPosix(loop, fd, iovecs, offset);
+ },
+ .windows => {
+ const data_copy = try std.mem.dupe(loop.allocator, []u8, data);
+ defer loop.allocator.free(data_copy);
+ return preadvWindows(loop, fd, data_copy, offset);
+ },
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// data must outlive the returned frame
+pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize {
+ assert(data.len != 0);
+ if (data.len == 1) return preadWindows(loop, fd, data[0], offset);
+
+ // TODO do these in parallel?
+ var off: usize = 0;
+ var iov_i: usize = 0;
+ var inner_off: usize = 0;
+ while (true) {
+ const v = data[iov_i];
+ const amt_read = try preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off);
+ off += amt_read;
+ inner_off += amt_read;
+ if (inner_off == v.len) {
+ iov_i += 1;
+ inner_off = 0;
+ if (iov_i == data.len) {
+ return off;
+ }
+ }
+ if (amt_read == 0) return off; // EOF
+ }
+}
+
+pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
+ var resume_node = Loop.ResumeNode.Basic{
+ .base = Loop.ResumeNode{
+ .id = Loop.ResumeNode.Id.Basic,
+ .handle = @frame(),
+ .overlapped = windows.OVERLAPPED{
+ .Internal = 0,
+ .InternalHigh = 0,
+ .Offset = @truncate(u32, offset),
+ .OffsetHigh = @truncate(u32, offset >> 32),
+ .hEvent = null,
+ },
+ },
+ };
+ // TODO only call create io completion port once per fd
+ _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined;
+ loop.beginOneEvent();
+ errdefer loop.finishOneEvent();
+
+ errdefer {
+ _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped);
+ }
+ suspend {
+ _ = windows.kernel32.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped);
+ }
+ var bytes_transferred: windows.DWORD = undefined;
+ if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
+ switch (windows.kernel32.GetLastError()) {
+ windows.ERROR.IO_PENDING => unreachable,
+ windows.ERROR.OPERATION_ABORTED => return error.OperationAborted,
+ windows.ERROR.BROKEN_PIPE => return error.BrokenPipe,
+ windows.ERROR.HANDLE_EOF => return usize(bytes_transferred),
+ else => |err| return windows.unexpectedError(err),
+ }
+ }
+ return usize(bytes_transferred);
+}
+
+/// iovecs must live until preadv frame completes
+pub fn preadvPosix(
+ loop: *Loop,
+ fd: fd_t,
+ iovecs: []const os.iovec,
+ offset: usize,
+) os.ReadError!usize {
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .PReadV = Request.Msg.PReadV{
+ .fd = fd,
+ .iov = iovecs,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.PReadV.result;
+}
+
+pub fn openPosix(
+ loop: *Loop,
+ path: []const u8,
+ flags: u32,
+ mode: File.Mode,
+) File.OpenError!fd_t {
+ const path_c = try std.os.toPosixPath(path);
+
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .Open = Request.Msg.Open{
+ .path = path_c[0..path.len],
+ .flags = flags,
+ .mode = mode,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.Open.result;
+}
+
+pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t {
+ switch (builtin.os) {
+ .macosx, .linux, .freebsd, .netbsd => {
+ const flags = os.O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC;
+ return openPosix(loop, path, flags, File.default_mode);
+ },
+
+ .windows => return windows.CreateFile(
+ path,
+ windows.GENERIC_READ,
+ windows.FILE_SHARE_READ,
+ null,
+ windows.OPEN_EXISTING,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ ),
+
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// Creates if does not exist. Truncates the file if it exists.
+/// Uses the default mode.
+pub fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t {
+ return openWriteMode(loop, path, File.default_mode);
+}
+
+/// Creates if does not exist. Truncates the file if it exists.
+pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t {
+ switch (builtin.os) {
+ .macosx,
+ .linux,
+ .freebsd,
+ .netbsd,
+ => {
+ const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC;
+ return openPosix(loop, path, flags, File.default_mode);
+ },
+ .windows => return windows.CreateFile(
+ path,
+ windows.GENERIC_WRITE,
+ windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
+ null,
+ windows.CREATE_ALWAYS,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ ),
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// Creates if does not exist. Does not truncate.
+pub fn openReadWrite(
+ loop: *Loop,
+ path: []const u8,
+ mode: File.Mode,
+) File.OpenError!fd_t {
+ switch (builtin.os) {
+ .macosx, .linux, .freebsd, .netbsd => {
+ const flags = os.O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC;
+ return openPosix(loop, path, flags, mode);
+ },
+
+ .windows => return windows.CreateFile(
+ path,
+ windows.GENERIC_WRITE | windows.GENERIC_READ,
+ windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
+ null,
+ windows.OPEN_ALWAYS,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ ),
+
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// This abstraction helps to close file handles in defer expressions
+/// without the possibility of failure and without the use of suspend points.
+/// Start a `CloseOperation` before opening a file, so that you can defer
+/// `CloseOperation.finish`.
+/// If you call `setHandle` then finishing will close the fd; otherwise finishing
+/// will deallocate the `CloseOperation`.
+pub const CloseOperation = struct {
+ loop: *Loop,
+ os_data: OsData,
+
+ const OsData = switch (builtin.os) {
+ .linux, .macosx, .freebsd, .netbsd => OsDataPosix,
+
+ .windows => struct {
+ handle: ?fd_t,
+ },
+
+ else => @compileError("Unsupported OS"),
+ };
+
+ const OsDataPosix = struct {
+ have_fd: bool,
+ close_req_node: RequestNode,
+ };
+
+ pub fn start(loop: *Loop) (error{OutOfMemory}!*CloseOperation) {
+ const self = try loop.allocator.create(CloseOperation);
+ self.* = CloseOperation{
+ .loop = loop,
+ .os_data = switch (builtin.os) {
+ .linux, .macosx, .freebsd, .netbsd => initOsDataPosix(self),
+ .windows => OsData{ .handle = null },
+ else => @compileError("Unsupported OS"),
+ },
+ };
+ return self;
+ }
+
+ fn initOsDataPosix(self: *CloseOperation) OsData {
+ return OsData{
+ .have_fd = false,
+ .close_req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .Close = Request.Msg.Close{ .fd = undefined },
+ },
+ .finish = Request.Finish{ .DeallocCloseOperation = self },
+ },
+ },
+ };
+ }
+
+ /// Defer this after creating.
+ pub fn finish(self: *CloseOperation) void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ if (self.os_data.have_fd) {
+ self.loop.posixFsRequest(&self.os_data.close_req_node);
+ } else {
+ self.loop.allocator.destroy(self);
+ }
+ },
+ .windows => {
+ if (self.os_data.handle) |handle| {
+ os.close(handle);
+ }
+ self.loop.allocator.destroy(self);
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ pub fn setHandle(self: *CloseOperation, handle: fd_t) void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ self.os_data.close_req_node.data.msg.Close.fd = handle;
+ self.os_data.have_fd = true;
+ },
+ .windows => {
+ self.os_data.handle = handle;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ /// Undo a `setHandle`.
+ pub fn clearHandle(self: *CloseOperation) void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ self.os_data.have_fd = false;
+ },
+ .windows => {
+ self.os_data.handle = null;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ pub fn getHandle(self: *CloseOperation) fd_t {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ assert(self.os_data.have_fd);
+ return self.os_data.close_req_node.data.msg.Close.fd;
+ },
+ .windows => {
+ return self.os_data.handle.?;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+};
+
+/// contents must remain alive until writeFile completes.
+/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate
+pub fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void {
+ return writeFileMode(loop, path, contents, File.default_mode);
+}
+
+/// contents must remain alive until writeFile completes.
+pub fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => return writeFileModeThread(loop, path, contents, mode),
+ .windows => return writeFileWindows(loop, path, contents),
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void {
+ const handle = try windows.CreateFile(
+ path,
+ windows.GENERIC_WRITE,
+ windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
+ null,
+ windows.CREATE_ALWAYS,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ );
+ defer os.close(handle);
+
+ try pwriteWindows(loop, handle, contents, 0);
+}
+
+fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
+ const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
+ defer loop.allocator.free(path_with_null);
+
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .WriteFile = Request.Msg.WriteFile{
+ .path = path_with_null[0..path.len],
+ .contents = contents,
+ .mode = mode,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.WriteFile.result;
+}
+
+/// The frame resumes when the last data has been confirmed written, but before the file handle
+/// is closed.
+/// Caller owns returned memory.
+pub fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 {
+ var close_op = try CloseOperation.start(loop);
+ defer close_op.finish();
+
+ const fd = try openRead(loop, file_path);
+ close_op.setHandle(fd);
+
+ var list = std.ArrayList(u8).init(loop.allocator);
+ defer list.deinit();
+
+ while (true) {
+ try list.ensureCapacity(list.len + mem.page_size);
+ const buf = list.items[list.len..];
+ const buf_array = [_][]u8{buf};
+ const amt = try preadv(loop, fd, buf_array, list.len);
+ list.len += amt;
+ if (list.len > max_size) {
+ return error.FileTooBig;
+ }
+ if (amt < buf.len) {
+ return list.toOwnedSlice();
+ }
+ }
+}
+
+pub const WatchEventId = enum {
+ CloseWrite,
+ Delete,
+};
+
+fn eqlString(a: []const u16, b: []const u16) bool {
+ if (a.len != b.len) return false;
+ if (a.ptr == b.ptr) return true;
+ return mem.compare(u16, a, b) == .Equal;
+}
+
+fn hashString(s: []const u16) u32 {
+ return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s)));
+}
+
+//pub const WatchEventError = error{
+// UserResourceLimitReached,
+// SystemResources,
+// AccessDenied,
+// Unexpected, // TODO remove this possibility
+//};
+//
+//pub fn Watch(comptime V: type) type {
+// return struct {
+// channel: *event.Channel(Event.Error!Event),
+// os_data: OsData,
+//
+// const OsData = switch (builtin.os) {
+// .macosx, .freebsd, .netbsd => struct {
+// file_table: FileTable,
+// table_lock: event.Lock,
+//
+// const FileTable = std.StringHashmap(*Put);
+// const Put = struct {
+// putter: anyframe,
+// value_ptr: *V,
+// };
+// },
+//
+// .linux => LinuxOsData,
+// .windows => WindowsOsData,
+//
+// else => @compileError("Unsupported OS"),
+// };
+//
+// const WindowsOsData = struct {
+// table_lock: event.Lock,
+// dir_table: DirTable,
+// all_putters: std.atomic.Queue(anyframe),
+// ref_count: std.atomic.Int(usize),
+//
+// const DirTable = std.StringHashMap(*Dir);
+// const FileTable = std.HashMap([]const u16, V, hashString, eqlString);
+//
+// const Dir = struct {
+// putter: anyframe,
+// file_table: FileTable,
+// table_lock: event.Lock,
+// };
+// };
+//
+// const LinuxOsData = struct {
+// putter: anyframe,
+// inotify_fd: i32,
+// wd_table: WdTable,
+// table_lock: event.Lock,
+//
+// const WdTable = std.AutoHashMap(i32, Dir);
+// const FileTable = std.StringHashMap(V);
+//
+// const Dir = struct {
+// dirname: []const u8,
+// file_table: FileTable,
+// };
+// };
+//
+// const FileToHandle = std.StringHashMap(anyframe);
+//
+// const Self = @This();
+//
+// pub const Event = struct {
+// id: Id,
+// data: V,
+//
+// pub const Id = WatchEventId;
+// pub const Error = WatchEventError;
+// };
+//
+// pub fn create(loop: *Loop, event_buf_count: usize) !*Self {
+// const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count);
+// errdefer channel.destroy();
+//
+// switch (builtin.os) {
+// .linux => {
+// const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
+// errdefer os.close(inotify_fd);
+//
+// var result: *Self = undefined;
+// _ = try async<loop.allocator> linuxEventPutter(inotify_fd, channel, &result);
+// return result;
+// },
+//
+// .windows => {
+// const self = try loop.allocator.create(Self);
+// errdefer loop.allocator.destroy(self);
+// self.* = Self{
+// .channel = channel,
+// .os_data = OsData{
+// .table_lock = event.Lock.init(loop),
+// .dir_table = OsData.DirTable.init(loop.allocator),
+// .ref_count = std.atomic.Int(usize).init(1),
+// .all_putters = std.atomic.Queue(anyframe).init(),
+// },
+// };
+// return self;
+// },
+//
+// .macosx, .freebsd, .netbsd => {
+// const self = try loop.allocator.create(Self);
+// errdefer loop.allocator.destroy(self);
+//
+// self.* = Self{
+// .channel = channel,
+// .os_data = OsData{
+// .table_lock = event.Lock.init(loop),
+// .file_table = OsData.FileTable.init(loop.allocator),
+// },
+// };
+// return self;
+// },
+// else => @compileError("Unsupported OS"),
+// }
+// }
+//
+// /// All addFile calls and removeFile calls must have completed.
+// pub fn destroy(self: *Self) void {
+// switch (builtin.os) {
+// .macosx, .freebsd, .netbsd => {
+// // TODO we need to cancel the frames before destroying the lock
+// self.os_data.table_lock.deinit();
+// var it = self.os_data.file_table.iterator();
+// while (it.next()) |entry| {
+// cancel entry.value.putter;
+// self.channel.loop.allocator.free(entry.key);
+// }
+// self.channel.destroy();
+// },
+// .linux => cancel self.os_data.putter,
+// .windows => {
+// while (self.os_data.all_putters.get()) |putter_node| {
+// cancel putter_node.data;
+// }
+// self.deref();
+// },
+// else => @compileError("Unsupported OS"),
+// }
+// }
+//
+// fn ref(self: *Self) void {
+// _ = self.os_data.ref_count.incr();
+// }
+//
+// fn deref(self: *Self) void {
+// if (self.os_data.ref_count.decr() == 1) {
+// const allocator = self.channel.loop.allocator;
+// self.os_data.table_lock.deinit();
+// var it = self.os_data.dir_table.iterator();
+// while (it.next()) |entry| {
+// allocator.free(entry.key);
+// allocator.destroy(entry.value);
+// }
+// self.os_data.dir_table.deinit();
+// self.channel.destroy();
+// allocator.destroy(self);
+// }
+// }
+//
+// pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
+// switch (builtin.os) {
+// .macosx, .freebsd, .netbsd => return await (async addFileKEvent(self, file_path, value) catch unreachable),
+// .linux => return await (async addFileLinux(self, file_path, value) catch unreachable),
+// .windows => return await (async addFileWindows(self, file_path, value) catch unreachable),
+// else => @compileError("Unsupported OS"),
+// }
+// }
+//
+// async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
+// const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path});
+// var resolved_path_consumed = false;
+// defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path);
+//
+// var close_op = try CloseOperation.start(self.channel.loop);
+// var close_op_consumed = false;
+// defer if (!close_op_consumed) close_op.finish();
+//
+// const flags = if (os.darwin.is_the_target) os.O_SYMLINK | os.O_EVTONLY else 0;
+// const mode = 0;
+// const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable);
+// close_op.setHandle(fd);
+//
+// var put_data: *OsData.Put = undefined;
+// const putter = try async self.kqPutEvents(close_op, value, &put_data);
+// close_op_consumed = true;
+// errdefer cancel putter;
+//
+// const result = blk: {
+// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const gop = try self.os_data.file_table.getOrPut(resolved_path);
+// if (gop.found_existing) {
+// const prev_value = gop.kv.value.value_ptr.*;
+// cancel gop.kv.value.putter;
+// gop.kv.value = put_data;
+// break :blk prev_value;
+// } else {
+// resolved_path_consumed = true;
+// gop.kv.value = put_data;
+// break :blk null;
+// }
+// };
+//
+// return result;
+// }
+//
+// async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void {
+// var value_copy = value;
+// var put = OsData.Put{
+// .putter = @frame(),
+// .value_ptr = &value_copy,
+// };
+// out_put.* = &put;
+// self.channel.loop.beginOneEvent();
+//
+// defer {
+// close_op.finish();
+// self.channel.loop.finishOneEvent();
+// }
+//
+// while (true) {
+// if (await (async self.channel.loop.bsdWaitKev(
+// @intCast(usize, close_op.getHandle()),
+// os.EVFILT_VNODE,
+// os.NOTE_WRITE | os.NOTE_DELETE,
+// ) catch unreachable)) |kev| {
+// // TODO handle EV_ERROR
+// if (kev.fflags & os.NOTE_DELETE != 0) {
+// await (async self.channel.put(Self.Event{
+// .id = Event.Id.Delete,
+// .data = value_copy,
+// }) catch unreachable);
+// } else if (kev.fflags & os.NOTE_WRITE != 0) {
+// await (async self.channel.put(Self.Event{
+// .id = Event.Id.CloseWrite,
+// .data = value_copy,
+// }) catch unreachable);
+// }
+// } else |err| switch (err) {
+// error.EventNotFound => unreachable,
+// error.ProcessNotFound => unreachable,
+// error.Overflow => unreachable,
+// error.AccessDenied, error.SystemResources => |casted_err| {
+// await (async self.channel.put(casted_err) catch unreachable);
+// },
+// }
+// }
+// }
+//
+// async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
+// const value_copy = value;
+//
+// const dirname = std.fs.path.dirname(file_path) orelse ".";
+// const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname);
+// var dirname_with_null_consumed = false;
+// defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null);
+//
+// const basename = std.fs.path.basename(file_path);
+// const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename);
+// var basename_with_null_consumed = false;
+// defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null);
+//
+// const wd = try os.inotify_add_watchC(
+// self.os_data.inotify_fd,
+// dirname_with_null.ptr,
+// os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK,
+// );
+// // wd is either a newly created watch or an existing one.
+//
+// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const gop = try self.os_data.wd_table.getOrPut(wd);
+// if (!gop.found_existing) {
+// gop.kv.value = OsData.Dir{
+// .dirname = dirname_with_null,
+// .file_table = OsData.FileTable.init(self.channel.loop.allocator),
+// };
+// dirname_with_null_consumed = true;
+// }
+// const dir = &gop.kv.value;
+//
+// const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
+// if (file_table_gop.found_existing) {
+// const prev_value = file_table_gop.kv.value;
+// file_table_gop.kv.value = value_copy;
+// return prev_value;
+// } else {
+// file_table_gop.kv.value = value_copy;
+// basename_with_null_consumed = true;
+// return null;
+// }
+// }
+//
+// async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
+// const value_copy = value;
+// // TODO we might need to convert dirname and basename to canonical file paths ("short"?)
+//
+// const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse ".");
+// var dirname_consumed = false;
+// defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname);
+//
+// const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname);
+// defer self.channel.loop.allocator.free(dirname_utf16le);
+//
+// // TODO https://github.com/ziglang/zig/issues/265
+// const basename = std.fs.path.basename(file_path);
+// const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename);
+// var basename_utf16le_null_consumed = false;
+// defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null);
+// const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1];
+//
+// const dir_handle = try windows.CreateFileW(
+// dirname_utf16le.ptr,
+// windows.FILE_LIST_DIRECTORY,
+// windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE,
+// null,
+// windows.OPEN_EXISTING,
+// windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED,
+// null,
+// );
+// var dir_handle_consumed = false;
+// defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle);
+//
+// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const gop = try self.os_data.dir_table.getOrPut(dirname);
+// if (gop.found_existing) {
+// const dir = gop.kv.value;
+// const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable);
+// defer held_dir_lock.release();
+//
+// const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null);
+// if (file_gop.found_existing) {
+// const prev_value = file_gop.kv.value;
+// file_gop.kv.value = value_copy;
+// return prev_value;
+// } else {
+// file_gop.kv.value = value_copy;
+// basename_utf16le_null_consumed = true;
+// return null;
+// }
+// } else {
+// errdefer _ = self.os_data.dir_table.remove(dirname);
+// const dir = try self.channel.loop.allocator.create(OsData.Dir);
+// errdefer self.channel.loop.allocator.destroy(dir);
+//
+// dir.* = OsData.Dir{
+// .file_table = OsData.FileTable.init(self.channel.loop.allocator),
+// .table_lock = event.Lock.init(self.channel.loop),
+// .putter = undefined,
+// };
+// gop.kv.value = dir;
+// assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null);
+// basename_utf16le_null_consumed = true;
+//
+// dir.putter = try async self.windowsDirReader(dir_handle, dir);
+// dir_handle_consumed = true;
+//
+// dirname_consumed = true;
+//
+// return null;
+// }
+// }
+//
+// async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
+// self.ref();
+// defer self.deref();
+//
+// defer os.close(dir_handle);
+//
+// var putter_node = std.atomic.Queue(anyframe).Node{
+// .data = @frame(),
+// .prev = null,
+// .next = null,
+// };
+// self.os_data.all_putters.put(&putter_node);
+// defer _ = self.os_data.all_putters.remove(&putter_node);
+//
+// var resume_node = Loop.ResumeNode.Basic{
+// .base = Loop.ResumeNode{
+// .id = Loop.ResumeNode.Id.Basic,
+// .handle = @frame(),
+// .overlapped = windows.OVERLAPPED{
+// .Internal = 0,
+// .InternalHigh = 0,
+// .Offset = 0,
+// .OffsetHigh = 0,
+// .hEvent = null,
+// },
+// },
+// };
+// var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;
+//
+// // TODO handle this error not in the channel but in the setup
+// _ = windows.CreateIoCompletionPort(
+// dir_handle,
+// self.channel.loop.os_data.io_port,
+// undefined,
+// undefined,
+// ) catch |err| {
+// await (async self.channel.put(err) catch unreachable);
+// return;
+// };
+//
+// while (true) {
+// {
+// // TODO only 1 beginOneEvent for the whole function
+// self.channel.loop.beginOneEvent();
+// errdefer self.channel.loop.finishOneEvent();
+// errdefer {
+// _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped);
+// }
+// suspend {
+// _ = windows.kernel32.ReadDirectoryChangesW(
+// dir_handle,
+// &event_buf,
+// @intCast(windows.DWORD, event_buf.len),
+// windows.FALSE, // watch subtree
+// windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME |
+// windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE |
+// windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
+// windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
+// null, // number of bytes transferred (unused for async)
+// &resume_node.base.overlapped,
+// null, // completion routine - unused because we use IOCP
+// );
+// }
+// }
+// var bytes_transferred: windows.DWORD = undefined;
+// if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
+// const err = switch (windows.kernel32.GetLastError()) {
+// else => |err| windows.unexpectedError(err),
+// };
+// await (async self.channel.put(err) catch unreachable);
+// } else {
+// // can't use @bytesToSlice because of the special variable length name field
+// var ptr = event_buf[0..].ptr;
+// const end_ptr = ptr + bytes_transferred;
+// var ev: *windows.FILE_NOTIFY_INFORMATION = undefined;
+// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) {
+// ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr);
+// const emit = switch (ev.Action) {
+// windows.FILE_ACTION_REMOVED => WatchEventId.Delete,
+// windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite,
+// else => null,
+// };
+// if (emit) |id| {
+// const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2];
+// const user_value = blk: {
+// const held = await (async dir.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// if (dir.file_table.get(basename_utf16le)) |entry| {
+// break :blk entry.value;
+// } else {
+// break :blk null;
+// }
+// };
+// if (user_value) |v| {
+// await (async self.channel.put(Event{
+// .id = id,
+// .data = v,
+// }) catch unreachable);
+// }
+// }
+// if (ev.NextEntryOffset == 0) break;
+// }
+// }
+// }
+// }
+//
+// pub async fn removeFile(self: *Self, file_path: []const u8) ?V {
+// @panic("TODO");
+// }
+//
+// async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void {
+// const loop = channel.loop;
+//
+// var watch = Self{
+// .channel = channel,
+// .os_data = OsData{
+// .putter = @frame(),
+// .inotify_fd = inotify_fd,
+// .wd_table = OsData.WdTable.init(loop.allocator),
+// .table_lock = event.Lock.init(loop),
+// },
+// };
+// out_watch.* = &watch;
+//
+// loop.beginOneEvent();
+//
+// defer {
+// watch.os_data.table_lock.deinit();
+// var wd_it = watch.os_data.wd_table.iterator();
+// while (wd_it.next()) |wd_entry| {
+// var file_it = wd_entry.value.file_table.iterator();
+// while (file_it.next()) |file_entry| {
+// loop.allocator.free(file_entry.key);
+// }
+// loop.allocator.free(wd_entry.value.dirname);
+// }
+// loop.finishOneEvent();
+// os.close(inotify_fd);
+// channel.destroy();
+// }
+//
+// var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
+//
+// while (true) {
+// const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len);
+// const errno = os.linux.getErrno(rc);
+// switch (errno) {
+// 0 => {
+// // can't use @bytesToSlice because of the special variable length name field
+// var ptr = event_buf[0..].ptr;
+// const end_ptr = ptr + event_buf.len;
+// var ev: *os.linux.inotify_event = undefined;
+// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) {
+// ev = @ptrCast(*os.linux.inotify_event, ptr);
+// if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
+// const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
+// const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1];
+// const user_value = blk: {
+// const held = await (async watch.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const dir = &watch.os_data.wd_table.get(ev.wd).?.value;
+// if (dir.file_table.get(basename_with_null)) |entry| {
+// break :blk entry.value;
+// } else {
+// break :blk null;
+// }
+// };
+// if (user_value) |v| {
+// await (async channel.put(Event{
+// .id = WatchEventId.CloseWrite,
+// .data = v,
+// }) catch unreachable);
+// }
+// }
+// }
+// },
+// os.linux.EINTR => continue,
+// os.linux.EINVAL => unreachable,
+// os.linux.EFAULT => unreachable,
+// os.linux.EAGAIN => {
+// (await (async loop.linuxWaitFd(
+// inotify_fd,
+// os.linux.EPOLLET | os.linux.EPOLLIN,
+// ) catch unreachable)) catch |err| {
+// const transformed_err = switch (err) {
+// error.FileDescriptorAlreadyPresentInSet => unreachable,
+// error.OperationCausesCircularLoop => unreachable,
+// error.FileDescriptorNotRegistered => unreachable,
+// error.FileDescriptorIncompatibleWithEpoll => unreachable,
+// error.Unexpected => unreachable,
+// else => |e| e,
+// };
+// await (async channel.put(transformed_err) catch unreachable);
+// };
+// },
+// else => unreachable,
+// }
+// }
+// }
+// };
+//}
+
+const test_tmp_dir = "std_event_fs_test";
+
+// TODO this test is disabled until the async function rewrite is finished.
+//test "write a file, watch it, write it again" {
+// return error.SkipZigTest;
+// const allocator = std.heap.direct_allocator;
+//
+// // TODO move this into event loop too
+// try os.makePath(allocator, test_tmp_dir);
+// defer os.deleteTree(allocator, test_tmp_dir) catch {};
+//
+// var loop: Loop = undefined;
+// try loop.initMultiThreaded(allocator);
+// defer loop.deinit();
+//
+// var result: anyerror!void = error.ResultNeverWritten;
+// const handle = try async<allocator> testFsWatchCantFail(&loop, &result);
+// defer cancel handle;
+//
+// loop.run();
+// return result;
+//}
+
+fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void {
+ result.* = testFsWatch(loop);
+}
+
+fn testFsWatch(loop: *Loop) !void {
+ const file_path = try std.fs.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" });
+ defer loop.allocator.free(file_path);
+
+ const contents =
+ \\line 1
+ \\line 2
+ ;
+ const line2_offset = 7;
+
+ // first just write then read the file
+ try writeFile(loop, file_path, contents);
+
+ const read_contents = try readFile(loop, file_path, 1024 * 1024);
+ testing.expectEqualSlices(u8, contents, read_contents);
+
+ // now watch the file
+ var watch = try Watch(void).create(loop, 0);
+ defer watch.destroy();
+
+ testing.expect((try watch.addFile(file_path, {})) == null);
+
+ const ev = async watch.channel.get();
+ var ev_consumed = false;
+ defer if (!ev_consumed) await ev;
+
+ // overwrite line 2
+ const fd = try await openReadWrite(loop, file_path, File.default_mode);
+ {
+ defer os.close(fd);
+
+ try pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset);
+ }
+
+ ev_consumed = true;
+ switch ((try await ev).id) {
+ WatchEventId.CloseWrite => {},
+ WatchEventId.Delete => @panic("wrong event"),
+ }
+ const contents_updated = try readFile(loop, file_path, 1024 * 1024);
+ testing.expectEqualSlices(u8,
+ \\line 1
+ \\lorem ipsum
+ , contents_updated);
+
+ // TODO test deleting the file and then re-adding it. we should get events for both
+}
+
+pub const OutStream = struct {
+ fd: fd_t,
+ stream: Stream,
+ loop: *Loop,
+ offset: usize,
+
+ pub const Error = File.WriteError;
+ pub const Stream = event.io.OutStream(Error);
+
+ pub fn init(loop: *Loop, fd: fd_t, offset: usize) OutStream {
+ return OutStream{
+ .fd = fd,
+ .loop = loop,
+ .offset = offset,
+ .stream = Stream{ .writeFn = writeFn },
+ };
+ }
+
+ fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
+ const self = @fieldParentPtr(OutStream, "stream", out_stream);
+ const offset = self.offset;
+ self.offset += bytes.len;
+ return pwritev(self.loop, self.fd, [][]const u8{bytes}, offset);
+ }
+};
+
+pub const InStream = struct {
+ fd: fd_t,
+ stream: Stream,
+ loop: *Loop,
+ offset: usize,
+
+ pub const Error = PReadVError; // TODO make this not have OutOfMemory
+ pub const Stream = event.io.InStream(Error);
+
+ pub fn init(loop: *Loop, fd: fd_t, offset: usize) InStream {
+ return InStream{
+ .fd = fd,
+ .loop = loop,
+ .offset = offset,
+ .stream = Stream{ .readFn = readFn },
+ };
+ }
+
+ fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
+ const self = @fieldParentPtr(InStream, "stream", in_stream);
+ const amt = try preadv(self.loop, self.fd, [][]u8{bytes}, self.offset);
+ self.offset += amt;
+ return amt;
+ }
+};