aboutsummaryrefslogtreecommitdiff
path: root/lib/std/event/loop.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2020-09-29 00:27:48 -0700
committerAndrew Kelley <andrew@ziglang.org>2020-09-29 00:27:48 -0700
commit750b00c642782127736eb378ec9583db2d680bb6 (patch)
tree095ef980bdd7cdaeffaa2277be775b2cb1e4a2f2 /lib/std/event/loop.zig
parentfa6d150441d1d8679a77c5e9a6071fa952851376 (diff)
parenta0c0f9ead53dab4f558dbb51cc8a49961fc6984f (diff)
downloadzig-750b00c642782127736eb378ec9583db2d680bb6.tar.gz
zig-750b00c642782127736eb378ec9583db2d680bb6.zip
Merge remote-tracking branch 'origin/master' into stage2-zig-cc
Diffstat (limited to 'lib/std/event/loop.zig')
-rw-r--r--lib/std/event/loop.zig432
1 files changed, 326 insertions, 106 deletions
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index 2600b337b3..2ed9f938d8 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -112,8 +112,9 @@ pub const Loop = struct {
/// have the correct pointer value.
/// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
pub fn init(self: *Loop) !void {
- if (builtin.single_threaded
- or (@hasDecl(root, "event_loop_mode") and root.event_loop_mode == .single_threaded)) {
+ if (builtin.single_threaded or
+ (@hasDecl(root, "event_loop_mode") and root.event_loop_mode == .single_threaded))
+ {
return self.initSingleThreaded();
} else {
return self.initMultiThreaded();
@@ -687,9 +688,14 @@ pub const Loop = struct {
switch (builtin.os.tag) {
.linux => {
- // writing 8 bytes to an eventfd cannot fail
- const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
- assert(amt == wakeup_bytes.len);
+ // writing to the eventfd will only wake up one thread, thus multiple writes
+ // are needed to wakeup all the threads
+ var i: usize = 0;
+ while (i < self.extra_threads.len + 1) : (i += 1) {
+ // writing 8 bytes to an eventfd cannot fail
+ const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
+ assert(amt == wakeup_bytes.len);
+ }
return;
},
.macosx, .freebsd, .netbsd, .dragonfly => {
@@ -715,6 +721,50 @@ pub const Loop = struct {
}
}
+ /// ------- I/0 APIs -------
+ pub fn accept(
+ self: *Loop,
+ /// This argument is a socket that has been created with `socket`, bound to a local address
+ /// with `bind`, and is listening for connections after a `listen`.
+ sockfd: os.fd_t,
+ /// This argument is a pointer to a sockaddr structure. This structure is filled in with the
+ /// address of the peer socket, as known to the communications layer. The exact format of the
+ /// address returned addr is determined by the socket's address family (see `socket` and the
+ /// respective protocol man pages).
+ addr: *os.sockaddr,
+ /// This argument is a value-result argument: the caller must initialize it to contain the
+ /// size (in bytes) of the structure pointed to by addr; on return it will contain the actual size
+ /// of the peer address.
+ ///
+ /// The returned address is truncated if the buffer provided is too small; in this case, `addr_size`
+ /// will return a value greater than was supplied to the call.
+ addr_size: *os.socklen_t,
+ /// The following values can be bitwise ORed in flags to obtain different behavior:
+ /// * `SOCK_CLOEXEC` - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor. See the
+ /// description of the `O_CLOEXEC` flag in `open` for reasons why this may be useful.
+ flags: u32,
+ ) os.AcceptError!os.fd_t {
+ while (true) {
+ return os.accept(sockfd, addr, addr_size, flags | os.SOCK_NONBLOCK) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdReadable(sockfd);
+ continue;
+ },
+ else => return err,
+ };
+ }
+ }
+
+ pub fn connect(self: *Loop, sockfd: os.socket_t, sock_addr: *const os.sockaddr, len: os.socklen_t) os.ConnectError!void {
+ os.connect(sockfd, sock_addr, len) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdWritable(sockfd);
+ return os.getsockoptError(sockfd);
+ },
+ else => return err,
+ };
+ }
+
/// Performs an async `os.open` using a separate thread.
pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t {
var req_node = Request.Node{
@@ -773,152 +823,309 @@ pub const Loop = struct {
/// Performs an async `os.read` using a separate thread.
/// `fd` must block and not return EAGAIN.
- pub fn read(self: *Loop, fd: os.fd_t, buf: []u8) os.ReadError!usize {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .read = .{
- .fd = fd,
- .buf = buf,
- .result = undefined,
+ pub fn read(self: *Loop, fd: os.fd_t, buf: []u8, simulate_evented: bool) os.ReadError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .read = .{
+ .fd = fd,
+ .buf = buf,
+ .result = undefined,
+ },
},
+ .finish = .{ .TickNode = .{ .data = @frame() } },
},
- .finish = .{ .TickNode = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.read.result;
+ } else {
+ while (true) {
+ return os.read(fd, buf) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdReadable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
}
- return req_node.data.msg.read.result;
}
/// Performs an async `os.readv` using a separate thread.
/// `fd` must block and not return EAGAIN.
- pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec) os.ReadError!usize {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .readv = .{
- .fd = fd,
- .iov = iov,
- .result = undefined,
+ pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, simulate_evented: bool) os.ReadError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .readv = .{
+ .fd = fd,
+ .iov = iov,
+ .result = undefined,
+ },
},
+ .finish = .{ .TickNode = .{ .data = @frame() } },
},
- .finish = .{ .TickNode = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.readv.result;
+ } else {
+ while (true) {
+ return os.readv(fd, iov) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdReadable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
}
- return req_node.data.msg.readv.result;
}
/// Performs an async `os.pread` using a separate thread.
/// `fd` must block and not return EAGAIN.
- pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64) os.PReadError!usize {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .pread = .{
- .fd = fd,
- .buf = buf,
- .offset = offset,
- .result = undefined,
+ pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64, simulate_evented: bool) os.PReadError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .pread = .{
+ .fd = fd,
+ .buf = buf,
+ .offset = offset,
+ .result = undefined,
+ },
},
+ .finish = .{ .TickNode = .{ .data = @frame() } },
},
- .finish = .{ .TickNode = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.pread.result;
+ } else {
+ while (true) {
+ return os.pread(fd, buf, offset) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdReadable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
}
- return req_node.data.msg.pread.result;
}
/// Performs an async `os.preadv` using a separate thread.
/// `fd` must block and not return EAGAIN.
- pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .preadv = .{
- .fd = fd,
- .iov = iov,
- .offset = offset,
- .result = undefined,
+ pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64, simulate_evented: bool) os.ReadError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .preadv = .{
+ .fd = fd,
+ .iov = iov,
+ .offset = offset,
+ .result = undefined,
+ },
},
+ .finish = .{ .TickNode = .{ .data = @frame() } },
},
- .finish = .{ .TickNode = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.preadv.result;
+ } else {
+ while (true) {
+ return os.preadv(fd, iov, offset) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdReadable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
}
- return req_node.data.msg.preadv.result;
}
/// Performs an async `os.write` using a separate thread.
/// `fd` must block and not return EAGAIN.
- pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!usize {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .write = .{
- .fd = fd,
- .bytes = bytes,
- .result = undefined,
+ pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8, simulate_evented: bool) os.WriteError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .write = .{
+ .fd = fd,
+ .bytes = bytes,
+ .result = undefined,
+ },
},
+ .finish = .{ .TickNode = .{ .data = @frame() } },
},
- .finish = .{ .TickNode = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.write.result;
+ } else {
+ while (true) {
+ return os.write(fd, bytes) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdWritable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
}
- return req_node.data.msg.write.result;
}
/// Performs an async `os.writev` using a separate thread.
/// `fd` must block and not return EAGAIN.
- pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) os.WriteError!usize {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .writev = .{
- .fd = fd,
- .iov = iov,
- .result = undefined,
+ pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, simulate_evented: bool) os.WriteError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .writev = .{
+ .fd = fd,
+ .iov = iov,
+ .result = undefined,
+ },
},
+ .finish = .{ .TickNode = .{ .data = @frame() } },
},
- .finish = .{ .TickNode = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.writev.result;
+ } else {
+ while (true) {
+ return os.writev(fd, iov) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdWritable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
+ }
+ }
+
+ /// Performs an async `os.pwrite` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn pwrite(self: *Loop, fd: os.fd_t, bytes: []const u8, offset: u64, simulate_evented: bool) os.PerformsWriteError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .pwrite = .{
+ .fd = fd,
+ .bytes = bytes,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.pwrite.result;
+ } else {
+ while (true) {
+ return os.pwrite(fd, bytes, offset) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdWritable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
}
- return req_node.data.msg.writev.result;
}
/// Performs an async `os.pwritev` using a separate thread.
/// `fd` must block and not return EAGAIN.
- pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) os.WriteError!usize {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .pwritev = .{
- .fd = fd,
- .iov = iov,
- .offset = offset,
- .result = undefined,
+ pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64, simulate_evented: bool) os.PWriteError!usize {
+ if (simulate_evented) {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .pwritev = .{
+ .fd = fd,
+ .iov = iov,
+ .offset = offset,
+ .result = undefined,
+ },
},
+ .finish = .{ .TickNode = .{ .data = @frame() } },
},
- .finish = .{ .TickNode = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.pwritev.result;
+ } else {
+ while (true) {
+ return os.pwritev(fd, iov, offset) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdWritable(fd);
+ continue;
+ },
+ else => return err,
+ };
+ }
+ }
+ }
+
+ pub fn sendto(
+ self: *Loop,
+ /// The file descriptor of the sending socket.
+ sockfd: os.fd_t,
+ /// Message to send.
+ buf: []const u8,
+ flags: u32,
+ dest_addr: ?*const os.sockaddr,
+ addrlen: os.socklen_t,
+ ) os.SendError!usize {
+ while (true) {
+ return os.sendto(sockfd, buf, flags, dest_addr, addrlen) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdWritable(sockfd);
+ continue;
+ },
+ else => return err,
+ };
+ }
+ }
+
+ pub fn recvfrom(
+ sockfd: os.fd_t,
+ buf: []u8,
+ flags: u32,
+ src_addr: ?*os.sockaddr,
+ addrlen: ?*os.socklen_t,
+ ) os.RecvFromError!usize {
+ while (true) {
+ return os.recvfrom(sockfd, buf, flags, src_addr, addrlen) catch |err| switch (err) {
+ error.WouldBlock => {
+ self.waitUntilFdReadable(sockfd);
+ continue;
+ },
+ else => return err,
+ };
}
- return req_node.data.msg.pwritev.result;
}
/// Performs an async `os.faccessatZ` using a separate thread.
@@ -1073,6 +1280,9 @@ pub const Loop = struct {
.writev => |*msg| {
msg.result = os.writev(msg.fd, msg.iov);
},
+ .pwrite => |*msg| {
+ msg.result = os.pwrite(msg.fd, msg.bytes, msg.offset);
+ },
.pwritev => |*msg| {
msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
},
@@ -1142,6 +1352,7 @@ pub const Loop = struct {
readv: ReadV,
write: Write,
writev: WriteV,
+ pwrite: PWrite,
pwritev: PWriteV,
pread: PRead,
preadv: PReadV,
@@ -1185,6 +1396,15 @@ pub const Loop = struct {
pub const Error = os.WriteError;
};
+ pub const PWrite = struct {
+ fd: os.fd_t,
+ bytes: []const u8,
+ offset: usize,
+ result: Error!usize,
+
+ pub const Error = os.PWriteError;
+ };
+
pub const PWriteV = struct {
fd: os.fd_t,
iov: []const os.iovec_const,