diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2019-10-29 22:59:30 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2019-10-29 22:59:30 -0400 |
| commit | c3d816a98e1126f5de4ec1a45e5f65bb2ff2f43c (patch) | |
| tree | 5930edec7fa411286b2f2a9e36183b01589a6a6a /lib/std/event | |
| parent | 8d3b7689ad9c2dd14d0f5cadf2b711ff1ab70054 (diff) | |
| download | zig-c3d816a98e1126f5de4ec1a45e5f65bb2ff2f43c.tar.gz zig-c3d816a98e1126f5de4ec1a45e5f65bb2ff2f43c.zip | |
std lib networking improvements, especially non-blocking I/O
* delete the std/event/net directory
* `std.event.Loop.waitUntilFdReadable` and related functions
no longer have possibility of failure. On Linux, they fall
back to poll() and then fall back to sleep().
* add some missing `noasync` decorations in `std.event.Loop`
* redo the `std.net.Server` API. it's quite nice now, but
shutdown does not work cleanly. There is a race condition with
close() that I am actively working on.
* move `std.io.OutStream` to its own file to match `std.io.InStream`.
I started working on making `write` integrated with evented I/O,
but it got tricky so I backed off and filed #3557. However
I did integrate `std.os.writev` and `std.os.pwritev` with evented I/O.
* add `std.Target.stack_align`
* move networking tests to `lib/std/net/test.zig`
* add `std.net.tcpConnectToHost` and `std.net.tcpConnectToAddress`.
* rename `error.UnknownName` to `error.UnknownHostName` within the
context of DNS resolution.
* add `std.os.readv`, which is integrated with evented I/O.
* `std.os.preadv`, is now integrated with evented I/O.
* `std.os.accept4` now asserts that ENOTSOCK and EOPNOTSUPP never
occur (misuse of API), instead of returning errors.
* `std.os.connect` is now integrated with evented I/O.
`std.os.connect_async` is gone. Just use `std.os.connect`.
* fix false positive dependency loop regarding async function frames
* add more compile notes to help when dependency loops occur
in determining whether a function is async.
* ir: change an assert to ir_assert to make it easier to find
workarounds for when such an assert is triggered. In this case
it was trying to parse an IPv4 address at comptime.
Diffstat (limited to 'lib/std/event')
| -rw-r--r-- | lib/std/event/channel.zig | 10 | ||||
| -rw-r--r-- | lib/std/event/loop.zig | 87 | ||||
| -rw-r--r-- | lib/std/event/net.zig | 358 |
3 files changed, 71 insertions, 384 deletions
diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig index 1092f2204a..c894e6c0e8 100644 --- a/lib/std/event/channel.zig +++ b/lib/std/event/channel.zig @@ -4,9 +4,11 @@ const assert = std.debug.assert; const testing = std.testing; const Loop = std.event.Loop; -/// many producer, many consumer, thread-safe, runtime configurable buffer size -/// when buffer is empty, consumers suspend and are resumed by producers -/// when buffer is full, producers suspend and are resumed by consumers +/// Many producer, many consumer, thread-safe, runtime configurable buffer size. +/// When buffer is empty, consumers suspend and are resumed by producers. +/// When buffer is full, producers suspend and are resumed by consumers. +/// TODO now that async function rewrite has landed, this API should be adjusted +/// to not use the event loop's allocator, and to not require allocation. pub fn Channel(comptime T: type) type { return struct { loop: *Loop, @@ -48,7 +50,7 @@ pub fn Channel(comptime T: type) type { tick_node: *Loop.NextTickNode, }; - /// call destroy when done + /// Call `destroy` when done. pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { const buffer_nodes = try loop.allocator.alloc(T, capacity); errdefer loop.allocator.free(buffer_nodes); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index cec722985b..b0b42fe268 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -448,26 +448,67 @@ pub const Loop = struct { self.finishOneEvent(); } - pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void { - defer self.linuxRemoveFd(fd); + pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void { + assert(flags & os.EPOLLET == os.EPOLLET); + assert(flags & os.EPOLLONESHOT == os.EPOLLONESHOT); + var resume_node = ResumeNode.Basic{ + .base = ResumeNode{ + .id = .Basic, + .handle = @frame(), + .overlapped = ResumeNode.overlapped_init, + }, + }; + var need_to_delete = false; + defer if (need_to_delete) self.linuxRemoveFd(fd); + suspend { - var resume_node = ResumeNode.Basic{ - .base = ResumeNode{ - .id = .Basic, - .handle = @frame(), - .overlapped = ResumeNode.overlapped_init, + if (self.linuxAddFd(fd, &resume_node.base, flags)) |_| { + need_to_delete = true; + } else |err| switch (err) { + error.FileDescriptorNotRegistered => unreachable, + error.OperationCausesCircularLoop => unreachable, + error.FileDescriptorIncompatibleWithEpoll => unreachable, + error.FileDescriptorAlreadyPresentInSet => unreachable, // evented writes to the same fd is not thread-safe + + error.SystemResources, + error.UserResourceLimitReached, + error.Unexpected, + => { + // Fall back to a blocking poll(). Ideally this codepath is never hit, since + // epoll should be just fine. But this is better than incorrect behavior. + var poll_flags: i16 = 0; + if ((flags & os.EPOLLIN) != 0) poll_flags |= os.POLLIN; + if ((flags & os.EPOLLOUT) != 0) poll_flags |= os.POLLOUT; + var pfd = [1]os.pollfd{os.pollfd{ + .fd = fd, + .events = poll_flags, + .revents = undefined, + }}; + _ = os.poll(&pfd, -1) catch |poll_err| switch (poll_err) { + error.SystemResources, + error.Unexpected, + => { + // Even poll() didn't work. The best we can do now is sleep for a + // small duration and then hope that something changed. + std.time.sleep(1 * std.time.millisecond); + }, + }; + resume @frame(); }, - }; - try self.linuxAddFd(fd, &resume_node.base, flags); + } } } - pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) !void { - return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); + pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLIN); } - pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) !void { - return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT); + pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT); + } + + pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT | os.EPOLLIN); } pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent { @@ -645,7 +686,7 @@ pub const Loop = struct { .linux => { self.posixFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail - os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + noasync os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; }, .macosx, .freebsd, .netbsd => { @@ -793,6 +834,8 @@ pub const Loop = struct { } } + // TODO make this whole function noasync + // https://github.com/ziglang/zig/issues/3157 fn posixFsRun(self: *Loop) void { while (true) { if (builtin.os == .linux) { @@ -802,27 +845,27 @@ pub const Loop = struct { switch (node.data.msg) { .End => return, .WriteV => |*msg| { - msg.result = os.writev(msg.fd, msg.iov); + msg.result = noasync os.writev(msg.fd, msg.iov); }, .PWriteV => |*msg| { - msg.result = os.pwritev(msg.fd, msg.iov, msg.offset); + msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset); }, .PReadV => |*msg| { - msg.result = os.preadv(msg.fd, msg.iov, msg.offset); + msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset); }, .Open => |*msg| { - msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode); + msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode); }, - .Close => |*msg| os.close(msg.fd), + .Close => |*msg| noasync os.close(msg.fd), .WriteFile => |*msg| blk: { const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; - const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| { + const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| { msg.result = err; break :blk; }; - defer os.close(fd); - msg.result = os.write(fd, msg.contents); + defer noasync os.close(fd); + msg.result = noasync os.write(fd, msg.contents); }, } switch (node.data.finish) { diff --git a/lib/std/event/net.zig b/lib/std/event/net.zig deleted file mode 100644 index bed665dcdc..0000000000 --- a/lib/std/event/net.zig +++ /dev/null @@ -1,358 +0,0 @@ -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const event = std.event; -const mem = std.mem; -const os = std.os; -const Loop = std.event.Loop; -const File = std.fs.File; -const fd_t = os.fd_t; - -pub const Server = struct { - handleRequestFn: async fn (*Server, *const std.net.Address, File) void, - - loop: *Loop, - sockfd: ?i32, - accept_frame: ?anyframe, - listen_address: std.net.Address, - - waiting_for_emfile_node: PromiseNode, - listen_resume_node: event.Loop.ResumeNode, - - const PromiseNode = std.TailQueue(anyframe).Node; - - pub fn init(loop: *Loop) Server { - // TODO can't initialize handler here because we need well defined copy elision - return Server{ - .loop = loop, - .sockfd = null, - .accept_frame = null, - .handleRequestFn = undefined, - .waiting_for_emfile_node = undefined, - .listen_address = undefined, - .listen_resume_node = event.Loop.ResumeNode{ - .id = event.Loop.ResumeNode.Id.Basic, - .handle = undefined, - .overlapped = event.Loop.ResumeNode.overlapped_init, - }, - }; - } - - pub fn listen( - self: *Server, - address: *const std.net.Address, - handleRequestFn: async fn (*Server, *const std.net.Address, File) void, - ) !void { - self.handleRequestFn = handleRequestFn; - - const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); - errdefer os.close(sockfd); - self.sockfd = sockfd; - - try os.bind(sockfd, &address.os_addr); - try os.listen(sockfd, os.SOMAXCONN); - self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd)); - - self.accept_frame = async Server.handler(self); - errdefer await self.accept_frame.?; - - self.listen_resume_node.handle = self.accept_frame.?; - try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - errdefer self.loop.removeFd(sockfd); - } - - /// Stop listening - pub fn close(self: *Server) void { - self.loop.linuxRemoveFd(self.sockfd.?); - if (self.sockfd) |fd| { - os.close(fd); - self.sockfd = null; - } - } - - pub fn deinit(self: *Server) void { - if (self.accept_frame) |accept_frame| await accept_frame; - if (self.sockfd) |sockfd| os.close(sockfd); - } - - pub async fn handler(self: *Server) void { - while (true) { - var accepted_addr: std.net.Address = undefined; - // TODO just inline the following function here and don't expose it as posixAsyncAccept - if (os.accept4_async(self.sockfd.?, &accepted_addr.os_addr, os.SOCK_NONBLOCK | os.SOCK_CLOEXEC)) |accepted_fd| { - if (accepted_fd == -1) { - // would block - suspend; // we will get resumed by epoll_wait in the event loop - continue; - } - var socket = File.openHandle(accepted_fd); - self.handleRequestFn(self, &accepted_addr, socket); - } else |err| switch (err) { - error.ProcessFdQuotaExceeded => @panic("TODO handle this error"), - error.ConnectionAborted => continue, - - error.FileDescriptorNotASocket => unreachable, - error.OperationNotSupported => unreachable, - - error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => { - @panic("TODO handle this error"); - }, - } - } - } -}; - -pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 { - const sockfd = try os.socket( - os.AF_UNIX, - os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, - 0, - ); - errdefer os.close(sockfd); - - var sock_addr = os.sockaddr_un{ - .family = os.AF_UNIX, - .path = undefined, - }; - - if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong; - mem.copy(u8, sock_addr.path[0..], path); - const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len); - try os.connect_async(sockfd, &sock_addr, size); - try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - try os.getsockoptError(sockfd); - - return sockfd; -} - -pub const ReadError = error{ - SystemResources, - Unexpected, - UserResourceLimitReached, - InputOutput, - - FileDescriptorNotRegistered, // TODO remove this possibility - OperationCausesCircularLoop, // TODO remove this possibility - FileDescriptorAlreadyPresentInSet, // TODO remove this possibility - FileDescriptorIncompatibleWithEpoll, // TODO remove this possibility -}; - -/// returns number of bytes read. 0 means EOF. -pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize { - const iov = os.iovec{ - .iov_base = buffer.ptr, - .iov_len = buffer.len, - }; - const iovs: *const [1]os.iovec = &iov; - return readvPosix(loop, fd, iovs, 1); -} - -pub const WriteError = error{}; - -pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteError!void { - const iov = os.iovec_const{ - .iov_base = buffer.ptr, - .iov_len = buffer.len, - }; - const iovs: *const [1]os.iovec_const = &iov; - return writevPosix(loop, fd, iovs, 1); -} - -pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void { - while (true) { - switch (builtin.os) { - .macosx, .linux => { - switch (os.errno(os.system.writev(fd, iov, count))) { - 0 => return, - os.EINTR => continue, - os.ESPIPE => unreachable, - os.EINVAL => unreachable, - os.EFAULT => unreachable, - os.EAGAIN => { - try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT); - continue; - }, - os.EBADF => unreachable, // always a race condition - os.EDESTADDRREQ => unreachable, // connect was never called - os.EDQUOT => unreachable, - os.EFBIG => unreachable, - os.EIO => return error.InputOutput, - os.ENOSPC => unreachable, - os.EPERM => return error.AccessDenied, - os.EPIPE => unreachable, - else => |err| return os.unexpectedErrno(err), - } - }, - else => @compileError("Unsupported OS"), - } - } -} - -/// returns number of bytes read. 0 means EOF. -pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: usize) !usize { - while (true) { - switch (builtin.os) { - builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => { - const rc = os.system.readv(fd, iov, count); - switch (os.errno(rc)) { - 0 => return rc, - os.EINTR => continue, - os.EINVAL => unreachable, - os.EFAULT => unreachable, - os.EAGAIN => { - try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); - continue; - }, - os.EBADF => unreachable, // always a race condition - os.EIO => return error.InputOutput, - os.EISDIR => unreachable, - os.ENOBUFS => return error.SystemResources, - os.ENOMEM => return error.SystemResources, - else => |err| return os.unexpectedErrno(err), - } - }, - else => @compileError("Unsupported OS"), - } - } -} - -pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void { - const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); - defer loop.allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec_const{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return writevPosix(loop, fd, iovecs.ptr, data.len); -} - -pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize { - const iovecs = try loop.allocator.alloc(os.iovec, data.len); - defer loop.allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return readvPosix(loop, fd, iovecs.ptr, data.len); -} - -pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File { - var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592 - - const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); - errdefer os.close(sockfd); - - try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in)); - try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - try os.getsockoptError(sockfd); - - return File.openHandle(sockfd); -} - -test "listen on a port, send bytes, receive bytes" { - // https://github.com/ziglang/zig/issues/2377 - if (true) return error.SkipZigTest; - - if (builtin.os != builtin.Os.linux) { - // TODO build abstractions for other operating systems - return error.SkipZigTest; - } - - const MyServer = struct { - tcp_server: Server, - - const Self = @This(); - async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void { - const self = @fieldParentPtr(Self, "tcp_server", tcp_server); - var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 - defer socket.close(); - const next_handler = errorableHandler(self, _addr, socket) catch |err| { - std.debug.panic("unable to handle connection: {}\n", err); - }; - } - async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void { - const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592 - var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 - - const stream = &socket.outStream().stream; - try stream.print("hello from server\n"); - } - }; - - const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable; - const addr = std.net.Address.initIp4(ip4addr, 0); - - var loop: Loop = undefined; - try loop.initSingleThreaded(std.debug.global_allocator); - var server = MyServer{ .tcp_server = Server.init(&loop) }; - defer server.tcp_server.deinit(); - try server.tcp_server.listen(&addr, MyServer.handler); - - _ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server); - loop.run(); -} - -async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void { - errdefer @panic("test failure"); - - var socket_file = try connect(loop, address); - defer socket_file.close(); - - var buf: [512]u8 = undefined; - const amt_read = try socket_file.read(buf[0..]); - const msg = buf[0..amt_read]; - testing.expect(mem.eql(u8, msg, "hello from server\n")); - server.close(); -} - -pub const OutStream = struct { - fd: fd_t, - stream: Stream, - loop: *Loop, - - pub const Error = WriteError; - pub const Stream = event.io.OutStream(Error); - - pub fn init(loop: *Loop, fd: fd_t) OutStream { - return OutStream{ - .fd = fd, - .loop = loop, - .stream = Stream{ .writeFn = writeFn }, - }; - } - - async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { - const self = @fieldParentPtr(OutStream, "stream", out_stream); - return write(self.loop, self.fd, bytes); - } -}; - -pub const InStream = struct { - fd: fd_t, - stream: Stream, - loop: *Loop, - - pub const Error = ReadError; - pub const Stream = event.io.InStream(Error); - - pub fn init(loop: *Loop, fd: fd_t) InStream { - return InStream{ - .fd = fd, - .loop = loop, - .stream = Stream{ .readFn = readFn }, - }; - } - - async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { - const self = @fieldParentPtr(InStream, "stream", in_stream); - return read(self.loop, self.fd, bytes); - } -}; |
