diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2020-02-06 17:56:40 -0500 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2020-02-06 18:05:50 -0500 |
| commit | 0b5bcd2f56a84e66d5c700744ec1838381893667 (patch) | |
| tree | 5b640a57055e50636fe7a9f782915b2126395ef3 /lib/std/event/loop.zig | |
| parent | 704cd977bdcdfa8cff4e70aaad93857d9b622fc7 (diff) | |
| download | zig-0b5bcd2f56a84e66d5c700744ec1838381893667.tar.gz zig-0b5bcd2f56a84e66d5c700744ec1838381893667.zip | |
more std lib async I/O integration
* `zig test` gainst `--test-evented-io` parameter and gains the ability
to seamlessly run async tests.
* `std.ChildProcess` opens its child process pipe with O_NONBLOCK when
using evented I/O
* `std.io.getStdErr()` gives a File that is blocking even in evented
I/O mode.
* Delete `std.event.fs`. The functionality is now merged into `std.fs`
and async file system access (using a dedicated thread) is
automatically handled.
* `std.fs.File` can be configured to specify whether its handle is
expected to block, and whether that is OK to block even when in
async I/O mode. This makes async I/O work correctly for e.g. the
file system as well as network.
* `std.fs.File` has some deprecated functions removed.
* Missing readv,writev,pread,pwrite,preadv,pwritev functions are added
to `std.os` and `std.fs.File`. They are all integrated with async
I/O.
* `std.fs.Watch` is still bit rotted and needs to be audited in light
of the new async/await syntax.
* `std.io.OutStream` integrates with async I/O
* linked list nodes in the std lib have default `null` values for
`prev` and `next`.
* Windows async I/O integration is enabled for reading/writing file
handles.
* Added `std.os.mode_t`. Integer sizes need to be audited.
* Fixed #4403 which was causing compiler to crash.
This is working towards:
./zig test ../test/stage1/behavior.zig --test-evented-io
Which does not successfully build yet. I'd like to enable behavioral
tests and std lib tests with --test-evented-io in the test matrix in the
future, to prevent regressions.
Diffstat (limited to 'lib/std/event/loop.zig')
| -rw-r--r-- | lib/std/event/loop.zig | 367 |
1 files changed, 317 insertions, 50 deletions
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index e80266c640..555dba3000 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -6,7 +6,6 @@ const testing = std.testing; const mem = std.mem; const AtomicRmwOp = builtin.AtomicRmwOp; const AtomicOrder = builtin.AtomicOrder; -const fs = std.event.fs; const os = std.os; const windows = os.windows; const maxInt = std.math.maxInt; @@ -174,21 +173,19 @@ pub const Loop = struct { fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { switch (builtin.os) { .linux => { - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue = std.atomic.Queue(Request).init(); self.os_data.fs_queue_item = 0; // we need another thread for the file system because Linux does not have an async // file system I/O API. - self.os_data.fs_end_request = fs.RequestNode{ - .prev = undefined, - .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, + self.os_data.fs_end_request = Request.Node{ + .data = Request{ + .msg = .end, + .finish = .NoAction, }, }; errdefer { - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd); } for (self.eventfd_resume_nodes) |*eventfd_node| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -207,10 +204,10 @@ pub const Loop = struct { } self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC); - errdefer os.close(self.os_data.epollfd); + errdefer noasync os.close(self.os_data.epollfd); self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK); - errdefer os.close(self.os_data.final_eventfd); + errdefer noasync os.close(self.os_data.final_eventfd); self.os_data.final_eventfd_event = os.epoll_event{ .events = os.EPOLLIN, @@ -237,7 +234,7 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { // writing 8 bytes to an eventfd cannot fail - os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable; + noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); @@ -249,20 +246,20 @@ pub const Loop = struct { }, .macosx, .freebsd, .netbsd, .dragonfly => { self.os_data.kqfd = try os.kqueue(); - errdefer os.close(self.os_data.kqfd); + errdefer noasync os.close(self.os_data.kqfd); self.os_data.fs_kqfd = try os.kqueue(); - errdefer os.close(self.os_data.fs_kqfd); + errdefer noasync os.close(self.os_data.fs_kqfd); - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue = std.atomic.Queue(Request).init(); // we need another thread for the file system because Darwin does not have an async // file system I/O API. - self.os_data.fs_end_request = fs.RequestNode{ + self.os_data.fs_end_request = Request.Node{ .prev = undefined, .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, + .data = Request{ + .msg = .end, + .finish = .NoAction, }, }; @@ -407,14 +404,14 @@ pub const Loop = struct { fn deinitOsData(self: *Loop) void { switch (builtin.os) { .linux => { - os.close(self.os_data.final_eventfd); - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); - os.close(self.os_data.epollfd); + noasync os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd); + noasync os.close(self.os_data.epollfd); self.allocator.free(self.eventfd_resume_nodes); }, .macosx, .freebsd, .netbsd, .dragonfly => { - os.close(self.os_data.kqfd); - os.close(self.os_data.fs_kqfd); + noasync os.close(self.os_data.kqfd); + noasync os.close(self.os_data.fs_kqfd); }, .windows => { windows.CloseHandle(self.os_data.io_port); @@ -711,6 +708,190 @@ pub const Loop = struct { } } + /// Performs an async `os.open` using a separate thread. + pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .open = .{ + .path = file_path, + .flags = flags, + .mode = mode, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.open.result; + } + + /// Performs an async `os.opent` using a separate thread. + pub fn openatZ(self: *Loop, fd: os.fd_t, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .openat = .{ + .fd = fd, + .path = file_path, + .flags = flags, + .mode = mode, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.openat.result; + } + + /// Performs an async `os.close` using a separate thread. + pub fn close(self: *Loop, fd: os.fd_t) void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ .close = .{ .fd = fd } }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + } + + /// Performs an async `os.read` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn read(self: *Loop, fd: os.fd_t, buf: []u8) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .read = .{ + .fd = fd, + .buf = buf, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.read.result; + } + + /// Performs an async `os.readv` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .readv = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.readv.result; + } + + /// Performs an async `os.preadv` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .preadv = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.preadv.result; + } + + /// Performs an async `os.write` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .write = .{ + .fd = fd, + .bytes = bytes, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.write.result; + } + + /// Performs an async `os.writev` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .writev = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.writev.result; + } + + /// Performs an async `os.pwritev` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) WriteError!void { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .pwritev = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.pwritev.result; + } + fn workerRun(self: *Loop) void { while (true) { while (true) { @@ -804,7 +985,7 @@ pub const Loop = struct { } } - fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + fn posixFsRequest(self: *Loop, request_node: *Request.Node) void { self.beginOneEvent(); // finished in posixFsRun after processing the msg self.os_data.fs_queue.put(request_node); switch (builtin.os) { @@ -826,7 +1007,7 @@ pub const Loop = struct { } } - fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void { + fn posixFsCancel(self: *Loop, request_node: *Request.Node) void { if (self.os_data.fs_queue.remove(request_node)) { self.finishOneEvent(); } @@ -841,37 +1022,32 @@ pub const Loop = struct { } while (self.os_data.fs_queue.get()) |node| { switch (node.data.msg) { - .End => return, - .WriteV => |*msg| { + .end => return, + .read => |*msg| { + msg.result = noasync os.read(msg.fd, msg.buf); + }, + .write => |*msg| { + msg.result = noasync os.write(msg.fd, msg.bytes); + }, + .writev => |*msg| { msg.result = noasync os.writev(msg.fd, msg.iov); }, - .PWriteV => |*msg| { + .pwritev => |*msg| { msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset); }, - .PReadV => |*msg| { + .preadv => |*msg| { msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset); }, - .Open => |*msg| { - msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode); + .open => |*msg| { + msg.result = noasync os.openC(msg.path, msg.flags, msg.mode); }, - .Close => |*msg| noasync os.close(msg.fd), - .WriteFile => |*msg| blk: { - const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0; - const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT | - os.O_CLOEXEC | os.O_TRUNC; - const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| { - msg.result = err; - break :blk; - }; - defer noasync os.close(fd); - msg.result = noasync os.write(fd, msg.contents); + .openat => |*msg| { + msg.result = noasync os.openatC(msg.fd, msg.path, msg.flags, msg.mode); }, + .close => |*msg| noasync os.close(msg.fd), } switch (node.data.finish) { .TickNode => |*tick_node| self.onNextTick(tick_node), - .DeallocCloseOperation => |close_op| { - self.allocator.destroy(close_op); - }, .NoAction => {}, } self.finishOneEvent(); @@ -911,8 +1087,8 @@ pub const Loop = struct { fs_kevent_wait: os.Kevent, fs_thread: *Thread, fs_kqfd: i32, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, + fs_queue: std.atomic.Queue(Request), + fs_end_request: Request.Node, }; const LinuxOsData = struct { @@ -921,8 +1097,99 @@ pub const Loop = struct { final_eventfd_event: os.linux.epoll_event, fs_thread: *Thread, fs_queue_item: i32, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, + fs_queue: std.atomic.Queue(Request), + fs_end_request: Request.Node, + }; + + pub const Request = struct { + msg: Msg, + finish: Finish, + + pub const Node = std.atomic.Queue(Request).Node; + + pub const Finish = union(enum) { + TickNode: Loop.NextTickNode, + NoAction, + }; + + pub const Msg = union(enum) { + read: Read, + write: Write, + writev: WriteV, + pwritev: PWriteV, + preadv: PReadV, + open: Open, + openat: OpenAt, + close: Close, + + /// special - means the fs thread should exit + end, + + pub const Read = struct { + fd: os.fd_t, + buf: []u8, + result: Error!usize, + + pub const Error = os.ReadError; + }; + + pub const Write = struct { + fd: os.fd_t, + bytes: []const u8, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const WriteV = struct { + fd: os.fd_t, + iov: []const os.iovec_const, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PWriteV = struct { + fd: os.fd_t, + iov: []const os.iovec_const, + offset: usize, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PReadV = struct { + fd: os.fd_t, + iov: []const os.iovec, + offset: usize, + result: Error!usize, + + pub const Error = os.ReadError; + }; + + pub const Open = struct { + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + result: Error!os.fd_t, + + pub const Error = os.OpenError; + }; + + pub const OpenAt = struct { + fd: os.fd_t, + path: [*:0]const u8, + flags: u32, + mode: os.mode_t, + result: Error!os.fd_t, + + pub const Error = os.OpenError; + }; + + pub const Close = struct { + fd: os.fd_t, + }; + }; }; }; |
