diff options
Diffstat (limited to 'lib/std/event/loop.zig')
| -rw-r--r-- | lib/std/event/loop.zig | 416 |
1 files changed, 315 insertions, 101 deletions
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 13e704a8d3..2ed9f938d8 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -721,6 +721,50 @@ pub const Loop = struct { } } + /// ------- I/0 APIs ------- + pub fn accept( + self: *Loop, + /// This argument is a socket that has been created with `socket`, bound to a local address + /// with `bind`, and is listening for connections after a `listen`. + sockfd: os.fd_t, + /// This argument is a pointer to a sockaddr structure. This structure is filled in with the + /// address of the peer socket, as known to the communications layer. The exact format of the + /// address returned addr is determined by the socket's address family (see `socket` and the + /// respective protocol man pages). + addr: *os.sockaddr, + /// This argument is a value-result argument: the caller must initialize it to contain the + /// size (in bytes) of the structure pointed to by addr; on return it will contain the actual size + /// of the peer address. + /// + /// The returned address is truncated if the buffer provided is too small; in this case, `addr_size` + /// will return a value greater than was supplied to the call. + addr_size: *os.socklen_t, + /// The following values can be bitwise ORed in flags to obtain different behavior: + /// * `SOCK_CLOEXEC` - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor. See the + /// description of the `O_CLOEXEC` flag in `open` for reasons why this may be useful. + flags: u32, + ) os.AcceptError!os.fd_t { + while (true) { + return os.accept(sockfd, addr, addr_size, flags | os.SOCK_NONBLOCK) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdReadable(sockfd); + continue; + }, + else => return err, + }; + } + } + + pub fn connect(self: *Loop, sockfd: os.socket_t, sock_addr: *const os.sockaddr, len: os.socklen_t) os.ConnectError!void { + os.connect(sockfd, sock_addr, len) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdWritable(sockfd); + return os.getsockoptError(sockfd); + }, + else => return err, + }; + } + /// Performs an async `os.open` using a separate thread. pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t { var req_node = Request.Node{ @@ -779,152 +823,309 @@ pub const Loop = struct { /// Performs an async `os.read` using a separate thread. /// `fd` must block and not return EAGAIN. - pub fn read(self: *Loop, fd: os.fd_t, buf: []u8) os.ReadError!usize { - var req_node = Request.Node{ - .data = .{ - .msg = .{ - .read = .{ - .fd = fd, - .buf = buf, - .result = undefined, + pub fn read(self: *Loop, fd: os.fd_t, buf: []u8, simulate_evented: bool) os.ReadError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .read = .{ + .fd = fd, + .buf = buf, + .result = undefined, + }, }, + .finish = .{ .TickNode = .{ .data = @frame() } }, }, - .finish = .{ .TickNode = .{ .data = @frame() } }, - }, - }; - suspend { - self.posixFsRequest(&req_node); + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.read.result; + } else { + while (true) { + return os.read(fd, buf) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdReadable(fd); + continue; + }, + else => return err, + }; + } } - return req_node.data.msg.read.result; } /// Performs an async `os.readv` using a separate thread. /// `fd` must block and not return EAGAIN. - pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec) os.ReadError!usize { - var req_node = Request.Node{ - .data = .{ - .msg = .{ - .readv = .{ - .fd = fd, - .iov = iov, - .result = undefined, + pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, simulate_evented: bool) os.ReadError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .readv = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, }, + .finish = .{ .TickNode = .{ .data = @frame() } }, }, - .finish = .{ .TickNode = .{ .data = @frame() } }, - }, - }; - suspend { - self.posixFsRequest(&req_node); + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.readv.result; + } else { + while (true) { + return os.readv(fd, iov) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdReadable(fd); + continue; + }, + else => return err, + }; + } } - return req_node.data.msg.readv.result; } /// Performs an async `os.pread` using a separate thread. /// `fd` must block and not return EAGAIN. - pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64) os.PReadError!usize { - var req_node = Request.Node{ - .data = .{ - .msg = .{ - .pread = .{ - .fd = fd, - .buf = buf, - .offset = offset, - .result = undefined, + pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64, simulate_evented: bool) os.PReadError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .pread = .{ + .fd = fd, + .buf = buf, + .offset = offset, + .result = undefined, + }, }, + .finish = .{ .TickNode = .{ .data = @frame() } }, }, - .finish = .{ .TickNode = .{ .data = @frame() } }, - }, - }; - suspend { - self.posixFsRequest(&req_node); + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.pread.result; + } else { + while (true) { + return os.pread(fd, buf, offset) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdReadable(fd); + continue; + }, + else => return err, + }; + } } - return req_node.data.msg.pread.result; } /// Performs an async `os.preadv` using a separate thread. /// `fd` must block and not return EAGAIN. - pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize { - var req_node = Request.Node{ - .data = .{ - .msg = .{ - .preadv = .{ - .fd = fd, - .iov = iov, - .offset = offset, - .result = undefined, + pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64, simulate_evented: bool) os.ReadError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .preadv = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, }, + .finish = .{ .TickNode = .{ .data = @frame() } }, }, - .finish = .{ .TickNode = .{ .data = @frame() } }, - }, - }; - suspend { - self.posixFsRequest(&req_node); + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.preadv.result; + } else { + while (true) { + return os.preadv(fd, iov, offset) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdReadable(fd); + continue; + }, + else => return err, + }; + } } - return req_node.data.msg.preadv.result; } /// Performs an async `os.write` using a separate thread. /// `fd` must block and not return EAGAIN. - pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!usize { - var req_node = Request.Node{ - .data = .{ - .msg = .{ - .write = .{ - .fd = fd, - .bytes = bytes, - .result = undefined, + pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8, simulate_evented: bool) os.WriteError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .write = .{ + .fd = fd, + .bytes = bytes, + .result = undefined, + }, }, + .finish = .{ .TickNode = .{ .data = @frame() } }, }, - .finish = .{ .TickNode = .{ .data = @frame() } }, - }, - }; - suspend { - self.posixFsRequest(&req_node); + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.write.result; + } else { + while (true) { + return os.write(fd, bytes) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdWritable(fd); + continue; + }, + else => return err, + }; + } } - return req_node.data.msg.write.result; } /// Performs an async `os.writev` using a separate thread. /// `fd` must block and not return EAGAIN. - pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) os.WriteError!usize { - var req_node = Request.Node{ - .data = .{ - .msg = .{ - .writev = .{ - .fd = fd, - .iov = iov, - .result = undefined, + pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, simulate_evented: bool) os.WriteError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .writev = .{ + .fd = fd, + .iov = iov, + .result = undefined, + }, }, + .finish = .{ .TickNode = .{ .data = @frame() } }, }, - .finish = .{ .TickNode = .{ .data = @frame() } }, - }, - }; - suspend { - self.posixFsRequest(&req_node); + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.writev.result; + } else { + while (true) { + return os.writev(fd, iov) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdWritable(fd); + continue; + }, + else => return err, + }; + } + } + } + + /// Performs an async `os.pwrite` using a separate thread. + /// `fd` must block and not return EAGAIN. + pub fn pwrite(self: *Loop, fd: os.fd_t, bytes: []const u8, offset: u64, simulate_evented: bool) os.PerformsWriteError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .pwrite = .{ + .fd = fd, + .bytes = bytes, + .offset = offset, + .result = undefined, + }, + }, + .finish = .{ .TickNode = .{ .data = @frame() } }, + }, + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.pwrite.result; + } else { + while (true) { + return os.pwrite(fd, bytes, offset) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdWritable(fd); + continue; + }, + else => return err, + }; + } } - return req_node.data.msg.writev.result; } /// Performs an async `os.pwritev` using a separate thread. /// `fd` must block and not return EAGAIN. - pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) os.WriteError!usize { - var req_node = Request.Node{ - .data = .{ - .msg = .{ - .pwritev = .{ - .fd = fd, - .iov = iov, - .offset = offset, - .result = undefined, + pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64, simulate_evented: bool) os.PWriteError!usize { + if (simulate_evented) { + var req_node = Request.Node{ + .data = .{ + .msg = .{ + .pwritev = .{ + .fd = fd, + .iov = iov, + .offset = offset, + .result = undefined, + }, }, + .finish = .{ .TickNode = .{ .data = @frame() } }, }, - .finish = .{ .TickNode = .{ .data = @frame() } }, - }, - }; - suspend { - self.posixFsRequest(&req_node); + }; + suspend { + self.posixFsRequest(&req_node); + } + return req_node.data.msg.pwritev.result; + } else { + while (true) { + return os.pwritev(fd, iov, offset) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdWritable(fd); + continue; + }, + else => return err, + }; + } + } + } + + pub fn sendto( + self: *Loop, + /// The file descriptor of the sending socket. + sockfd: os.fd_t, + /// Message to send. + buf: []const u8, + flags: u32, + dest_addr: ?*const os.sockaddr, + addrlen: os.socklen_t, + ) os.SendError!usize { + while (true) { + return os.sendto(sockfd, buf, flags, dest_addr, addrlen) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdWritable(sockfd); + continue; + }, + else => return err, + }; + } + } + + pub fn recvfrom( + sockfd: os.fd_t, + buf: []u8, + flags: u32, + src_addr: ?*os.sockaddr, + addrlen: ?*os.socklen_t, + ) os.RecvFromError!usize { + while (true) { + return os.recvfrom(sockfd, buf, flags, src_addr, addrlen) catch |err| switch (err) { + error.WouldBlock => { + self.waitUntilFdReadable(sockfd); + continue; + }, + else => return err, + }; } - return req_node.data.msg.pwritev.result; } /// Performs an async `os.faccessatZ` using a separate thread. @@ -1079,6 +1280,9 @@ pub const Loop = struct { .writev => |*msg| { msg.result = os.writev(msg.fd, msg.iov); }, + .pwrite => |*msg| { + msg.result = os.pwrite(msg.fd, msg.bytes, msg.offset); + }, .pwritev => |*msg| { msg.result = os.pwritev(msg.fd, msg.iov, msg.offset); }, @@ -1148,6 +1352,7 @@ pub const Loop = struct { readv: ReadV, write: Write, writev: WriteV, + pwrite: PWrite, pwritev: PWriteV, pread: PRead, preadv: PReadV, @@ -1191,6 +1396,15 @@ pub const Loop = struct { pub const Error = os.WriteError; }; + pub const PWrite = struct { + fd: os.fd_t, + bytes: []const u8, + offset: usize, + result: Error!usize, + + pub const Error = os.PWriteError; + }; + pub const PWriteV = struct { fd: os.fd_t, iov: []const os.iovec_const, |
