aboutsummaryrefslogtreecommitdiff
path: root/std/event/loop.zig
diff options
context:
space:
mode:
authorAndrew Kelley <superjoe30@gmail.com>2018-07-10 14:03:03 -0400
committerAndrew Kelley <superjoe30@gmail.com>2018-07-10 14:03:03 -0400
commitcfaebb20d8906d31cedc59e39e6a9286967a931a (patch)
tree33d0c3994bfb658937e2e692a946bc8a2eca4fe5 /std/event/loop.zig
parentb5d07297dec61a3993dfe91ceee2c87672db1e8e (diff)
parent0ce6934e2631eb3beca817d3bce12ecb13aafa13 (diff)
downloadzig-cfaebb20d8906d31cedc59e39e6a9286967a931a.tar.gz
zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.zip
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/event/loop.zig')
-rw-r--r--std/event/loop.zig629
1 files changed, 629 insertions, 0 deletions
diff --git a/std/event/loop.zig b/std/event/loop.zig
new file mode 100644
index 0000000000..646f15875f
--- /dev/null
+++ b/std/event/loop.zig
@@ -0,0 +1,629 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const mem = std.mem;
+const posix = std.os.posix;
+const windows = std.os.windows;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+
+pub const Loop = struct {
+ allocator: *mem.Allocator,
+ next_tick_queue: std.atomic.QueueMpsc(promise),
+ os_data: OsData,
+ final_resume_node: ResumeNode,
+ dispatch_lock: u8, // TODO make this a bool
+ pending_event_count: usize,
+ extra_threads: []*std.os.Thread,
+
+ // pre-allocated eventfds. all permanently active.
+ // this is how we send promises to be resumed on other threads.
+ available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
+ eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
+
+ pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
+
+ pub const ResumeNode = struct {
+ id: Id,
+ handle: promise,
+
+ pub const Id = enum {
+ Basic,
+ Stop,
+ EventFd,
+ };
+
+ pub const EventFd = switch (builtin.os) {
+ builtin.Os.macosx => MacOsEventFd,
+ builtin.Os.linux => struct {
+ base: ResumeNode,
+ epoll_op: u32,
+ eventfd: i32,
+ },
+ builtin.Os.windows => struct {
+ base: ResumeNode,
+ completion_key: usize,
+ },
+ else => @compileError("unsupported OS"),
+ };
+
+ const MacOsEventFd = struct {
+ base: ResumeNode,
+ kevent: posix.Kevent,
+ };
+ };
+
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ return self.initInternal(allocator, 1);
+ }
+
+ /// The allocator must be thread-safe because we use it for multiplexing
+ /// coroutines onto kernel threads.
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ const core_count = try std.os.cpuCount(allocator);
+ return self.initInternal(allocator, core_count);
+ }
+
+ /// Thread count is the total thread count. The thread pool size will be
+ /// max(thread_count - 1, 0)
+ fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
+ self.* = Loop{
+ .pending_event_count = 0,
+ .allocator = allocator,
+ .os_data = undefined,
+ .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
+ .dispatch_lock = 1, // start locked so threads go directly into epoll wait
+ .extra_threads = undefined,
+ .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
+ .eventfd_resume_nodes = undefined,
+ .final_resume_node = ResumeNode{
+ .id = ResumeNode.Id.Stop,
+ .handle = undefined,
+ },
+ };
+ const extra_thread_count = thread_count - 1;
+ self.eventfd_resume_nodes = try self.allocator.alloc(
+ std.atomic.Stack(ResumeNode.EventFd).Node,
+ extra_thread_count,
+ );
+ errdefer self.allocator.free(self.eventfd_resume_nodes);
+
+ self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
+ errdefer self.allocator.free(self.extra_threads);
+
+ try self.initOsData(extra_thread_count);
+ errdefer self.deinitOsData();
+ }
+
+ /// must call stop before deinit
+ pub fn deinit(self: *Loop) void {
+ self.deinitOsData();
+ self.allocator.free(self.extra_threads);
+ }
+
+ const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
+ std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError ||
+ std.os.WindowsCreateIoCompletionPortError;
+
+ const wakeup_bytes = []u8{0x1} ** 8;
+
+ fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ errdefer {
+ while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+ }
+ for (self.eventfd_resume_nodes) |*eventfd_node| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
+ .epoll_op = posix.EPOLL_CTL_ADD,
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
+ errdefer std.os.close(self.os_data.epollfd);
+
+ self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
+ errdefer std.os.close(self.os_data.final_eventfd);
+
+ self.os_data.final_eventfd_event = posix.epoll_event{
+ .events = posix.EPOLLIN,
+ .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
+ };
+ try std.os.linuxEpollCtl(
+ self.os_data.epollfd,
+ posix.EPOLL_CTL_ADD,
+ self.os_data.final_eventfd,
+ &self.os_data.final_eventfd_event,
+ );
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ // writing 8 bytes to an eventfd cannot fail
+ std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ builtin.Os.macosx => {
+ self.os_data.kqfd = try std.os.bsdKQueue();
+ errdefer std.os.close(self.os_data.kqfd);
+
+ self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count);
+ errdefer self.allocator.free(self.os_data.kevents);
+
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ // this one is for sending events
+ .kevent = posix.Kevent{
+ .ident = i,
+ .filter = posix.EVFILT_USER,
+ .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ },
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
+ _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+ eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE;
+ eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
+ // this one is for waiting for events
+ self.os_data.kevents[i] = posix.Kevent{
+ .ident = i,
+ .filter = posix.EVFILT_USER,
+ .flags = 0,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ };
+ }
+
+ // Pre-add so that we cannot get error.SystemResources
+ // later when we try to activate it.
+ self.os_data.final_kevent = posix.Kevent{
+ .ident = extra_thread_count,
+ .filter = posix.EVFILT_USER,
+ .flags = posix.EV_ADD | posix.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&self.final_resume_node),
+ };
+ const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+ _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+ self.os_data.final_kevent.flags = posix.EV_ENABLE;
+ self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ builtin.Os.windows => {
+ self.os_data.extra_thread_count = extra_thread_count;
+
+ self.os_data.io_port = try std.os.windowsCreateIoCompletionPort(
+ windows.INVALID_HANDLE_VALUE,
+ null,
+ undefined,
+ undefined,
+ );
+ errdefer std.os.close(self.os_data.io_port);
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ // this one is for sending events
+ .completion_key = @ptrToInt(&eventfd_node.data.base),
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ var i: usize = 0;
+ while (i < extra_thread_index) : (i += 1) {
+ while (true) {
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+ break;
+ }
+ }
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ else => {},
+ }
+ }
+
+ fn deinitOsData(self: *Loop) void {
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ std.os.close(self.os_data.final_eventfd);
+ while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+ std.os.close(self.os_data.epollfd);
+ self.allocator.free(self.eventfd_resume_nodes);
+ },
+ builtin.Os.macosx => {
+ self.allocator.free(self.os_data.kevents);
+ std.os.close(self.os_data.kqfd);
+ },
+ builtin.Os.windows => {
+ std.os.close(self.os_data.io_port);
+ },
+ else => {},
+ }
+ }
+
+ /// resume_node must live longer than the promise that it holds a reference to.
+ pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ errdefer {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ try self.modFd(
+ fd,
+ posix.EPOLL_CTL_ADD,
+ std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
+ resume_node,
+ );
+ }
+
+ pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void {
+ var ev = std.os.linux.epoll_event{
+ .events = events,
+ .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
+ };
+ try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
+ }
+
+ pub fn removeFd(self: *Loop, fd: i32) void {
+ self.removeFdNoCounter(fd);
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ fn removeFdNoCounter(self: *Loop, fd: i32) void {
+ std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
+ }
+
+ pub async fn waitFd(self: *Loop, fd: i32) !void {
+ defer self.removeFd(fd);
+ suspend |p| {
+ // TODO explicitly put this memory in the coroutine frame #1194
+ var resume_node = ResumeNode{
+ .id = ResumeNode.Id.Basic,
+ .handle = p,
+ };
+ try self.addFd(fd, &resume_node);
+ }
+ }
+
+ /// Bring your own linked list node. This means it can't fail.
+ pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ self.next_tick_queue.put(node);
+ }
+
+ pub fn run(self: *Loop) void {
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.workerRun();
+ for (self.extra_threads) |extra_thread| {
+ extra_thread.wait();
+ }
+ }
+
+ /// This is equivalent to an async call, except instead of beginning execution of the async function,
+ /// it immediately returns to the caller, and the async function is queued in the event loop. It still
+ /// returns a promise to be awaited.
+ pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) {
+ const S = struct {
+ async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType {
+ suspend |p| {
+ handle.* = p;
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = p,
+ };
+ loop.onNextTick(&my_tick_node);
+ }
+ // TODO guaranteed allocation elision for await in same func as async
+ return await (async func(args2) catch unreachable);
+ }
+ };
+ var handle: promise->@typeOf(func).ReturnType = undefined;
+ return async<self.allocator> S.asyncFunc(self, &handle, args);
+ }
+
+ fn workerRun(self: *Loop) void {
+ start_over: while (true) {
+ if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
+ while (self.next_tick_queue.get()) |next_tick_node| {
+ const handle = next_tick_node.data;
+ if (self.next_tick_queue.isEmpty()) {
+ // last node, just resume it
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ }
+
+ // non-last node, stick it in the epoll/kqueue set so that
+ // other threads can get to it
+ if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+ const eventfd_node = &resume_stack_node.data;
+ eventfd_node.base.handle = handle;
+ switch (builtin.os) {
+ builtin.Os.macosx => {
+ const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ builtin.Os.linux => {
+ // the pending count is already accounted for
+ const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
+ self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ builtin.Os.windows => {
+ // this value is never dereferenced but we need it to be non-null so that
+ // the consumer code can decide whether to read the completion key.
+ // it has to do this for normal I/O, so we match that behavior here.
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ else => @compileError("unsupported OS"),
+ }
+ } else {
+ // threads are too busy, can't add another eventfd to wake one up
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ }
+ }
+
+ const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
+ if (pending_event_count == 0) {
+ // cause all the threads to stop
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ // writing 8 bytes to an eventfd cannot fail
+ std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ return;
+ },
+ builtin.Os.macosx => {
+ const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ // cannot fail because we already added it and this just enables it
+ _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
+ return;
+ },
+ builtin.Os.windows => {
+ var i: usize = 0;
+ while (i < self.os_data.extra_thread_count) : (i += 1) {
+ while (true) {
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+ break;
+ }
+ }
+ return;
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ }
+
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ // only process 1 event so we don't steal from other threads
+ var events: [1]std.os.linux.epoll_event = undefined;
+ const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
+ for (events[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+ },
+ builtin.Os.macosx => {
+ var eventlist: [1]posix.Kevent = undefined;
+ const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable;
+ for (eventlist[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.udata);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+ },
+ builtin.Os.windows => {
+ var completion_key: usize = undefined;
+ while (true) {
+ var nbytes: windows.DWORD = undefined;
+ var overlapped: ?*windows.OVERLAPPED = undefined;
+ switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
+ std.os.WindowsWaitResult.Aborted => return,
+ std.os.WindowsWaitResult.Normal => {},
+ }
+ if (overlapped != null) break;
+ }
+ const resume_node = @intToPtr(*ResumeNode, completion_key);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
+
+ const OsData = switch (builtin.os) {
+ builtin.Os.linux => struct {
+ epollfd: i32,
+ final_eventfd: i32,
+ final_eventfd_event: std.os.linux.epoll_event,
+ },
+ builtin.Os.macosx => MacOsData,
+ builtin.Os.windows => struct {
+ io_port: windows.HANDLE,
+ extra_thread_count: usize,
+ },
+ else => struct {},
+ };
+
+ const MacOsData = struct {
+ kqfd: i32,
+ final_kevent: posix.Kevent,
+ kevents: []posix.Kevent,
+ };
+};
+
+test "std.event.Loop - basic" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ loop.run();
+}
+
+test "std.event.Loop - call" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var did_it = false;
+ const handle = try loop.call(testEventLoop);
+ const handle2 = try loop.call(testEventLoop2, handle, &did_it);
+ defer cancel handle2;
+
+ loop.run();
+
+ assert(did_it);
+}
+
+async fn testEventLoop() i32 {
+ return 1234;
+}
+
+async fn testEventLoop2(h: promise->i32, did_it: *bool) void {
+ const value = await h;
+ assert(value == 1234);
+ did_it.* = true;
+}