diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-07-10 14:03:03 -0400 |
|---|---|---|
| committer | Andrew Kelley <superjoe30@gmail.com> | 2018-07-10 14:03:03 -0400 |
| commit | cfaebb20d8906d31cedc59e39e6a9286967a931a (patch) | |
| tree | 33d0c3994bfb658937e2e692a946bc8a2eca4fe5 /std/event/loop.zig | |
| parent | b5d07297dec61a3993dfe91ceee2c87672db1e8e (diff) | |
| parent | 0ce6934e2631eb3beca817d3bce12ecb13aafa13 (diff) | |
| download | zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.tar.gz zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.zip | |
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/event/loop.zig')
| -rw-r--r-- | std/event/loop.zig | 629 |
1 files changed, 629 insertions, 0 deletions
diff --git a/std/event/loop.zig b/std/event/loop.zig new file mode 100644 index 0000000000..646f15875f --- /dev/null +++ b/std/event/loop.zig @@ -0,0 +1,629 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const mem = std.mem; +const posix = std.os.posix; +const windows = std.os.windows; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; + +pub const Loop = struct { + allocator: *mem.Allocator, + next_tick_queue: std.atomic.QueueMpsc(promise), + os_data: OsData, + final_resume_node: ResumeNode, + dispatch_lock: u8, // TODO make this a bool + pending_event_count: usize, + extra_threads: []*std.os.Thread, + + // pre-allocated eventfds. all permanently active. + // this is how we send promises to be resumed on other threads. + available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), + eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, + + pub const NextTickNode = std.atomic.QueueMpsc(promise).Node; + + pub const ResumeNode = struct { + id: Id, + handle: promise, + + pub const Id = enum { + Basic, + Stop, + EventFd, + }; + + pub const EventFd = switch (builtin.os) { + builtin.Os.macosx => MacOsEventFd, + builtin.Os.linux => struct { + base: ResumeNode, + epoll_op: u32, + eventfd: i32, + }, + builtin.Os.windows => struct { + base: ResumeNode, + completion_key: usize, + }, + else => @compileError("unsupported OS"), + }; + + const MacOsEventFd = struct { + base: ResumeNode, + kevent: posix.Kevent, + }; + }; + + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { + return self.initInternal(allocator, 1); + } + + /// The allocator must be thread-safe because we use it for multiplexing + /// coroutines onto kernel threads. + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { + const core_count = try std.os.cpuCount(allocator); + return self.initInternal(allocator, core_count); + } + + /// Thread count is the total thread count. The thread pool size will be + /// max(thread_count - 1, 0) + fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + self.* = Loop{ + .pending_event_count = 0, + .allocator = allocator, + .os_data = undefined, + .next_tick_queue = std.atomic.QueueMpsc(promise).init(), + .dispatch_lock = 1, // start locked so threads go directly into epoll wait + .extra_threads = undefined, + .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), + .eventfd_resume_nodes = undefined, + .final_resume_node = ResumeNode{ + .id = ResumeNode.Id.Stop, + .handle = undefined, + }, + }; + const extra_thread_count = thread_count - 1; + self.eventfd_resume_nodes = try self.allocator.alloc( + std.atomic.Stack(ResumeNode.EventFd).Node, + extra_thread_count, + ); + errdefer self.allocator.free(self.eventfd_resume_nodes); + + self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count); + errdefer self.allocator.free(self.extra_threads); + + try self.initOsData(extra_thread_count); + errdefer self.deinitOsData(); + } + + /// must call stop before deinit + pub fn deinit(self: *Loop) void { + self.deinitOsData(); + self.allocator.free(self.extra_threads); + } + + const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError || + std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError || + std.os.WindowsCreateIoCompletionPortError; + + const wakeup_bytes = []u8{0x1} ** 8; + + fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { + switch (builtin.os) { + builtin.Os.linux => { + errdefer { + while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); + } + for (self.eventfd_resume_nodes) |*eventfd_node| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), + .epoll_op = posix.EPOLL_CTL_ADD, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC); + errdefer std.os.close(self.os_data.epollfd); + + self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + errdefer std.os.close(self.os_data.final_eventfd); + + self.os_data.final_eventfd_event = posix.epoll_event{ + .events = posix.EPOLLIN, + .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + }; + try std.os.linuxEpollCtl( + self.os_data.epollfd, + posix.EPOLL_CTL_ADD, + self.os_data.final_eventfd, + &self.os_data.final_eventfd_event, + ); + + var extra_thread_index: usize = 0; + errdefer { + // writing 8 bytes to an eventfd cannot fail + std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } + }, + builtin.Os.macosx => { + self.os_data.kqfd = try std.os.bsdKQueue(); + errdefer std.os.close(self.os_data.kqfd); + + self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); + errdefer self.allocator.free(self.os_data.kevents); + + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .kevent = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent); + _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; + eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; + // this one is for waiting for events + self.os_data.kevents[i] = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = 0, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }; + } + + // Pre-add so that we cannot get error.SystemResources + // later when we try to activate it. + self.os_data.final_kevent = posix.Kevent{ + .ident = extra_thread_count, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&self.final_resume_node), + }; + const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent); + _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + self.os_data.final_kevent.flags = posix.EV_ENABLE; + self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; + + var extra_thread_index: usize = 0; + errdefer { + _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } + }, + builtin.Os.windows => { + self.os_data.extra_thread_count = extra_thread_count; + + self.os_data.io_port = try std.os.windowsCreateIoCompletionPort( + windows.INVALID_HANDLE_VALUE, + null, + undefined, + undefined, + ); + errdefer std.os.close(self.os_data.io_port); + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .completion_key = @ptrToInt(&eventfd_node.data.base), + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + var extra_thread_index: usize = 0; + errdefer { + var i: usize = 0; + while (i < extra_thread_index) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; + } + } + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } + }, + else => {}, + } + } + + fn deinitOsData(self: *Loop) void { + switch (builtin.os) { + builtin.Os.linux => { + std.os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); + std.os.close(self.os_data.epollfd); + self.allocator.free(self.eventfd_resume_nodes); + }, + builtin.Os.macosx => { + self.allocator.free(self.os_data.kevents); + std.os.close(self.os_data.kqfd); + }, + builtin.Os.windows => { + std.os.close(self.os_data.io_port); + }, + else => {}, + } + } + + /// resume_node must live longer than the promise that it holds a reference to. + pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + errdefer { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + try self.modFd( + fd, + posix.EPOLL_CTL_ADD, + std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET, + resume_node, + ); + } + + pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void { + var ev = std.os.linux.epoll_event{ + .events = events, + .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, + }; + try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev); + } + + pub fn removeFd(self: *Loop, fd: i32) void { + self.removeFdNoCounter(fd); + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + + fn removeFdNoCounter(self: *Loop, fd: i32) void { + std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; + } + + pub async fn waitFd(self: *Loop, fd: i32) !void { + defer self.removeFd(fd); + suspend |p| { + // TODO explicitly put this memory in the coroutine frame #1194 + var resume_node = ResumeNode{ + .id = ResumeNode.Id.Basic, + .handle = p, + }; + try self.addFd(fd, &resume_node); + } + } + + /// Bring your own linked list node. This means it can't fail. + pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + self.next_tick_queue.put(node); + } + + pub fn run(self: *Loop) void { + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.workerRun(); + for (self.extra_threads) |extra_thread| { + extra_thread.wait(); + } + } + + /// This is equivalent to an async call, except instead of beginning execution of the async function, + /// it immediately returns to the caller, and the async function is queued in the event loop. It still + /// returns a promise to be awaited. + pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) { + const S = struct { + async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType { + suspend |p| { + handle.* = p; + var my_tick_node = Loop.NextTickNode{ + .next = undefined, + .data = p, + }; + loop.onNextTick(&my_tick_node); + } + // TODO guaranteed allocation elision for await in same func as async + return await (async func(args2) catch unreachable); + } + }; + var handle: promise->@typeOf(func).ReturnType = undefined; + return async<self.allocator> S.asyncFunc(self, &handle, args); + } + + fn workerRun(self: *Loop) void { + start_over: while (true) { + if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) { + while (self.next_tick_queue.get()) |next_tick_node| { + const handle = next_tick_node.data; + if (self.next_tick_queue.isEmpty()) { + // last node, just resume it + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + } + + // non-last node, stick it in the epoll/kqueue set so that + // other threads can get to it + if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { + const eventfd_node = &resume_stack_node.data; + eventfd_node.base.handle = handle; + switch (builtin.os) { + builtin.Os.macosx => { + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + }, + builtin.Os.linux => { + // the pending count is already accounted for + const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET; + self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + }, + builtin.Os.windows => { + // this value is never dereferenced but we need it to be non-null so that + // the consumer code can decide whether to read the completion key. + // it has to do this for normal I/O, so we match that behavior here. + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + }, + else => @compileError("unsupported OS"), + } + } else { + // threads are too busy, can't add another eventfd to wake one up + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + } + } + + const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst); + if (pending_event_count == 0) { + // cause all the threads to stop + switch (builtin.os) { + builtin.Os.linux => { + // writing 8 bytes to an eventfd cannot fail + std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + return; + }, + builtin.Os.macosx => { + const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + // cannot fail because we already added it and this just enables it + _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; + return; + }, + builtin.Os.windows => { + var i: usize = 0; + while (i < self.os_data.extra_thread_count) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; + } + } + return; + }, + else => @compileError("unsupported OS"), + } + } + + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + } + + switch (builtin.os) { + builtin.Os.linux => { + // only process 1 event so we don't steal from other threads + var events: [1]std.os.linux.epoll_event = undefined; + const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + for (events[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + event_fd_node.epoll_op = posix.EPOLL_CTL_MOD; + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + } + }, + builtin.Os.macosx => { + var eventlist: [1]posix.Kevent = undefined; + const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; + for (eventlist[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.udata); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + } + }, + builtin.Os.windows => { + var completion_key: usize = undefined; + while (true) { + var nbytes: windows.DWORD = undefined; + var overlapped: ?*windows.OVERLAPPED = undefined; + switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { + std.os.WindowsWaitResult.Aborted => return, + std.os.WindowsWaitResult.Normal => {}, + } + if (overlapped != null) break; + } + const resume_node = @intToPtr(*ResumeNode, completion_key); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + }, + else => @compileError("unsupported OS"), + } + } + } + + const OsData = switch (builtin.os) { + builtin.Os.linux => struct { + epollfd: i32, + final_eventfd: i32, + final_eventfd_event: std.os.linux.epoll_event, + }, + builtin.Os.macosx => MacOsData, + builtin.Os.windows => struct { + io_port: windows.HANDLE, + extra_thread_count: usize, + }, + else => struct {}, + }; + + const MacOsData = struct { + kqfd: i32, + final_kevent: posix.Kevent, + kevents: []posix.Kevent, + }; +}; + +test "std.event.Loop - basic" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + loop.run(); +} + +test "std.event.Loop - call" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var did_it = false; + const handle = try loop.call(testEventLoop); + const handle2 = try loop.call(testEventLoop2, handle, &did_it); + defer cancel handle2; + + loop.run(); + + assert(did_it); +} + +async fn testEventLoop() i32 { + return 1234; +} + +async fn testEventLoop2(h: promise->i32, did_it: *bool) void { + const value = await h; + assert(value == 1234); + did_it.* = true; +} |
