author    Andrew Kelley <andrew@ziglang.org>  2020-02-06 17:56:40 -0500
committer Andrew Kelley <andrew@ziglang.org>  2020-02-06 18:05:50 -0500
commit    0b5bcd2f56a84e66d5c700744ec1838381893667 (patch)
tree      5b640a57055e50636fe7a9f782915b2126395ef3  /lib/std/event/loop.zig
parent    704cd977bdcdfa8cff4e70aaad93857d9b622fc7 (diff)
more std lib async I/O integration
* `zig test` gains the `--test-evented-io` parameter and the ability to seamlessly run async tests.
* `std.ChildProcess` opens its child process pipe with O_NONBLOCK when using evented I/O.
* `std.io.getStdErr()` gives a File that is blocking even in evented I/O mode.
* Delete `std.event.fs`. The functionality is now merged into `std.fs`, and async file system access (using a dedicated thread) is handled automatically.
* `std.fs.File` can be configured to specify whether its handle is expected to block, and whether blocking is OK even in async I/O mode. This makes async I/O work correctly for e.g. the file system as well as the network.
* `std.fs.File` has some deprecated functions removed.
* Missing readv, writev, pread, pwrite, preadv, pwritev functions are added to `std.os` and `std.fs.File`. They are all integrated with async I/O.
* `std.fs.Watch` is still bit-rotted and needs to be audited in light of the new async/await syntax.
* `std.io.OutStream` integrates with async I/O.
* Linked list nodes in the std lib have default `null` values for `prev` and `next`.
* Windows async I/O integration is enabled for reading/writing file handles.
* Added `std.os.mode_t`. Integer sizes need to be audited.
* Fixed #4403, which was causing the compiler to crash.

This is working towards:

./zig test ../test/stage1/behavior.zig --test-evented-io

which does not successfully build yet. I'd like to enable behavioral tests and std lib tests with --test-evented-io in the test matrix in the future, to prevent regressions.
Diffstat (limited to 'lib/std/event/loop.zig')
-rw-r--r--  lib/std/event/loop.zig  367
1 file changed, 317 insertions, 50 deletions
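
For context, here is a minimal usage sketch of the thread-offloaded fs API this commit adds to `Loop` (a sketch only, assuming evented I/O is enabled via a root `io_mode` declaration and that the global `Loop.instance` is running; the test name and file path are illustrative and not part of this commit):

    const std = @import("std");
    const os = std.os;

    // Assumed mechanism for enabling evented I/O in this build.
    pub const io_mode = .evented;

    test "offloaded file read through the event loop" {
        // Loop.instance is assumed to be the global event loop pointer.
        const loop = std.event.Loop.instance orelse return error.SkipZigTest;

        // openZ, read, and close are queued to the dedicated fs thread via
        // posixFsRequest and resume this frame once the blocking call returns.
        const fd = try loop.openZ("/dev/null", os.O_RDONLY, 0);
        defer loop.close(fd);

        var buf: [64]u8 = undefined;
        const amt = try loop.read(fd, buf[0..]);
        std.testing.expect(amt == 0); // reading /dev/null yields zero bytes
    }
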
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index e80266c640..555dba3000 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -6,7 +6,6 @@ const testing = std.testing;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
-const fs = std.event.fs;
const os = std.os;
const windows = os.windows;
const maxInt = std.math.maxInt;
@@ -174,21 +173,19 @@ pub const Loop = struct {
fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
switch (builtin.os) {
.linux => {
- self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
+ self.os_data.fs_queue = std.atomic.Queue(Request).init();
self.os_data.fs_queue_item = 0;
// we need another thread for the file system because Linux does not have an async
// file system I/O API.
- self.os_data.fs_end_request = fs.RequestNode{
- .prev = undefined,
- .next = undefined,
- .data = fs.Request{
- .msg = fs.Request.Msg.End,
- .finish = fs.Request.Finish.NoAction,
+ self.os_data.fs_end_request = Request.Node{
+ .data = Request{
+ .msg = .end,
+ .finish = .NoAction,
},
};
errdefer {
- while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
+ while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd);
}
for (self.eventfd_resume_nodes) |*eventfd_node| {
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
@@ -207,10 +204,10 @@ pub const Loop = struct {
}
self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC);
- errdefer os.close(self.os_data.epollfd);
+ errdefer noasync os.close(self.os_data.epollfd);
self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK);
- errdefer os.close(self.os_data.final_eventfd);
+ errdefer noasync os.close(self.os_data.final_eventfd);
self.os_data.final_eventfd_event = os.epoll_event{
.events = os.EPOLLIN,
@@ -237,7 +234,7 @@ pub const Loop = struct {
var extra_thread_index: usize = 0;
errdefer {
// writing 8 bytes to an eventfd cannot fail
- os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
+ noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
while (extra_thread_index != 0) {
extra_thread_index -= 1;
self.extra_threads[extra_thread_index].wait();
@@ -249,20 +246,20 @@ pub const Loop = struct {
},
.macosx, .freebsd, .netbsd, .dragonfly => {
self.os_data.kqfd = try os.kqueue();
- errdefer os.close(self.os_data.kqfd);
+ errdefer noasync os.close(self.os_data.kqfd);
self.os_data.fs_kqfd = try os.kqueue();
- errdefer os.close(self.os_data.fs_kqfd);
+ errdefer noasync os.close(self.os_data.fs_kqfd);
- self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
+ self.os_data.fs_queue = std.atomic.Queue(Request).init();
// we need another thread for the file system because Darwin does not have an async
// file system I/O API.
- self.os_data.fs_end_request = fs.RequestNode{
+ self.os_data.fs_end_request = Request.Node{
.prev = undefined,
.next = undefined,
- .data = fs.Request{
- .msg = fs.Request.Msg.End,
- .finish = fs.Request.Finish.NoAction,
+ .data = Request{
+ .msg = .end,
+ .finish = .NoAction,
},
};
@@ -407,14 +404,14 @@ pub const Loop = struct {
fn deinitOsData(self: *Loop) void {
switch (builtin.os) {
.linux => {
- os.close(self.os_data.final_eventfd);
- while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
- os.close(self.os_data.epollfd);
+ noasync os.close(self.os_data.final_eventfd);
+ while (self.available_eventfd_resume_nodes.pop()) |node| noasync os.close(node.data.eventfd);
+ noasync os.close(self.os_data.epollfd);
self.allocator.free(self.eventfd_resume_nodes);
},
.macosx, .freebsd, .netbsd, .dragonfly => {
- os.close(self.os_data.kqfd);
- os.close(self.os_data.fs_kqfd);
+ noasync os.close(self.os_data.kqfd);
+ noasync os.close(self.os_data.fs_kqfd);
},
.windows => {
windows.CloseHandle(self.os_data.io_port);
@@ -711,6 +708,190 @@ pub const Loop = struct {
}
}
+ /// Performs an async `os.open` using a separate thread.
+ pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .open = .{
+ .path = file_path,
+ .flags = flags,
+ .mode = mode,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.open.result;
+ }
+
+ /// Performs an async `os.openat` using a separate thread.
+ pub fn openatZ(self: *Loop, fd: os.fd_t, file_path: [*:0]const u8, flags: u32, mode: usize) os.OpenError!os.fd_t {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .openat = .{
+ .fd = fd,
+ .path = file_path,
+ .flags = flags,
+ .mode = mode,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.openat.result;
+ }
+
+ /// Performs an async `os.close` using a separate thread.
+ pub fn close(self: *Loop, fd: os.fd_t) void {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{ .close = .{ .fd = fd } },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ }
+
+ /// Performs an async `os.read` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn read(self: *Loop, fd: os.fd_t, buf: []u8) os.ReadError!usize {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .read = .{
+ .fd = fd,
+ .buf = buf,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.read.result;
+ }
+
+ /// Performs an async `os.readv` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec) os.ReadError!usize {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .readv = .{
+ .fd = fd,
+ .iov = iov,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.readv.result;
+ }
+
+ /// Performs an async `os.preadv` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .preadv = .{
+ .fd = fd,
+ .iov = iov,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.preadv.result;
+ }
+
+ /// Performs an async `os.write` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!void {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .write = .{
+ .fd = fd,
+ .bytes = bytes,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.write.result;
+ }
+
+ /// Performs an async `os.writev` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) os.WriteError!void {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .writev = .{
+ .fd = fd,
+ .iov = iov,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.writev.result;
+ }
+
+ /// Performs an async `os.pwritev` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) os.WriteError!void {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .pwritev = .{
+ .fd = fd,
+ .iov = iov,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.pwritev.result;
+ }
+
fn workerRun(self: *Loop) void {
while (true) {
while (true) {
@@ -804,7 +985,7 @@ pub const Loop = struct {
}
}
- fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void {
+ fn posixFsRequest(self: *Loop, request_node: *Request.Node) void {
self.beginOneEvent(); // finished in posixFsRun after processing the msg
self.os_data.fs_queue.put(request_node);
switch (builtin.os) {
@@ -826,7 +1007,7 @@ pub const Loop = struct {
}
}
- fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void {
+ fn posixFsCancel(self: *Loop, request_node: *Request.Node) void {
if (self.os_data.fs_queue.remove(request_node)) {
self.finishOneEvent();
}
@@ -841,37 +1022,32 @@ pub const Loop = struct {
}
while (self.os_data.fs_queue.get()) |node| {
switch (node.data.msg) {
- .End => return,
- .WriteV => |*msg| {
+ .end => return,
+ .read => |*msg| {
+ msg.result = noasync os.read(msg.fd, msg.buf);
+ },
+ .write => |*msg| {
+ msg.result = noasync os.write(msg.fd, msg.bytes);
+ },
+ .writev => |*msg| {
msg.result = noasync os.writev(msg.fd, msg.iov);
},
- .PWriteV => |*msg| {
+ .pwritev => |*msg| {
msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset);
},
- .PReadV => |*msg| {
+ .preadv => |*msg| {
msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset);
},
- .Open => |*msg| {
- msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode);
+ .open => |*msg| {
+ msg.result = noasync os.openC(msg.path, msg.flags, msg.mode);
},
- .Close => |*msg| noasync os.close(msg.fd),
- .WriteFile => |*msg| blk: {
- const O_LARGEFILE = if (@hasDecl(os, "O_LARGEFILE")) os.O_LARGEFILE else 0;
- const flags = O_LARGEFILE | os.O_WRONLY | os.O_CREAT |
- os.O_CLOEXEC | os.O_TRUNC;
- const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| {
- msg.result = err;
- break :blk;
- };
- defer noasync os.close(fd);
- msg.result = noasync os.write(fd, msg.contents);
+ .openat => |*msg| {
+ msg.result = noasync os.openatC(msg.fd, msg.path, msg.flags, msg.mode);
},
+ .close => |*msg| noasync os.close(msg.fd),
}
switch (node.data.finish) {
.TickNode => |*tick_node| self.onNextTick(tick_node),
- .DeallocCloseOperation => |close_op| {
- self.allocator.destroy(close_op);
- },
.NoAction => {},
}
self.finishOneEvent();
@@ -911,8 +1087,8 @@ pub const Loop = struct {
fs_kevent_wait: os.Kevent,
fs_thread: *Thread,
fs_kqfd: i32,
- fs_queue: std.atomic.Queue(fs.Request),
- fs_end_request: fs.RequestNode,
+ fs_queue: std.atomic.Queue(Request),
+ fs_end_request: Request.Node,
};
const LinuxOsData = struct {
@@ -921,8 +1097,99 @@ pub const Loop = struct {
final_eventfd_event: os.linux.epoll_event,
fs_thread: *Thread,
fs_queue_item: i32,
- fs_queue: std.atomic.Queue(fs.Request),
- fs_end_request: fs.RequestNode,
+ fs_queue: std.atomic.Queue(Request),
+ fs_end_request: Request.Node,
+ };
+
+ pub const Request = struct {
+ msg: Msg,
+ finish: Finish,
+
+ pub const Node = std.atomic.Queue(Request).Node;
+
+ pub const Finish = union(enum) {
+ TickNode: Loop.NextTickNode,
+ NoAction,
+ };
+
+ pub const Msg = union(enum) {
+ read: Read,
+ write: Write,
+ writev: WriteV,
+ pwritev: PWriteV,
+ preadv: PReadV,
+ open: Open,
+ openat: OpenAt,
+ close: Close,
+
+ /// special - means the fs thread should exit
+ end,
+
+ pub const Read = struct {
+ fd: os.fd_t,
+ buf: []u8,
+ result: Error!usize,
+
+ pub const Error = os.ReadError;
+ };
+
+ pub const Write = struct {
+ fd: os.fd_t,
+ bytes: []const u8,
+ result: Error!void,
+
+ pub const Error = os.WriteError;
+ };
+
+ pub const WriteV = struct {
+ fd: os.fd_t,
+ iov: []const os.iovec_const,
+ result: Error!void,
+
+ pub const Error = os.WriteError;
+ };
+
+ pub const PWriteV = struct {
+ fd: os.fd_t,
+ iov: []const os.iovec_const,
+ offset: usize,
+ result: Error!void,
+
+ pub const Error = os.WriteError;
+ };
+
+ pub const PReadV = struct {
+ fd: os.fd_t,
+ iov: []const os.iovec,
+ offset: usize,
+ result: Error!usize,
+
+ pub const Error = os.ReadError;
+ };
+
+ pub const Open = struct {
+ path: [*:0]const u8,
+ flags: u32,
+ mode: os.mode_t,
+ result: Error!os.fd_t,
+
+ pub const Error = os.OpenError;
+ };
+
+ pub const OpenAt = struct {
+ fd: os.fd_t,
+ path: [*:0]const u8,
+ flags: u32,
+ mode: os.mode_t,
+ result: Error!os.fd_t,
+
+ pub const Error = os.OpenError;
+ };
+
+ pub const Close = struct {
+ fd: os.fd_t,
+ };
+ };
};
};
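
Note that every fs method added above follows the same request/suspend/resume pattern: build a `Request.Node` on the caller's frame (relying on the new default `null` values for `prev`/`next`), hand it to `posixFsRequest` inside a `suspend` block, and let the fs thread resume the frame through the `.TickNode` finish once the blocking syscall completes. A condensed sketch of that pattern (a hypothetical helper for illustration; the real entry points are the `Loop` methods in the diff, and `posixFsRequest` is private to `Loop`):

    // Hypothetical illustration of the pattern shared by openZ, read, write, etc.
    fn fsSyscall(loop: *Loop, msg: Loop.Request.Msg) Loop.Request.Msg {
        var req_node = Loop.Request.Node{
            .data = .{
                .msg = msg,
                // Resume this frame on the loop when the fs thread is done.
                .finish = .{ .TickNode = .{ .data = @frame() } },
            },
        };
        suspend {
            // Queued for the dedicated fs thread; the blocking syscall runs there.
            loop.posixFsRequest(&req_node);
        }
        // By the time this frame is resumed, msg.result has been filled in.
        return req_node.data.msg;
    }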