aboutsummaryrefslogtreecommitdiff
path: root/lib/std/event/loop.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2024-02-09 13:38:42 -0800
committerGitHub <noreply@github.com>2024-02-09 13:38:42 -0800
commit54bbc73f8502fe073d385361ddb34a43d12eec39 (patch)
tree6eb554a4639f2c05bdfa5fc94553edc7f5df1650 /lib/std/event/loop.zig
parentd3cf911a803912a5aabe7a8d130c8b6468b95ff1 (diff)
parent318e9cdaaae2f01c1a6e5db9cf66fe875821787e (diff)
downloadzig-54bbc73f8502fe073d385361ddb34a43d12eec39.tar.gz
zig-54bbc73f8502fe073d385361ddb34a43d12eec39.zip
Merge pull request #18712 from Vexu/std.options
std: make options a struct instance instead of a namespace
Diffstat (limited to 'lib/std/event/loop.zig')
-rw-r--r--lib/std/event/loop.zig1791
1 files changed, 0 insertions, 1791 deletions
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
deleted file mode 100644
index d7b75a6672..0000000000
--- a/lib/std/event/loop.zig
+++ /dev/null
@@ -1,1791 +0,0 @@
-const std = @import("../std.zig");
-const builtin = @import("builtin");
-const assert = std.debug.assert;
-const testing = std.testing;
-const mem = std.mem;
-const os = std.os;
-const windows = os.windows;
-const maxInt = std.math.maxInt;
-const Thread = std.Thread;
-
-const is_windows = builtin.os.tag == .windows;
-
-pub const Loop = struct {
- next_tick_queue: std.atomic.Queue(anyframe),
- os_data: OsData,
- final_resume_node: ResumeNode,
- pending_event_count: usize,
- extra_threads: []Thread,
- /// TODO change this to a pool of configurable number of threads
- /// and rename it to be not file-system-specific. it will become
- /// a thread pool for turning non-CPU-bound blocking things into
- /// async things. A fallback for any missing OS-specific API.
- fs_thread: Thread,
- fs_queue: std.atomic.Queue(Request),
- fs_end_request: Request.Node,
- fs_thread_wakeup: std.Thread.ResetEvent,
-
- /// For resources that have the same lifetime as the `Loop`.
- /// This is only used by `Loop` for the thread pool and associated resources.
- arena: std.heap.ArenaAllocator,
-
- /// State which manages frames that are sleeping on timers
- delay_queue: DelayQueue,
-
- /// Pre-allocated eventfds. All permanently active.
- /// This is how `Loop` sends promises to be resumed on other threads.
- available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
- eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
-
- pub const NextTickNode = std.atomic.Queue(anyframe).Node;
-
- pub const ResumeNode = struct {
- id: Id,
- handle: anyframe,
- overlapped: Overlapped,
-
- pub const overlapped_init = switch (builtin.os.tag) {
- .windows => windows.OVERLAPPED{
- .Internal = 0,
- .InternalHigh = 0,
- .DUMMYUNIONNAME = .{
- .DUMMYSTRUCTNAME = .{
- .Offset = 0,
- .OffsetHigh = 0,
- },
- },
- .hEvent = null,
- },
- else => {},
- };
- pub const Overlapped = @TypeOf(overlapped_init);
-
- pub const Id = enum {
- basic,
- stop,
- event_fd,
- };
-
- pub const EventFd = switch (builtin.os.tag) {
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventFd,
- .linux => struct {
- base: ResumeNode,
- epoll_op: u32,
- eventfd: i32,
- },
- .windows => struct {
- base: ResumeNode,
- completion_key: usize,
- },
- else => struct {},
- };
-
- const KEventFd = struct {
- base: ResumeNode,
- kevent: os.Kevent,
- };
-
- pub const Basic = switch (builtin.os.tag) {
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventBasic,
- .linux => struct {
- base: ResumeNode,
- },
- .windows => struct {
- base: ResumeNode,
- },
- else => @compileError("unsupported OS"),
- };
-
- const KEventBasic = struct {
- base: ResumeNode,
- kev: os.Kevent,
- };
- };
-
- pub const Instance = switch (std.options.io_mode) {
- .blocking => @TypeOf(null),
- .evented => ?*Loop,
- };
- pub const instance = std.options.event_loop;
-
- var global_instance_state: Loop = undefined;
- pub const default_instance = switch (std.options.io_mode) {
- .blocking => null,
- .evented => &global_instance_state,
- };
-
- pub const Mode = enum {
- single_threaded,
- multi_threaded,
- };
- pub const default_mode = .multi_threaded;
-
- /// TODO copy elision / named return values so that the threads referencing *Loop
- /// have the correct pointer value.
- /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
- pub fn init(self: *Loop) !void {
- if (builtin.single_threaded or std.options.event_loop_mode == .single_threaded) {
- return self.initSingleThreaded();
- } else {
- return self.initMultiThreaded();
- }
- }
-
- /// After initialization, call run().
- /// TODO copy elision / named return values so that the threads referencing *Loop
- /// have the correct pointer value.
- /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
- pub fn initSingleThreaded(self: *Loop) !void {
- return self.initThreadPool(1);
- }
-
- /// After initialization, call run().
- /// This is the same as `initThreadPool` using `Thread.getCpuCount` to determine the thread
- /// pool size.
- /// TODO copy elision / named return values so that the threads referencing *Loop
- /// have the correct pointer value.
- /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
- pub fn initMultiThreaded(self: *Loop) !void {
- if (builtin.single_threaded)
- @compileError("initMultiThreaded unavailable when building in single-threaded mode");
- const core_count = try Thread.getCpuCount();
- return self.initThreadPool(core_count);
- }
-
- /// Thread count is the total thread count. The thread pool size will be
- /// max(thread_count - 1, 0)
- pub fn initThreadPool(self: *Loop, thread_count: usize) !void {
- self.* = Loop{
- .arena = std.heap.ArenaAllocator.init(std.heap.page_allocator),
- .pending_event_count = 1,
- .os_data = undefined,
- .next_tick_queue = std.atomic.Queue(anyframe).init(),
- .extra_threads = undefined,
- .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
- .eventfd_resume_nodes = undefined,
- .final_resume_node = ResumeNode{
- .id = .stop,
- .handle = undefined,
- .overlapped = ResumeNode.overlapped_init,
- },
- .fs_end_request = .{ .data = .{ .msg = .end, .finish = .no_action } },
- .fs_queue = std.atomic.Queue(Request).init(),
- .fs_thread = undefined,
- .fs_thread_wakeup = .{},
- .delay_queue = undefined,
- };
- errdefer self.arena.deinit();
-
- // We need at least one of these in case the fs thread wants to use onNextTick
- const extra_thread_count = thread_count - 1;
- const resume_node_count = @max(extra_thread_count, 1);
- self.eventfd_resume_nodes = try self.arena.allocator().alloc(
- std.atomic.Stack(ResumeNode.EventFd).Node,
- resume_node_count,
- );
-
- self.extra_threads = try self.arena.allocator().alloc(Thread, extra_thread_count);
-
- try self.initOsData(extra_thread_count);
- errdefer self.deinitOsData();
-
- if (!builtin.single_threaded) {
- self.fs_thread = try Thread.spawn(.{}, posixFsRun, .{self});
- }
- errdefer if (!builtin.single_threaded) {
- self.posixFsRequest(&self.fs_end_request);
- self.fs_thread.join();
- };
-
- if (!builtin.single_threaded)
- try self.delay_queue.init();
- }
-
- pub fn deinit(self: *Loop) void {
- self.deinitOsData();
- self.arena.deinit();
- self.* = undefined;
- }
-
- const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError ||
- Thread.SpawnError || os.EpollCtlError || os.KEventError ||
- windows.CreateIoCompletionPortError;
-
- const wakeup_bytes = [_]u8{0x1} ** 8;
-
- fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
- nosuspend switch (builtin.os.tag) {
- .linux => {
- errdefer {
- while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
- }
- for (self.eventfd_resume_nodes) |*eventfd_node| {
- eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
- .data = ResumeNode.EventFd{
- .base = ResumeNode{
- .id = .event_fd,
- .handle = undefined,
- .overlapped = ResumeNode.overlapped_init,
- },
- .eventfd = try os.eventfd(1, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK),
- .epoll_op = os.linux.EPOLL.CTL_ADD,
- },
- .next = undefined,
- };
- self.available_eventfd_resume_nodes.push(eventfd_node);
- }
-
- self.os_data.epollfd = try os.epoll_create1(os.linux.EPOLL.CLOEXEC);
- errdefer os.close(self.os_data.epollfd);
-
- self.os_data.final_eventfd = try os.eventfd(0, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK);
- errdefer os.close(self.os_data.final_eventfd);
-
- self.os_data.final_eventfd_event = os.linux.epoll_event{
- .events = os.linux.EPOLL.IN,
- .data = os.linux.epoll_data{ .ptr = @intFromPtr(&self.final_resume_node) },
- };
- try os.epoll_ctl(
- self.os_data.epollfd,
- os.linux.EPOLL.CTL_ADD,
- self.os_data.final_eventfd,
- &self.os_data.final_eventfd_event,
- );
-
- if (builtin.single_threaded) {
- assert(extra_thread_count == 0);
- return;
- }
-
- var extra_thread_index: usize = 0;
- errdefer {
- // writing 8 bytes to an eventfd cannot fail
- const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
- assert(amt == wakeup_bytes.len);
- while (extra_thread_index != 0) {
- extra_thread_index -= 1;
- self.extra_threads[extra_thread_index].join();
- }
- }
- while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
- self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
- }
- },
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly => {
- self.os_data.kqfd = try os.kqueue();
- errdefer os.close(self.os_data.kqfd);
-
- const empty_kevs = &[0]os.Kevent{};
-
- for (self.eventfd_resume_nodes, 0..) |*eventfd_node, i| {
- eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
- .data = ResumeNode.EventFd{
- .base = ResumeNode{
- .id = .event_fd,
- .handle = undefined,
- .overlapped = ResumeNode.overlapped_init,
- },
- // this one is for sending events
- .kevent = os.Kevent{
- .ident = i,
- .filter = os.system.EVFILT_USER,
- .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE,
- .fflags = 0,
- .data = 0,
- .udata = @intFromPtr(&eventfd_node.data.base),
- },
- },
- .next = undefined,
- };
- self.available_eventfd_resume_nodes.push(eventfd_node);
- const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
- _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
- eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
- eventfd_node.data.kevent.fflags = os.system.NOTE_TRIGGER;
- }
-
- // Pre-add so that we cannot get error.SystemResources
- // later when we try to activate it.
- self.os_data.final_kevent = os.Kevent{
- .ident = extra_thread_count,
- .filter = os.system.EVFILT_USER,
- .flags = os.system.EV_ADD | os.system.EV_DISABLE,
- .fflags = 0,
- .data = 0,
- .udata = @intFromPtr(&self.final_resume_node),
- };
- const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
- _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
- self.os_data.final_kevent.flags = os.system.EV_ENABLE;
- self.os_data.final_kevent.fflags = os.system.NOTE_TRIGGER;
-
- if (builtin.single_threaded) {
- assert(extra_thread_count == 0);
- return;
- }
-
- var extra_thread_index: usize = 0;
- errdefer {
- _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
- while (extra_thread_index != 0) {
- extra_thread_index -= 1;
- self.extra_threads[extra_thread_index].join();
- }
- }
- while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
- self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
- }
- },
- .openbsd => {
- self.os_data.kqfd = try os.kqueue();
- errdefer os.close(self.os_data.kqfd);
-
- const empty_kevs = &[0]os.Kevent{};
-
- for (self.eventfd_resume_nodes, 0..) |*eventfd_node, i| {
- eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
- .data = ResumeNode.EventFd{
- .base = ResumeNode{
- .id = .event_fd,
- .handle = undefined,
- .overlapped = ResumeNode.overlapped_init,
- },
- // this one is for sending events
- .kevent = os.Kevent{
- .ident = i,
- .filter = os.system.EVFILT_TIMER,
- .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE | os.system.EV_ONESHOT,
- .fflags = 0,
- .data = 0,
- .udata = @intFromPtr(&eventfd_node.data.base),
- },
- },
- .next = undefined,
- };
- self.available_eventfd_resume_nodes.push(eventfd_node);
- const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
- _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
- eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
- }
-
- // Pre-add so that we cannot get error.SystemResources
- // later when we try to activate it.
- self.os_data.final_kevent = os.Kevent{
- .ident = extra_thread_count,
- .filter = os.system.EVFILT_TIMER,
- .flags = os.system.EV_ADD | os.system.EV_ONESHOT | os.system.EV_DISABLE,
- .fflags = 0,
- .data = 0,
- .udata = @intFromPtr(&self.final_resume_node),
- };
- const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
- _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
- self.os_data.final_kevent.flags = os.system.EV_ENABLE;
-
- if (builtin.single_threaded) {
- assert(extra_thread_count == 0);
- return;
- }
-
- var extra_thread_index: usize = 0;
- errdefer {
- _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
- while (extra_thread_index != 0) {
- extra_thread_index -= 1;
- self.extra_threads[extra_thread_index].join();
- }
- }
- while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
- self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
- }
- },
- .windows => {
- self.os_data.io_port = try windows.CreateIoCompletionPort(
- windows.INVALID_HANDLE_VALUE,
- null,
- undefined,
- maxInt(windows.DWORD),
- );
- errdefer windows.CloseHandle(self.os_data.io_port);
-
- for (self.eventfd_resume_nodes) |*eventfd_node| {
- eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
- .data = ResumeNode.EventFd{
- .base = ResumeNode{
- .id = .event_fd,
- .handle = undefined,
- .overlapped = ResumeNode.overlapped_init,
- },
- // this one is for sending events
- .completion_key = @intFromPtr(&eventfd_node.data.base),
- },
- .next = undefined,
- };
- self.available_eventfd_resume_nodes.push(eventfd_node);
- }
-
- if (builtin.single_threaded) {
- assert(extra_thread_count == 0);
- return;
- }
-
- var extra_thread_index: usize = 0;
- errdefer {
- var i: usize = 0;
- while (i < extra_thread_index) : (i += 1) {
- while (true) {
- const overlapped = &self.final_resume_node.overlapped;
- windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
- break;
- }
- }
- while (extra_thread_index != 0) {
- extra_thread_index -= 1;
- self.extra_threads[extra_thread_index].join();
- }
- }
- while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
- self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
- }
- },
- else => {},
- };
- }
-
- fn deinitOsData(self: *Loop) void {
- nosuspend switch (builtin.os.tag) {
- .linux => {
- os.close(self.os_data.final_eventfd);
- while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
- os.close(self.os_data.epollfd);
- },
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- os.close(self.os_data.kqfd);
- },
- .windows => {
- windows.CloseHandle(self.os_data.io_port);
- },
- else => {},
- };
- }
-
- /// resume_node must live longer than the anyframe that it holds a reference to.
- /// flags must contain EPOLLET
- pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void {
- assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
- self.beginOneEvent();
- errdefer self.finishOneEvent();
- try self.linuxModFd(
- fd,
- os.linux.EPOLL.CTL_ADD,
- flags,
- resume_node,
- );
- }
-
- pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void {
- assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
- var ev = os.linux.epoll_event{
- .events = flags,
- .data = os.linux.epoll_data{ .ptr = @intFromPtr(resume_node) },
- };
- try os.epoll_ctl(self.os_data.epollfd, op, fd, &ev);
- }
-
- pub fn linuxRemoveFd(self: *Loop, fd: i32) void {
- os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL.CTL_DEL, fd, null) catch {};
- self.finishOneEvent();
- }
-
- pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void {
- assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
- assert(flags & os.linux.EPOLL.ONESHOT == os.linux.EPOLL.ONESHOT);
- var resume_node = ResumeNode.Basic{
- .base = ResumeNode{
- .id = .basic,
- .handle = @frame(),
- .overlapped = ResumeNode.overlapped_init,
- },
- };
- var need_to_delete = true;
- defer if (need_to_delete) self.linuxRemoveFd(fd);
-
- suspend {
- self.linuxAddFd(fd, &resume_node.base, flags) catch |err| switch (err) {
- error.FileDescriptorNotRegistered => unreachable,
- error.OperationCausesCircularLoop => unreachable,
- error.FileDescriptorIncompatibleWithEpoll => unreachable,
- error.FileDescriptorAlreadyPresentInSet => unreachable, // evented writes to the same fd is not thread-safe
-
- error.SystemResources,
- error.UserResourceLimitReached,
- error.Unexpected,
- => {
- need_to_delete = false;
- // Fall back to a blocking poll(). Ideally this codepath is never hit, since
- // epoll should be just fine. But this is better than incorrect behavior.
- var poll_flags: i16 = 0;
- if ((flags & os.linux.EPOLL.IN) != 0) poll_flags |= os.POLL.IN;
- if ((flags & os.linux.EPOLL.OUT) != 0) poll_flags |= os.POLL.OUT;
- var pfd = [1]os.pollfd{os.pollfd{
- .fd = fd,
- .events = poll_flags,
- .revents = undefined,
- }};
- _ = os.poll(&pfd, -1) catch |poll_err| switch (poll_err) {
- error.NetworkSubsystemFailed => unreachable, // only possible on windows
-
- error.SystemResources,
- error.Unexpected,
- => {
- // Even poll() didn't work. The best we can do now is sleep for a
- // small duration and then hope that something changed.
- std.time.sleep(1 * std.time.ns_per_ms);
- },
- };
- resume @frame();
- },
- };
- }
- }
-
- pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void {
- switch (builtin.os.tag) {
- .linux => {
- self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN);
- },
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- self.bsdWaitKev(@as(usize, @intCast(fd)), os.system.EVFILT_READ, os.system.EV_ONESHOT);
- },
- else => @compileError("Unsupported OS"),
- }
- }
-
- pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void {
- switch (builtin.os.tag) {
- .linux => {
- self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT);
- },
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- self.bsdWaitKev(@as(usize, @intCast(fd)), os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
- },
- else => @compileError("Unsupported OS"),
- }
- }
-
- pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void {
- switch (builtin.os.tag) {
- .linux => {
- self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT | os.linux.EPOLL.IN);
- },
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- self.bsdWaitKev(@as(usize, @intCast(fd)), os.system.EVFILT_READ, os.system.EV_ONESHOT);
- self.bsdWaitKev(@as(usize, @intCast(fd)), os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
- },
- else => @compileError("Unsupported OS"),
- }
- }
-
- pub fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, flags: u16) void {
- var resume_node = ResumeNode.Basic{
- .base = ResumeNode{
- .id = .basic,
- .handle = @frame(),
- .overlapped = ResumeNode.overlapped_init,
- },
- .kev = undefined,
- };
-
- defer {
- // If the kevent was set to be ONESHOT, it doesn't need to be deleted manually.
- if (flags & os.system.EV_ONESHOT != 0) {
- self.bsdRemoveKev(ident, filter);
- }
- }
-
- suspend {
- self.bsdAddKev(&resume_node, ident, filter, flags) catch unreachable;
- }
- }
-
- /// resume_node must live longer than the anyframe that it holds a reference to.
- pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, flags: u16) !void {
- self.beginOneEvent();
- errdefer self.finishOneEvent();
- var kev = [1]os.Kevent{os.Kevent{
- .ident = ident,
- .filter = filter,
- .flags = os.system.EV_ADD | os.system.EV_ENABLE | os.system.EV_CLEAR | flags,
- .fflags = 0,
- .data = 0,
- .udata = @intFromPtr(&resume_node.base),
- }};
- const empty_kevs = &[0]os.Kevent{};
- _ = try os.kevent(self.os_data.kqfd, &kev, empty_kevs, null);
- }
-
- pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void {
- var kev = [1]os.Kevent{os.Kevent{
- .ident = ident,
- .filter = filter,
- .flags = os.system.EV_DELETE,
- .fflags = 0,
- .data = 0,
- .udata = 0,
- }};
- const empty_kevs = &[0]os.Kevent{};
- _ = os.kevent(self.os_data.kqfd, &kev, empty_kevs, null) catch undefined;
- self.finishOneEvent();
- }
-
- fn dispatch(self: *Loop) void {
- while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
- const next_tick_node = self.next_tick_queue.get() orelse {
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- return;
- };
- const eventfd_node = &resume_stack_node.data;
- eventfd_node.base.handle = next_tick_node.data;
- switch (builtin.os.tag) {
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.kevent);
- const empty_kevs = &[0]os.Kevent{};
- _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch {
- self.next_tick_queue.unget(next_tick_node);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- return;
- };
- },
- .linux => {
- // the pending count is already accounted for
- const epoll_events = os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN | os.linux.EPOLL.OUT |
- os.linux.EPOLL.ET;
- self.linuxModFd(
- eventfd_node.eventfd,
- eventfd_node.epoll_op,
- epoll_events,
- &eventfd_node.base,
- ) catch {
- self.next_tick_queue.unget(next_tick_node);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- return;
- };
- },
- .windows => {
- windows.PostQueuedCompletionStatus(
- self.os_data.io_port,
- undefined,
- undefined,
- &eventfd_node.base.overlapped,
- ) catch {
- self.next_tick_queue.unget(next_tick_node);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- return;
- };
- },
- else => @compileError("unsupported OS"),
- }
- }
- }
-
- /// Bring your own linked list node. This means it can't fail.
- pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
- self.beginOneEvent(); // finished in dispatch()
- self.next_tick_queue.put(node);
- self.dispatch();
- }
-
- pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void {
- if (self.next_tick_queue.remove(node)) {
- self.finishOneEvent();
- }
- }
-
- pub fn run(self: *Loop) void {
- self.finishOneEvent(); // the reference we start with
-
- self.workerRun();
-
- if (!builtin.single_threaded) {
- switch (builtin.os.tag) {
- .linux,
- .macos,
- .ios,
- .tvos,
- .watchos,
- .freebsd,
- .netbsd,
- .dragonfly,
- .openbsd,
- => self.fs_thread.join(),
- else => {},
- }
- }
-
- for (self.extra_threads) |extra_thread| {
- extra_thread.join();
- }
-
- self.delay_queue.deinit();
- }
-
- /// Runs the provided function asynchronously. The function's frame is allocated
- /// with `allocator` and freed when the function returns.
- /// `func` must return void and it can be an async function.
- /// Yields to the event loop, running the function on the next tick.
- pub fn runDetached(self: *Loop, alloc: mem.Allocator, comptime func: anytype, args: anytype) error{OutOfMemory}!void {
- if (!std.io.is_async) @compileError("Can't use runDetached in non-async mode!");
- if (@TypeOf(@call(.{}, func, args)) != void) {
- @compileError("`func` must not have a return value");
- }
-
- const Wrapper = struct {
- const Args = @TypeOf(args);
- fn run(func_args: Args, loop: *Loop, allocator: mem.Allocator) void {
- loop.beginOneEvent();
- loop.yield();
- @call(.{}, func, func_args); // compile error when called with non-void ret type
- suspend {
- loop.finishOneEvent();
- allocator.destroy(@frame());
- }
- }
- };
-
- const run_frame = try alloc.create(@Frame(Wrapper.run));
- run_frame.* = async Wrapper.run(args, self, alloc);
- }
-
- /// Yielding lets the event loop run, starting any unstarted async operations.
- /// Note that async operations automatically start when a function yields for any other reason,
- /// for example, when async I/O is performed. This function is intended to be used only when
- /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O
- /// is performed.
- pub fn yield(self: *Loop) void {
- suspend {
- var my_tick_node = NextTickNode{
- .prev = undefined,
- .next = undefined,
- .data = @frame(),
- };
- self.onNextTick(&my_tick_node);
- }
- }
-
- /// If the build is multi-threaded and there is an event loop, then it calls `yield`. Otherwise,
- /// does nothing.
- pub fn startCpuBoundOperation() void {
- if (builtin.single_threaded) {
- return;
- } else if (instance) |event_loop| {
- event_loop.yield();
- }
- }
-
- /// call finishOneEvent when done
- pub fn beginOneEvent(self: *Loop) void {
- _ = @atomicRmw(usize, &self.pending_event_count, .Add, 1, .SeqCst);
- }
-
- pub fn finishOneEvent(self: *Loop) void {
- nosuspend {
- const prev = @atomicRmw(usize, &self.pending_event_count, .Sub, 1, .SeqCst);
- if (prev != 1) return;
-
- // cause all the threads to stop
- self.posixFsRequest(&self.fs_end_request);
-
- switch (builtin.os.tag) {
- .linux => {
- // writing to the eventfd will only wake up one thread, thus multiple writes
- // are needed to wakeup all the threads
- var i: usize = 0;
- while (i < self.extra_threads.len + 1) : (i += 1) {
- // writing 8 bytes to an eventfd cannot fail
- const amt = os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
- assert(amt == wakeup_bytes.len);
- }
- return;
- },
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- const final_kevent = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
- const empty_kevs = &[0]os.Kevent{};
- // cannot fail because we already added it and this just enables it
- _ = os.kevent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable;
- return;
- },
- .windows => {
- var i: usize = 0;
- while (i < self.extra_threads.len + 1) : (i += 1) {
- while (true) {
- const overlapped = &self.final_resume_node.overlapped;
- windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
- break;
- }
- }
- return;
- },
- else => @compileError("unsupported OS"),
- }
- }
- }
-
- pub fn sleep(self: *Loop, nanoseconds: u64) void {
- if (builtin.single_threaded)
- @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded");
-
- suspend {
- const now = self.delay_queue.timer.read();
-
- var entry: DelayQueue.Waiters.Entry = undefined;
- entry.init(@frame(), now + nanoseconds);
- self.delay_queue.waiters.insert(&entry);
-
- // Speculatively wake up the timer thread when we add a new entry.
- // If the timer thread is sleeping on a longer entry, we need to
- // interrupt it so that our entry can be expired in time.
- self.delay_queue.event.set();
- }
- }
-
- const DelayQueue = struct {
- timer: std.time.Timer,
- waiters: Waiters,
- thread: std.Thread,
- event: std.Thread.ResetEvent,
- is_running: std.atomic.Value(bool),
-
- /// Initialize the delay queue by spawning the timer thread
- /// and starting any timer resources.
- fn init(self: *DelayQueue) !void {
- self.* = DelayQueue{
- .timer = try std.time.Timer.start(),
- .waiters = DelayQueue.Waiters{
- .entries = std.atomic.Queue(anyframe).init(),
- },
- .thread = undefined,
- .event = .{},
- .is_running = std.atomic.Value(bool).init(true),
- };
-
- // Must be after init so that it can read the other state, such as `is_running`.
- self.thread = try std.Thread.spawn(.{}, DelayQueue.run, .{self});
- }
-
- fn deinit(self: *DelayQueue) void {
- self.is_running.store(false, .SeqCst);
- self.event.set();
- self.thread.join();
- }
-
- /// Entry point for the timer thread
- /// which waits for timer entries to expire and reschedules them.
- fn run(self: *DelayQueue) void {
- const loop = @fieldParentPtr(Loop, "delay_queue", self);
-
- while (self.is_running.load(.SeqCst)) {
- self.event.reset();
- const now = self.timer.read();
-
- if (self.waiters.popExpired(now)) |entry| {
- loop.onNextTick(&entry.node);
- continue;
- }
-
- if (self.waiters.nextExpire()) |expires| {
- if (now >= expires)
- continue;
- self.event.timedWait(expires - now) catch {};
- } else {
- self.event.wait();
- }
- }
- }
-
- // TODO: use a tickless hierarchical timer wheel:
- // https://github.com/wahern/timeout/
- const Waiters = struct {
- entries: std.atomic.Queue(anyframe),
-
- const Entry = struct {
- node: NextTickNode,
- expires: u64,
-
- fn init(self: *Entry, frame: anyframe, expires: u64) void {
- self.node.data = frame;
- self.expires = expires;
- }
- };
-
- /// Registers the entry into the queue of waiting frames
- fn insert(self: *Waiters, entry: *Entry) void {
- self.entries.put(&entry.node);
- }
-
- /// Dequeues one expired event relative to `now`
- fn popExpired(self: *Waiters, now: u64) ?*Entry {
- const entry = self.peekExpiringEntry() orelse return null;
- if (entry.expires > now)
- return null;
-
- assert(self.entries.remove(&entry.node));
- return entry;
- }
-
- /// Returns an estimate for the amount of time
- /// to wait until the next waiting entry expires.
- fn nextExpire(self: *Waiters) ?u64 {
- const entry = self.peekExpiringEntry() orelse return null;
- return entry.expires;
- }
-
- fn peekExpiringEntry(self: *Waiters) ?*Entry {
- self.entries.mutex.lock();
- defer self.entries.mutex.unlock();
-
- // starting from the head
- var head = self.entries.head orelse return null;
-
- // traverse the list of waiting entries to
- // find the Node with the smallest `expires` field
- var min = head;
- while (head.next) |node| {
- const minEntry = @fieldParentPtr(Entry, "node", min);
- const nodeEntry = @fieldParentPtr(Entry, "node", node);
- if (nodeEntry.expires < minEntry.expires)
- min = node;
- head = node;
- }
-
- return @fieldParentPtr(Entry, "node", min);
- }
- };
- };
-
- /// ------- I/0 APIs -------
- pub fn accept(
- self: *Loop,
- /// This argument is a socket that has been created with `socket`, bound to a local address
- /// with `bind`, and is listening for connections after a `listen`.
- sockfd: os.socket_t,
- /// This argument is a pointer to a sockaddr structure. This structure is filled in with the
- /// address of the peer socket, as known to the communications layer. The exact format of the
- /// address returned addr is determined by the socket's address family (see `socket` and the
- /// respective protocol man pages).
- addr: *os.sockaddr,
- /// This argument is a value-result argument: the caller must initialize it to contain the
- /// size (in bytes) of the structure pointed to by addr; on return it will contain the actual size
- /// of the peer address.
- ///
- /// The returned address is truncated if the buffer provided is too small; in this case, `addr_size`
- /// will return a value greater than was supplied to the call.
- addr_size: *os.socklen_t,
- /// The following values can be bitwise ORed in flags to obtain different behavior:
- /// * `SOCK.CLOEXEC` - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor. See the
- /// description of the `O.CLOEXEC` flag in `open` for reasons why this may be useful.
- flags: u32,
- ) os.AcceptError!os.socket_t {
- while (true) {
- return os.accept(sockfd, addr, addr_size, flags | os.SOCK.NONBLOCK) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdReadable(sockfd);
- continue;
- },
- else => return err,
- };
- }
- }
-
- pub fn connect(self: *Loop, sockfd: os.socket_t, sock_addr: *const os.sockaddr, len: os.socklen_t) os.ConnectError!void {
- os.connect(sockfd, sock_addr, len) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdWritable(sockfd);
- return os.getsockoptError(sockfd);
- },
- else => return err,
- };
- }
-
- /// Performs an async `os.open` using a separate thread.
- pub fn openZ(self: *Loop, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .open = .{
- .path = file_path,
- .flags = flags,
- .mode = mode,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.open.result;
- }
-
- /// Performs an async `os.opent` using a separate thread.
- pub fn openatZ(self: *Loop, fd: os.fd_t, file_path: [*:0]const u8, flags: u32, mode: os.mode_t) os.OpenError!os.fd_t {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .openat = .{
- .fd = fd,
- .path = file_path,
- .flags = flags,
- .mode = mode,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.openat.result;
- }
-
- /// Performs an async `os.close` using a separate thread.
- pub fn close(self: *Loop, fd: os.fd_t) void {
- var req_node = Request.Node{
- .data = .{
- .msg = .{ .close = .{ .fd = fd } },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- }
-
- /// Performs an async `os.read` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn read(self: *Loop, fd: os.fd_t, buf: []u8, simulate_evented: bool) os.ReadError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .read = .{
- .fd = fd,
- .buf = buf,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.read.result;
- } else {
- while (true) {
- return os.read(fd, buf) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdReadable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- /// Performs an async `os.readv` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn readv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, simulate_evented: bool) os.ReadError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .readv = .{
- .fd = fd,
- .iov = iov,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.readv.result;
- } else {
- while (true) {
- return os.readv(fd, iov) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdReadable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- /// Performs an async `os.pread` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64, simulate_evented: bool) os.PReadError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .pread = .{
- .fd = fd,
- .buf = buf,
- .offset = offset,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.pread.result;
- } else {
- while (true) {
- return os.pread(fd, buf, offset) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdReadable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- /// Performs an async `os.preadv` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64, simulate_evented: bool) os.ReadError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .preadv = .{
- .fd = fd,
- .iov = iov,
- .offset = offset,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.preadv.result;
- } else {
- while (true) {
- return os.preadv(fd, iov, offset) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdReadable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- /// Performs an async `os.write` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8, simulate_evented: bool) os.WriteError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .write = .{
- .fd = fd,
- .bytes = bytes,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.write.result;
- } else {
- while (true) {
- return os.write(fd, bytes) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdWritable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- /// Performs an async `os.writev` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, simulate_evented: bool) os.WriteError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .writev = .{
- .fd = fd,
- .iov = iov,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.writev.result;
- } else {
- while (true) {
- return os.writev(fd, iov) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdWritable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- /// Performs an async `os.pwrite` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn pwrite(self: *Loop, fd: os.fd_t, bytes: []const u8, offset: u64, simulate_evented: bool) os.PerformsWriteError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .pwrite = .{
- .fd = fd,
- .bytes = bytes,
- .offset = offset,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.pwrite.result;
- } else {
- while (true) {
- return os.pwrite(fd, bytes, offset) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdWritable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- /// Performs an async `os.pwritev` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64, simulate_evented: bool) os.PWriteError!usize {
- if (simulate_evented) {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .pwritev = .{
- .fd = fd,
- .iov = iov,
- .offset = offset,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.pwritev.result;
- } else {
- while (true) {
- return os.pwritev(fd, iov, offset) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdWritable(fd);
- continue;
- },
- else => return err,
- };
- }
- }
- }
-
- pub fn sendto(
- self: *Loop,
- /// The file descriptor of the sending socket.
- sockfd: os.fd_t,
- /// Message to send.
- buf: []const u8,
- flags: u32,
- dest_addr: ?*const os.sockaddr,
- addrlen: os.socklen_t,
- ) os.SendToError!usize {
- while (true) {
- return os.sendto(sockfd, buf, flags, dest_addr, addrlen) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdWritable(sockfd);
- continue;
- },
- else => return err,
- };
- }
- }
-
- pub fn recvfrom(
- self: *Loop,
- sockfd: os.fd_t,
- buf: []u8,
- flags: u32,
- src_addr: ?*os.sockaddr,
- addrlen: ?*os.socklen_t,
- ) os.RecvFromError!usize {
- while (true) {
- return os.recvfrom(sockfd, buf, flags, src_addr, addrlen) catch |err| switch (err) {
- error.WouldBlock => {
- self.waitUntilFdReadable(sockfd);
- continue;
- },
- else => return err,
- };
- }
- }
-
- /// Performs an async `os.faccessatZ` using a separate thread.
- /// `fd` must block and not return EAGAIN.
- pub fn faccessatZ(
- self: *Loop,
- dirfd: os.fd_t,
- path_z: [*:0]const u8,
- mode: u32,
- flags: u32,
- ) os.AccessError!void {
- var req_node = Request.Node{
- .data = .{
- .msg = .{
- .faccessat = .{
- .dirfd = dirfd,
- .path = path_z,
- .mode = mode,
- .flags = flags,
- .result = undefined,
- },
- },
- .finish = .{ .tick_node = .{ .data = @frame() } },
- },
- };
- suspend {
- self.posixFsRequest(&req_node);
- }
- return req_node.data.msg.faccessat.result;
- }
-
- fn workerRun(self: *Loop) void {
- while (true) {
- while (true) {
- const next_tick_node = self.next_tick_queue.get() orelse break;
- self.dispatch();
- resume next_tick_node.data;
- self.finishOneEvent();
- }
-
- switch (builtin.os.tag) {
- .linux => {
- // only process 1 event so we don't steal from other threads
- var events: [1]os.linux.epoll_event = undefined;
- const count = os.epoll_wait(self.os_data.epollfd, events[0..], -1);
- for (events[0..count]) |ev| {
- const resume_node = @as(*ResumeNode, @ptrFromInt(ev.data.ptr));
- const handle = resume_node.handle;
- const resume_node_id = resume_node.id;
- switch (resume_node_id) {
- .basic => {},
- .stop => return,
- .event_fd => {
- const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
- event_fd_node.epoll_op = os.linux.EPOLL.CTL_MOD;
- const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
- self.available_eventfd_resume_nodes.push(stack_node);
- },
- }
- resume handle;
- if (resume_node_id == .event_fd) {
- self.finishOneEvent();
- }
- }
- },
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- var eventlist: [1]os.Kevent = undefined;
- const empty_kevs = &[0]os.Kevent{};
- const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable;
- for (eventlist[0..count]) |ev| {
- const resume_node = @as(*ResumeNode, @ptrFromInt(ev.udata));
- const handle = resume_node.handle;
- const resume_node_id = resume_node.id;
- switch (resume_node_id) {
- .basic => {
- const basic_node = @fieldParentPtr(ResumeNode.Basic, "base", resume_node);
- basic_node.kev = ev;
- },
- .stop => return,
- .event_fd => {
- const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
- const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
- self.available_eventfd_resume_nodes.push(stack_node);
- },
- }
- resume handle;
- if (resume_node_id == .event_fd) {
- self.finishOneEvent();
- }
- }
- },
- .windows => {
- var completion_key: usize = undefined;
- const overlapped = while (true) {
- var nbytes: windows.DWORD = undefined;
- var overlapped: ?*windows.OVERLAPPED = undefined;
- switch (windows.GetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
- .Aborted => return,
- .Normal => {},
- .EOF => {},
- .Cancelled => continue,
- }
- if (overlapped) |o| break o;
- };
- const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped);
- const handle = resume_node.handle;
- const resume_node_id = resume_node.id;
- switch (resume_node_id) {
- .basic => {},
- .stop => return,
- .event_fd => {
- const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
- const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
- self.available_eventfd_resume_nodes.push(stack_node);
- },
- }
- resume handle;
- self.finishOneEvent();
- },
- else => @compileError("unsupported OS"),
- }
- }
- }
-
- fn posixFsRequest(self: *Loop, request_node: *Request.Node) void {
- self.beginOneEvent(); // finished in posixFsRun after processing the msg
- self.fs_queue.put(request_node);
- self.fs_thread_wakeup.set();
- }
-
- fn posixFsCancel(self: *Loop, request_node: *Request.Node) void {
- if (self.fs_queue.remove(request_node)) {
- self.finishOneEvent();
- }
- }
-
- fn posixFsRun(self: *Loop) void {
- nosuspend while (true) {
- self.fs_thread_wakeup.reset();
- while (self.fs_queue.get()) |node| {
- switch (node.data.msg) {
- .end => return,
- .read => |*msg| {
- msg.result = os.read(msg.fd, msg.buf);
- },
- .readv => |*msg| {
- msg.result = os.readv(msg.fd, msg.iov);
- },
- .write => |*msg| {
- msg.result = os.write(msg.fd, msg.bytes);
- },
- .writev => |*msg| {
- msg.result = os.writev(msg.fd, msg.iov);
- },
- .pwrite => |*msg| {
- msg.result = os.pwrite(msg.fd, msg.bytes, msg.offset);
- },
- .pwritev => |*msg| {
- msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
- },
- .pread => |*msg| {
- msg.result = os.pread(msg.fd, msg.buf, msg.offset);
- },
- .preadv => |*msg| {
- msg.result = os.preadv(msg.fd, msg.iov, msg.offset);
- },
- .open => |*msg| {
- if (is_windows) unreachable; // TODO
- msg.result = os.openZ(msg.path, msg.flags, msg.mode);
- },
- .openat => |*msg| {
- if (is_windows) unreachable; // TODO
- msg.result = os.openatZ(msg.fd, msg.path, msg.flags, msg.mode);
- },
- .faccessat => |*msg| {
- msg.result = os.faccessatZ(msg.dirfd, msg.path, msg.mode, msg.flags);
- },
- .close => |*msg| os.close(msg.fd),
- }
- switch (node.data.finish) {
- .tick_node => |*tick_node| self.onNextTick(tick_node),
- .no_action => {},
- }
- self.finishOneEvent();
- }
- self.fs_thread_wakeup.wait();
- };
- }
-
- const OsData = switch (builtin.os.tag) {
- .linux => LinuxOsData,
- .macos, .ios, .tvos, .watchos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventData,
- .windows => struct {
- io_port: windows.HANDLE,
- extra_thread_count: usize,
- },
- else => struct {},
- };
-
- const KEventData = struct {
- kqfd: i32,
- final_kevent: os.Kevent,
- };
-
- const LinuxOsData = struct {
- epollfd: i32,
- final_eventfd: i32,
- final_eventfd_event: os.linux.epoll_event,
- };
-
- pub const Request = struct {
- msg: Msg,
- finish: Finish,
-
- pub const Node = std.atomic.Queue(Request).Node;
-
- pub const Finish = union(enum) {
- tick_node: Loop.NextTickNode,
- no_action,
- };
-
- pub const Msg = union(enum) {
- read: Read,
- readv: ReadV,
- write: Write,
- writev: WriteV,
- pwrite: PWrite,
- pwritev: PWriteV,
- pread: PRead,
- preadv: PReadV,
- open: Open,
- openat: OpenAt,
- close: Close,
- faccessat: FAccessAt,
-
- /// special - means the fs thread should exit
- end,
-
- pub const Read = struct {
- fd: os.fd_t,
- buf: []u8,
- result: Error!usize,
-
- pub const Error = os.ReadError;
- };
-
- pub const ReadV = struct {
- fd: os.fd_t,
- iov: []const os.iovec,
- result: Error!usize,
-
- pub const Error = os.ReadError;
- };
-
- pub const Write = struct {
- fd: os.fd_t,
- bytes: []const u8,
- result: Error!usize,
-
- pub const Error = os.WriteError;
- };
-
- pub const WriteV = struct {
- fd: os.fd_t,
- iov: []const os.iovec_const,
- result: Error!usize,
-
- pub const Error = os.WriteError;
- };
-
- pub const PWrite = struct {
- fd: os.fd_t,
- bytes: []const u8,
- offset: usize,
- result: Error!usize,
-
- pub const Error = os.PWriteError;
- };
-
- pub const PWriteV = struct {
- fd: os.fd_t,
- iov: []const os.iovec_const,
- offset: usize,
- result: Error!usize,
-
- pub const Error = os.PWriteError;
- };
-
- pub const PRead = struct {
- fd: os.fd_t,
- buf: []u8,
- offset: usize,
- result: Error!usize,
-
- pub const Error = os.PReadError;
- };
-
- pub const PReadV = struct {
- fd: os.fd_t,
- iov: []const os.iovec,
- offset: usize,
- result: Error!usize,
-
- pub const Error = os.PReadError;
- };
-
- pub const Open = struct {
- path: [*:0]const u8,
- flags: u32,
- mode: os.mode_t,
- result: Error!os.fd_t,
-
- pub const Error = os.OpenError;
- };
-
- pub const OpenAt = struct {
- fd: os.fd_t,
- path: [*:0]const u8,
- flags: u32,
- mode: os.mode_t,
- result: Error!os.fd_t,
-
- pub const Error = os.OpenError;
- };
-
- pub const Close = struct {
- fd: os.fd_t,
- };
-
- pub const FAccessAt = struct {
- dirfd: os.fd_t,
- path: [*:0]const u8,
- mode: u32,
- flags: u32,
- result: Error!void,
-
- pub const Error = os.AccessError;
- };
- };
- };
-};
-
-test "std.event.Loop - basic" {
- // https://github.com/ziglang/zig/issues/1908
- if (builtin.single_threaded) return error.SkipZigTest;
-
- if (true) {
- // https://github.com/ziglang/zig/issues/4922
- return error.SkipZigTest;
- }
-
- var loop: Loop = undefined;
- try loop.initMultiThreaded();
- defer loop.deinit();
-
- loop.run();
-}
-
-fn testEventLoop() i32 {
- return 1234;
-}
-
-fn testEventLoop2(h: anyframe->i32, did_it: *bool) void {
- const value = await h;
- try testing.expect(value == 1234);
- did_it.* = true;
-}
-
-var testRunDetachedData: usize = 0;
-test "std.event.Loop - runDetached" {
- // https://github.com/ziglang/zig/issues/1908
- if (builtin.single_threaded) return error.SkipZigTest;
- if (!std.io.is_async) return error.SkipZigTest;
- if (true) {
- // https://github.com/ziglang/zig/issues/4922
- return error.SkipZigTest;
- }
-
- var loop: Loop = undefined;
- try loop.initMultiThreaded();
- defer loop.deinit();
-
- // Schedule the execution, won't actually start until we start the
- // event loop.
- try loop.runDetached(std.testing.allocator, testRunDetached, .{});
-
- // Now we can start the event loop. The function will return only
- // after all tasks have been completed, allowing us to synchronize
- // with the previous runDetached.
- loop.run();
-
- try testing.expect(testRunDetachedData == 1);
-}
-
-fn testRunDetached() void {
- testRunDetachedData += 1;
-}
-
-test "std.event.Loop - sleep" {
- // https://github.com/ziglang/zig/issues/1908
- if (builtin.single_threaded) return error.SkipZigTest;
- if (!std.io.is_async) return error.SkipZigTest;
-
- const frames = try testing.allocator.alloc(@Frame(testSleep), 10);
- defer testing.allocator.free(frames);
-
- const wait_time = 100 * std.time.ns_per_ms;
- var sleep_count: usize = 0;
-
- for (frames) |*frame|
- frame.* = async testSleep(wait_time, &sleep_count);
- for (frames) |*frame|
- await frame;
-
- try testing.expect(sleep_count == frames.len);
-}
-
-fn testSleep(wait_ns: u64, sleep_count: *usize) void {
- Loop.instance.?.sleep(wait_ns);
- _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst);
-}