diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2019-10-30 21:30:16 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2019-10-30 21:30:16 -0400 |
| commit | 61d5a0bf48d034208aea37d72dac5b3531334be7 (patch) | |
| tree | 57e545a972ae44c3bc6bab98396f9cb22203edd1 /lib/std/event | |
| parent | 6a15e8a7a771bcbf2534cceecd77231344aafbf8 (diff) | |
| parent | 7b7ba51642c832c77ec2668491843be3b0114124 (diff) | |
| download | zig-61d5a0bf48d034208aea37d72dac5b3531334be7.tar.gz zig-61d5a0bf48d034208aea37d72dac5b3531334be7.zip | |
Merge branch 'std.net'
Diffstat (limited to 'lib/std/event')
| -rw-r--r-- | lib/std/event/channel.zig | 10 | ||||
| -rw-r--r-- | lib/std/event/loop.zig | 87 | ||||
| -rw-r--r-- | lib/std/event/net.zig | 358 |
3 files changed, 73 insertions, 382 deletions
diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig index f8539d5259..88edc90f16 100644 --- a/lib/std/event/channel.zig +++ b/lib/std/event/channel.zig @@ -4,9 +4,11 @@ const assert = std.debug.assert; const testing = std.testing; const Loop = std.event.Loop; -/// many producer, many consumer, thread-safe, runtime configurable buffer size -/// when buffer is empty, consumers suspend and are resumed by producers -/// when buffer is full, producers suspend and are resumed by consumers +/// Many producer, many consumer, thread-safe, runtime configurable buffer size. +/// When buffer is empty, consumers suspend and are resumed by producers. +/// When buffer is full, producers suspend and are resumed by consumers. +/// TODO now that async function rewrite has landed, this API should be adjusted +/// to not use the event loop's allocator, and to not require allocation. pub fn Channel(comptime T: type) type { return struct { loop: *Loop, @@ -48,7 +50,7 @@ pub fn Channel(comptime T: type) type { tick_node: *Loop.NextTickNode, }; - /// call destroy when done + /// Call `destroy` when done. pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { const buffer_nodes = try loop.allocator.alloc(T, capacity); errdefer loop.allocator.free(buffer_nodes); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 22013edba6..ae8d76676d 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -448,22 +448,67 @@ pub const Loop = struct { self.finishOneEvent(); } - pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void { - defer self.linuxRemoveFd(fd); + pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void { + assert(flags & os.EPOLLET == os.EPOLLET); + assert(flags & os.EPOLLONESHOT == os.EPOLLONESHOT); + var resume_node = ResumeNode.Basic{ + .base = ResumeNode{ + .id = .Basic, + .handle = @frame(), + .overlapped = ResumeNode.overlapped_init, + }, + }; + var need_to_delete = false; + defer if (need_to_delete) self.linuxRemoveFd(fd); + suspend { - var resume_node = ResumeNode.Basic{ - .base = ResumeNode{ - .id = .Basic, - .handle = @frame(), - .overlapped = ResumeNode.overlapped_init, + if (self.linuxAddFd(fd, &resume_node.base, flags)) |_| { + need_to_delete = true; + } else |err| switch (err) { + error.FileDescriptorNotRegistered => unreachable, + error.OperationCausesCircularLoop => unreachable, + error.FileDescriptorIncompatibleWithEpoll => unreachable, + error.FileDescriptorAlreadyPresentInSet => unreachable, // evented writes to the same fd is not thread-safe + + error.SystemResources, + error.UserResourceLimitReached, + error.Unexpected, + => { + // Fall back to a blocking poll(). Ideally this codepath is never hit, since + // epoll should be just fine. But this is better than incorrect behavior. + var poll_flags: i16 = 0; + if ((flags & os.EPOLLIN) != 0) poll_flags |= os.POLLIN; + if ((flags & os.EPOLLOUT) != 0) poll_flags |= os.POLLOUT; + var pfd = [1]os.pollfd{os.pollfd{ + .fd = fd, + .events = poll_flags, + .revents = undefined, + }}; + _ = os.poll(&pfd, -1) catch |poll_err| switch (poll_err) { + error.SystemResources, + error.Unexpected, + => { + // Even poll() didn't work. The best we can do now is sleep for a + // small duration and then hope that something changed. + std.time.sleep(1 * std.time.millisecond); + }, + }; + resume @frame(); }, - }; - try self.linuxAddFd(fd, &resume_node.base, flags); + } } } - pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) !void { - return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); + pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLIN); + } + + pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT); + } + + pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT | os.EPOLLIN); } pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent { @@ -642,7 +687,7 @@ pub const Loop = struct { .linux => { self.posixFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail - os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + noasync os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; }, .macosx, .freebsd, .netbsd, .dragonfly => { @@ -790,6 +835,8 @@ pub const Loop = struct { } } + // TODO make this whole function noasync + // https://github.com/ziglang/zig/issues/3157 fn posixFsRun(self: *Loop) void { while (true) { if (builtin.os == .linux) { @@ -799,27 +846,27 @@ pub const Loop = struct { switch (node.data.msg) { .End => return, .WriteV => |*msg| { - msg.result = os.writev(msg.fd, msg.iov); + msg.result = noasync os.writev(msg.fd, msg.iov); }, .PWriteV => |*msg| { - msg.result = os.pwritev(msg.fd, msg.iov, msg.offset); + msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset); }, .PReadV => |*msg| { - msg.result = os.preadv(msg.fd, msg.iov, msg.offset); + msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset); }, .Open => |*msg| { - msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode); + msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode); }, - .Close => |*msg| os.close(msg.fd), + .Close => |*msg| noasync os.close(msg.fd), .WriteFile => |*msg| blk: { const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; - const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| { + const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| { msg.result = err; break :blk; }; - defer os.close(fd); - msg.result = os.write(fd, msg.contents); + defer noasync os.close(fd); + msg.result = noasync os.write(fd, msg.contents); }, } switch (node.data.finish) { diff --git a/lib/std/event/net.zig b/lib/std/event/net.zig deleted file mode 100644 index bed665dcdc..0000000000 --- a/lib/std/event/net.zig +++ /dev/null @@ -1,358 +0,0 @@ -const std = @import("../std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const event = std.event; -const mem = std.mem; -const os = std.os; -const Loop = std.event.Loop; -const File = std.fs.File; -const fd_t = os.fd_t; - -pub const Server = struct { - handleRequestFn: async fn (*Server, *const std.net.Address, File) void, - - loop: *Loop, - sockfd: ?i32, - accept_frame: ?anyframe, - listen_address: std.net.Address, - - waiting_for_emfile_node: PromiseNode, - listen_resume_node: event.Loop.ResumeNode, - - const PromiseNode = std.TailQueue(anyframe).Node; - - pub fn init(loop: *Loop) Server { - // TODO can't initialize handler here because we need well defined copy elision - return Server{ - .loop = loop, - .sockfd = null, - .accept_frame = null, - .handleRequestFn = undefined, - .waiting_for_emfile_node = undefined, - .listen_address = undefined, - .listen_resume_node = event.Loop.ResumeNode{ - .id = event.Loop.ResumeNode.Id.Basic, - .handle = undefined, - .overlapped = event.Loop.ResumeNode.overlapped_init, - }, - }; - } - - pub fn listen( - self: *Server, - address: *const std.net.Address, - handleRequestFn: async fn (*Server, *const std.net.Address, File) void, - ) !void { - self.handleRequestFn = handleRequestFn; - - const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); - errdefer os.close(sockfd); - self.sockfd = sockfd; - - try os.bind(sockfd, &address.os_addr); - try os.listen(sockfd, os.SOMAXCONN); - self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd)); - - self.accept_frame = async Server.handler(self); - errdefer await self.accept_frame.?; - - self.listen_resume_node.handle = self.accept_frame.?; - try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - errdefer self.loop.removeFd(sockfd); - } - - /// Stop listening - pub fn close(self: *Server) void { - self.loop.linuxRemoveFd(self.sockfd.?); - if (self.sockfd) |fd| { - os.close(fd); - self.sockfd = null; - } - } - - pub fn deinit(self: *Server) void { - if (self.accept_frame) |accept_frame| await accept_frame; - if (self.sockfd) |sockfd| os.close(sockfd); - } - - pub async fn handler(self: *Server) void { - while (true) { - var accepted_addr: std.net.Address = undefined; - // TODO just inline the following function here and don't expose it as posixAsyncAccept - if (os.accept4_async(self.sockfd.?, &accepted_addr.os_addr, os.SOCK_NONBLOCK | os.SOCK_CLOEXEC)) |accepted_fd| { - if (accepted_fd == -1) { - // would block - suspend; // we will get resumed by epoll_wait in the event loop - continue; - } - var socket = File.openHandle(accepted_fd); - self.handleRequestFn(self, &accepted_addr, socket); - } else |err| switch (err) { - error.ProcessFdQuotaExceeded => @panic("TODO handle this error"), - error.ConnectionAborted => continue, - - error.FileDescriptorNotASocket => unreachable, - error.OperationNotSupported => unreachable, - - error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => { - @panic("TODO handle this error"); - }, - } - } - } -}; - -pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 { - const sockfd = try os.socket( - os.AF_UNIX, - os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, - 0, - ); - errdefer os.close(sockfd); - - var sock_addr = os.sockaddr_un{ - .family = os.AF_UNIX, - .path = undefined, - }; - - if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong; - mem.copy(u8, sock_addr.path[0..], path); - const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len); - try os.connect_async(sockfd, &sock_addr, size); - try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - try os.getsockoptError(sockfd); - - return sockfd; -} - -pub const ReadError = error{ - SystemResources, - Unexpected, - UserResourceLimitReached, - InputOutput, - - FileDescriptorNotRegistered, // TODO remove this possibility - OperationCausesCircularLoop, // TODO remove this possibility - FileDescriptorAlreadyPresentInSet, // TODO remove this possibility - FileDescriptorIncompatibleWithEpoll, // TODO remove this possibility -}; - -/// returns number of bytes read. 0 means EOF. -pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize { - const iov = os.iovec{ - .iov_base = buffer.ptr, - .iov_len = buffer.len, - }; - const iovs: *const [1]os.iovec = &iov; - return readvPosix(loop, fd, iovs, 1); -} - -pub const WriteError = error{}; - -pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteError!void { - const iov = os.iovec_const{ - .iov_base = buffer.ptr, - .iov_len = buffer.len, - }; - const iovs: *const [1]os.iovec_const = &iov; - return writevPosix(loop, fd, iovs, 1); -} - -pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void { - while (true) { - switch (builtin.os) { - .macosx, .linux => { - switch (os.errno(os.system.writev(fd, iov, count))) { - 0 => return, - os.EINTR => continue, - os.ESPIPE => unreachable, - os.EINVAL => unreachable, - os.EFAULT => unreachable, - os.EAGAIN => { - try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT); - continue; - }, - os.EBADF => unreachable, // always a race condition - os.EDESTADDRREQ => unreachable, // connect was never called - os.EDQUOT => unreachable, - os.EFBIG => unreachable, - os.EIO => return error.InputOutput, - os.ENOSPC => unreachable, - os.EPERM => return error.AccessDenied, - os.EPIPE => unreachable, - else => |err| return os.unexpectedErrno(err), - } - }, - else => @compileError("Unsupported OS"), - } - } -} - -/// returns number of bytes read. 0 means EOF. -pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: usize) !usize { - while (true) { - switch (builtin.os) { - builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => { - const rc = os.system.readv(fd, iov, count); - switch (os.errno(rc)) { - 0 => return rc, - os.EINTR => continue, - os.EINVAL => unreachable, - os.EFAULT => unreachable, - os.EAGAIN => { - try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); - continue; - }, - os.EBADF => unreachable, // always a race condition - os.EIO => return error.InputOutput, - os.EISDIR => unreachable, - os.ENOBUFS => return error.SystemResources, - os.ENOMEM => return error.SystemResources, - else => |err| return os.unexpectedErrno(err), - } - }, - else => @compileError("Unsupported OS"), - } - } -} - -pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void { - const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); - defer loop.allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec_const{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return writevPosix(loop, fd, iovecs.ptr, data.len); -} - -pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize { - const iovecs = try loop.allocator.alloc(os.iovec, data.len); - defer loop.allocator.free(iovecs); - - for (data) |buf, i| { - iovecs[i] = os.iovec{ - .iov_base = buf.ptr, - .iov_len = buf.len, - }; - } - - return readvPosix(loop, fd, iovecs.ptr, data.len); -} - -pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File { - var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592 - - const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); - errdefer os.close(sockfd); - - try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in)); - try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); - try os.getsockoptError(sockfd); - - return File.openHandle(sockfd); -} - -test "listen on a port, send bytes, receive bytes" { - // https://github.com/ziglang/zig/issues/2377 - if (true) return error.SkipZigTest; - - if (builtin.os != builtin.Os.linux) { - // TODO build abstractions for other operating systems - return error.SkipZigTest; - } - - const MyServer = struct { - tcp_server: Server, - - const Self = @This(); - async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void { - const self = @fieldParentPtr(Self, "tcp_server", tcp_server); - var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 - defer socket.close(); - const next_handler = errorableHandler(self, _addr, socket) catch |err| { - std.debug.panic("unable to handle connection: {}\n", err); - }; - } - async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void { - const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592 - var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 - - const stream = &socket.outStream().stream; - try stream.print("hello from server\n"); - } - }; - - const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable; - const addr = std.net.Address.initIp4(ip4addr, 0); - - var loop: Loop = undefined; - try loop.initSingleThreaded(std.debug.global_allocator); - var server = MyServer{ .tcp_server = Server.init(&loop) }; - defer server.tcp_server.deinit(); - try server.tcp_server.listen(&addr, MyServer.handler); - - _ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server); - loop.run(); -} - -async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void { - errdefer @panic("test failure"); - - var socket_file = try connect(loop, address); - defer socket_file.close(); - - var buf: [512]u8 = undefined; - const amt_read = try socket_file.read(buf[0..]); - const msg = buf[0..amt_read]; - testing.expect(mem.eql(u8, msg, "hello from server\n")); - server.close(); -} - -pub const OutStream = struct { - fd: fd_t, - stream: Stream, - loop: *Loop, - - pub const Error = WriteError; - pub const Stream = event.io.OutStream(Error); - - pub fn init(loop: *Loop, fd: fd_t) OutStream { - return OutStream{ - .fd = fd, - .loop = loop, - .stream = Stream{ .writeFn = writeFn }, - }; - } - - async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { - const self = @fieldParentPtr(OutStream, "stream", out_stream); - return write(self.loop, self.fd, bytes); - } -}; - -pub const InStream = struct { - fd: fd_t, - stream: Stream, - loop: *Loop, - - pub const Error = ReadError; - pub const Stream = event.io.InStream(Error); - - pub fn init(loop: *Loop, fd: fd_t) InStream { - return InStream{ - .fd = fd, - .loop = loop, - .stream = Stream{ .readFn = readFn }, - }; - } - - async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { - const self = @fieldParentPtr(InStream, "stream", in_stream); - return read(self.loop, self.fd, bytes); - } -}; |
