aboutsummaryrefslogtreecommitdiff
path: root/lib/std/event
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2019-10-30 21:30:16 -0400
committerAndrew Kelley <andrew@ziglang.org>2019-10-30 21:30:16 -0400
commit61d5a0bf48d034208aea37d72dac5b3531334be7 (patch)
tree57e545a972ae44c3bc6bab98396f9cb22203edd1 /lib/std/event
parent6a15e8a7a771bcbf2534cceecd77231344aafbf8 (diff)
parent7b7ba51642c832c77ec2668491843be3b0114124 (diff)
downloadzig-61d5a0bf48d034208aea37d72dac5b3531334be7.tar.gz
zig-61d5a0bf48d034208aea37d72dac5b3531334be7.zip
Merge branch 'std.net'
Diffstat (limited to 'lib/std/event')
-rw-r--r--lib/std/event/channel.zig10
-rw-r--r--lib/std/event/loop.zig87
-rw-r--r--lib/std/event/net.zig358
3 files changed, 73 insertions, 382 deletions
diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig
index f8539d5259..88edc90f16 100644
--- a/lib/std/event/channel.zig
+++ b/lib/std/event/channel.zig
@@ -4,9 +4,11 @@ const assert = std.debug.assert;
const testing = std.testing;
const Loop = std.event.Loop;
-/// many producer, many consumer, thread-safe, runtime configurable buffer size
-/// when buffer is empty, consumers suspend and are resumed by producers
-/// when buffer is full, producers suspend and are resumed by consumers
+/// Many producer, many consumer, thread-safe, runtime configurable buffer size.
+/// When buffer is empty, consumers suspend and are resumed by producers.
+/// When buffer is full, producers suspend and are resumed by consumers.
+/// TODO now that async function rewrite has landed, this API should be adjusted
+/// to not use the event loop's allocator, and to not require allocation.
pub fn Channel(comptime T: type) type {
return struct {
loop: *Loop,
@@ -48,7 +50,7 @@ pub fn Channel(comptime T: type) type {
tick_node: *Loop.NextTickNode,
};
- /// call destroy when done
+ /// Call `destroy` when done.
pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
const buffer_nodes = try loop.allocator.alloc(T, capacity);
errdefer loop.allocator.free(buffer_nodes);
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index 22013edba6..ae8d76676d 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -448,22 +448,67 @@ pub const Loop = struct {
self.finishOneEvent();
}
- pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void {
- defer self.linuxRemoveFd(fd);
+ pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void {
+ assert(flags & os.EPOLLET == os.EPOLLET);
+ assert(flags & os.EPOLLONESHOT == os.EPOLLONESHOT);
+ var resume_node = ResumeNode.Basic{
+ .base = ResumeNode{
+ .id = .Basic,
+ .handle = @frame(),
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ };
+ var need_to_delete = false;
+ defer if (need_to_delete) self.linuxRemoveFd(fd);
+
suspend {
- var resume_node = ResumeNode.Basic{
- .base = ResumeNode{
- .id = .Basic,
- .handle = @frame(),
- .overlapped = ResumeNode.overlapped_init,
+ if (self.linuxAddFd(fd, &resume_node.base, flags)) |_| {
+ need_to_delete = true;
+ } else |err| switch (err) {
+ error.FileDescriptorNotRegistered => unreachable,
+ error.OperationCausesCircularLoop => unreachable,
+ error.FileDescriptorIncompatibleWithEpoll => unreachable,
+ error.FileDescriptorAlreadyPresentInSet => unreachable, // evented writes to the same fd is not thread-safe
+
+ error.SystemResources,
+ error.UserResourceLimitReached,
+ error.Unexpected,
+ => {
+ // Fall back to a blocking poll(). Ideally this codepath is never hit, since
+ // epoll should be just fine. But this is better than incorrect behavior.
+ var poll_flags: i16 = 0;
+ if ((flags & os.EPOLLIN) != 0) poll_flags |= os.POLLIN;
+ if ((flags & os.EPOLLOUT) != 0) poll_flags |= os.POLLOUT;
+ var pfd = [1]os.pollfd{os.pollfd{
+ .fd = fd,
+ .events = poll_flags,
+ .revents = undefined,
+ }};
+ _ = os.poll(&pfd, -1) catch |poll_err| switch (poll_err) {
+ error.SystemResources,
+ error.Unexpected,
+ => {
+ // Even poll() didn't work. The best we can do now is sleep for a
+ // small duration and then hope that something changed.
+ std.time.sleep(1 * std.time.millisecond);
+ },
+ };
+ resume @frame();
},
- };
- try self.linuxAddFd(fd, &resume_node.base, flags);
+ }
}
}
- pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) !void {
- return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN);
+ pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void {
+ return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLIN);
+ }
+
+ pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void {
+ return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT);
+ }
+
+ pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void {
+ return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT | os.EPOLLIN);
}
pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent {
@@ -642,7 +687,7 @@ pub const Loop = struct {
.linux => {
self.posixFsRequest(&self.os_data.fs_end_request);
// writing 8 bytes to an eventfd cannot fail
- os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ noasync os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
return;
},
.macosx, .freebsd, .netbsd, .dragonfly => {
@@ -790,6 +835,8 @@ pub const Loop = struct {
}
}
+ // TODO make this whole function noasync
+ // https://github.com/ziglang/zig/issues/3157
fn posixFsRun(self: *Loop) void {
while (true) {
if (builtin.os == .linux) {
@@ -799,27 +846,27 @@ pub const Loop = struct {
switch (node.data.msg) {
.End => return,
.WriteV => |*msg| {
- msg.result = os.writev(msg.fd, msg.iov);
+ msg.result = noasync os.writev(msg.fd, msg.iov);
},
.PWriteV => |*msg| {
- msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
+ msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset);
},
.PReadV => |*msg| {
- msg.result = os.preadv(msg.fd, msg.iov, msg.offset);
+ msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset);
},
.Open => |*msg| {
- msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode);
+ msg.result = noasync os.openC(msg.path.ptr, msg.flags, msg.mode);
},
- .Close => |*msg| os.close(msg.fd),
+ .Close => |*msg| noasync os.close(msg.fd),
.WriteFile => |*msg| blk: {
const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT |
os.O_CLOEXEC | os.O_TRUNC;
- const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| {
+ const fd = noasync os.openC(msg.path.ptr, flags, msg.mode) catch |err| {
msg.result = err;
break :blk;
};
- defer os.close(fd);
- msg.result = os.write(fd, msg.contents);
+ defer noasync os.close(fd);
+ msg.result = noasync os.write(fd, msg.contents);
},
}
switch (node.data.finish) {
diff --git a/lib/std/event/net.zig b/lib/std/event/net.zig
deleted file mode 100644
index bed665dcdc..0000000000
--- a/lib/std/event/net.zig
+++ /dev/null
@@ -1,358 +0,0 @@
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const testing = std.testing;
-const event = std.event;
-const mem = std.mem;
-const os = std.os;
-const Loop = std.event.Loop;
-const File = std.fs.File;
-const fd_t = os.fd_t;
-
-pub const Server = struct {
- handleRequestFn: async fn (*Server, *const std.net.Address, File) void,
-
- loop: *Loop,
- sockfd: ?i32,
- accept_frame: ?anyframe,
- listen_address: std.net.Address,
-
- waiting_for_emfile_node: PromiseNode,
- listen_resume_node: event.Loop.ResumeNode,
-
- const PromiseNode = std.TailQueue(anyframe).Node;
-
- pub fn init(loop: *Loop) Server {
- // TODO can't initialize handler here because we need well defined copy elision
- return Server{
- .loop = loop,
- .sockfd = null,
- .accept_frame = null,
- .handleRequestFn = undefined,
- .waiting_for_emfile_node = undefined,
- .listen_address = undefined,
- .listen_resume_node = event.Loop.ResumeNode{
- .id = event.Loop.ResumeNode.Id.Basic,
- .handle = undefined,
- .overlapped = event.Loop.ResumeNode.overlapped_init,
- },
- };
- }
-
- pub fn listen(
- self: *Server,
- address: *const std.net.Address,
- handleRequestFn: async fn (*Server, *const std.net.Address, File) void,
- ) !void {
- self.handleRequestFn = handleRequestFn;
-
- const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp);
- errdefer os.close(sockfd);
- self.sockfd = sockfd;
-
- try os.bind(sockfd, &address.os_addr);
- try os.listen(sockfd, os.SOMAXCONN);
- self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd));
-
- self.accept_frame = async Server.handler(self);
- errdefer await self.accept_frame.?;
-
- self.listen_resume_node.handle = self.accept_frame.?;
- try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
- errdefer self.loop.removeFd(sockfd);
- }
-
- /// Stop listening
- pub fn close(self: *Server) void {
- self.loop.linuxRemoveFd(self.sockfd.?);
- if (self.sockfd) |fd| {
- os.close(fd);
- self.sockfd = null;
- }
- }
-
- pub fn deinit(self: *Server) void {
- if (self.accept_frame) |accept_frame| await accept_frame;
- if (self.sockfd) |sockfd| os.close(sockfd);
- }
-
- pub async fn handler(self: *Server) void {
- while (true) {
- var accepted_addr: std.net.Address = undefined;
- // TODO just inline the following function here and don't expose it as posixAsyncAccept
- if (os.accept4_async(self.sockfd.?, &accepted_addr.os_addr, os.SOCK_NONBLOCK | os.SOCK_CLOEXEC)) |accepted_fd| {
- if (accepted_fd == -1) {
- // would block
- suspend; // we will get resumed by epoll_wait in the event loop
- continue;
- }
- var socket = File.openHandle(accepted_fd);
- self.handleRequestFn(self, &accepted_addr, socket);
- } else |err| switch (err) {
- error.ProcessFdQuotaExceeded => @panic("TODO handle this error"),
- error.ConnectionAborted => continue,
-
- error.FileDescriptorNotASocket => unreachable,
- error.OperationNotSupported => unreachable,
-
- error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => {
- @panic("TODO handle this error");
- },
- }
- }
- }
-};
-
-pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 {
- const sockfd = try os.socket(
- os.AF_UNIX,
- os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK,
- 0,
- );
- errdefer os.close(sockfd);
-
- var sock_addr = os.sockaddr_un{
- .family = os.AF_UNIX,
- .path = undefined,
- };
-
- if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong;
- mem.copy(u8, sock_addr.path[0..], path);
- const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len);
- try os.connect_async(sockfd, &sock_addr, size);
- try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
- try os.getsockoptError(sockfd);
-
- return sockfd;
-}
-
-pub const ReadError = error{
- SystemResources,
- Unexpected,
- UserResourceLimitReached,
- InputOutput,
-
- FileDescriptorNotRegistered, // TODO remove this possibility
- OperationCausesCircularLoop, // TODO remove this possibility
- FileDescriptorAlreadyPresentInSet, // TODO remove this possibility
- FileDescriptorIncompatibleWithEpoll, // TODO remove this possibility
-};
-
-/// returns number of bytes read. 0 means EOF.
-pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize {
- const iov = os.iovec{
- .iov_base = buffer.ptr,
- .iov_len = buffer.len,
- };
- const iovs: *const [1]os.iovec = &iov;
- return readvPosix(loop, fd, iovs, 1);
-}
-
-pub const WriteError = error{};
-
-pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteError!void {
- const iov = os.iovec_const{
- .iov_base = buffer.ptr,
- .iov_len = buffer.len,
- };
- const iovs: *const [1]os.iovec_const = &iov;
- return writevPosix(loop, fd, iovs, 1);
-}
-
-pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void {
- while (true) {
- switch (builtin.os) {
- .macosx, .linux => {
- switch (os.errno(os.system.writev(fd, iov, count))) {
- 0 => return,
- os.EINTR => continue,
- os.ESPIPE => unreachable,
- os.EINVAL => unreachable,
- os.EFAULT => unreachable,
- os.EAGAIN => {
- try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT);
- continue;
- },
- os.EBADF => unreachable, // always a race condition
- os.EDESTADDRREQ => unreachable, // connect was never called
- os.EDQUOT => unreachable,
- os.EFBIG => unreachable,
- os.EIO => return error.InputOutput,
- os.ENOSPC => unreachable,
- os.EPERM => return error.AccessDenied,
- os.EPIPE => unreachable,
- else => |err| return os.unexpectedErrno(err),
- }
- },
- else => @compileError("Unsupported OS"),
- }
- }
-}
-
-/// returns number of bytes read. 0 means EOF.
-pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: usize) !usize {
- while (true) {
- switch (builtin.os) {
- builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => {
- const rc = os.system.readv(fd, iov, count);
- switch (os.errno(rc)) {
- 0 => return rc,
- os.EINTR => continue,
- os.EINVAL => unreachable,
- os.EFAULT => unreachable,
- os.EAGAIN => {
- try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN);
- continue;
- },
- os.EBADF => unreachable, // always a race condition
- os.EIO => return error.InputOutput,
- os.EISDIR => unreachable,
- os.ENOBUFS => return error.SystemResources,
- os.ENOMEM => return error.SystemResources,
- else => |err| return os.unexpectedErrno(err),
- }
- },
- else => @compileError("Unsupported OS"),
- }
- }
-}
-
-pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void {
- const iovecs = try loop.allocator.alloc(os.iovec_const, data.len);
- defer loop.allocator.free(iovecs);
-
- for (data) |buf, i| {
- iovecs[i] = os.iovec_const{
- .iov_base = buf.ptr,
- .iov_len = buf.len,
- };
- }
-
- return writevPosix(loop, fd, iovecs.ptr, data.len);
-}
-
-pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize {
- const iovecs = try loop.allocator.alloc(os.iovec, data.len);
- defer loop.allocator.free(iovecs);
-
- for (data) |buf, i| {
- iovecs[i] = os.iovec{
- .iov_base = buf.ptr,
- .iov_len = buf.len,
- };
- }
-
- return readvPosix(loop, fd, iovecs.ptr, data.len);
-}
-
-pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File {
- var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592
-
- const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp);
- errdefer os.close(sockfd);
-
- try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in));
- try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
- try os.getsockoptError(sockfd);
-
- return File.openHandle(sockfd);
-}
-
-test "listen on a port, send bytes, receive bytes" {
- // https://github.com/ziglang/zig/issues/2377
- if (true) return error.SkipZigTest;
-
- if (builtin.os != builtin.Os.linux) {
- // TODO build abstractions for other operating systems
- return error.SkipZigTest;
- }
-
- const MyServer = struct {
- tcp_server: Server,
-
- const Self = @This();
- async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void {
- const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
- var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592
- defer socket.close();
- const next_handler = errorableHandler(self, _addr, socket) catch |err| {
- std.debug.panic("unable to handle connection: {}\n", err);
- };
- }
- async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void {
- const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592
- var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592
-
- const stream = &socket.outStream().stream;
- try stream.print("hello from server\n");
- }
- };
-
- const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
- const addr = std.net.Address.initIp4(ip4addr, 0);
-
- var loop: Loop = undefined;
- try loop.initSingleThreaded(std.debug.global_allocator);
- var server = MyServer{ .tcp_server = Server.init(&loop) };
- defer server.tcp_server.deinit();
- try server.tcp_server.listen(&addr, MyServer.handler);
-
- _ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server);
- loop.run();
-}
-
-async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void {
- errdefer @panic("test failure");
-
- var socket_file = try connect(loop, address);
- defer socket_file.close();
-
- var buf: [512]u8 = undefined;
- const amt_read = try socket_file.read(buf[0..]);
- const msg = buf[0..amt_read];
- testing.expect(mem.eql(u8, msg, "hello from server\n"));
- server.close();
-}
-
-pub const OutStream = struct {
- fd: fd_t,
- stream: Stream,
- loop: *Loop,
-
- pub const Error = WriteError;
- pub const Stream = event.io.OutStream(Error);
-
- pub fn init(loop: *Loop, fd: fd_t) OutStream {
- return OutStream{
- .fd = fd,
- .loop = loop,
- .stream = Stream{ .writeFn = writeFn },
- };
- }
-
- async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
- const self = @fieldParentPtr(OutStream, "stream", out_stream);
- return write(self.loop, self.fd, bytes);
- }
-};
-
-pub const InStream = struct {
- fd: fd_t,
- stream: Stream,
- loop: *Loop,
-
- pub const Error = ReadError;
- pub const Stream = event.io.InStream(Error);
-
- pub fn init(loop: *Loop, fd: fd_t) InStream {
- return InStream{
- .fd = fd,
- .loop = loop,
- .stream = Stream{ .readFn = readFn },
- };
- }
-
- async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
- const self = @fieldParentPtr(InStream, "stream", in_stream);
- return read(self.loop, self.fd, bytes);
- }
-};