diff options
| author | Sebastien Marie <semarie@users.noreply.github.com> | 2020-10-17 17:38:23 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-17 17:38:23 +0200 |
| commit | 35a7247a2c9ab208d96fdc9a7c0c63f4c852666c (patch) | |
| tree | d90d927b2c153ef2212dfcb6a0b47346e8896eb5 /lib/std/event/loop.zig | |
| parent | 161eb4a000923c28d152781dcc8a080905c7ad32 (diff) | |
| parent | 245d98d32dd29e80de9732f415a4731748008acf (diff) | |
| download | zig-35a7247a2c9ab208d96fdc9a7c0c63f4c852666c.tar.gz zig-35a7247a2c9ab208d96fdc9a7c0c63f4c852666c.zip | |
Merge branch 'master' into openbsd-minimal
Diffstat (limited to 'lib/std/event/loop.zig')
| -rw-r--r-- | lib/std/event/loop.zig | 181 |
1 files changed, 169 insertions, 12 deletions
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 7fa6fee398..80dc94d184 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -35,6 +35,9 @@ pub const Loop = struct { /// This is only used by `Loop` for the thread pool and associated resources. arena: std.heap.ArenaAllocator, + /// State which manages frames that are sleeping on timers + delay_queue: DelayQueue, + /// Pre-allocated eventfds. All permanently active. /// This is how `Loop` sends promises to be resumed on other threads. available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), @@ -66,7 +69,7 @@ pub const Loop = struct { }; pub const EventFd = switch (builtin.os.tag) { - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => KEventFd, + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventFd, .linux => struct { base: ResumeNode, epoll_op: u32, @@ -85,7 +88,7 @@ pub const Loop = struct { }; pub const Basic = switch (builtin.os.tag) { - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => KEventBasic, + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventBasic, .linux => struct { base: ResumeNode, }, @@ -162,6 +165,7 @@ pub const Loop = struct { .fs_queue = std.atomic.Queue(Request).init(), .fs_thread = undefined, .fs_thread_wakeup = std.ResetEvent.init(), + .delay_queue = undefined, }; errdefer self.fs_thread_wakeup.deinit(); errdefer self.arena.deinit(); @@ -186,6 +190,9 @@ pub const Loop = struct { self.posixFsRequest(&self.fs_end_request); self.fs_thread.wait(); }; + + if (!std.builtin.single_threaded) + try self.delay_queue.init(); } pub fn deinit(self: *Loop) void { @@ -259,7 +266,7 @@ pub const Loop = struct { self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); } }, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { self.os_data.kqfd = try os.kqueue(); errdefer os.close(self.os_data.kqfd); @@ -384,7 +391,7 @@ pub const Loop = struct { while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); os.close(self.os_data.epollfd); }, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { os.close(self.os_data.kqfd); }, .windows => { @@ -478,7 +485,7 @@ pub const Loop = struct { .linux => { self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLIN); }, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_READ, os.EV_ONESHOT); }, else => @compileError("Unsupported OS"), @@ -490,7 +497,7 @@ pub const Loop = struct { .linux => { self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT); }, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_WRITE, os.EV_ONESHOT); }, else => @compileError("Unsupported OS"), @@ -502,7 +509,7 @@ pub const Loop = struct { .linux => { self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT | os.EPOLLIN); }, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_READ, os.EV_ONESHOT); self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_WRITE, os.EV_ONESHOT); }, @@ -571,7 +578,7 @@ pub const Loop = struct { const eventfd_node = &resume_stack_node.data; eventfd_node.base.handle = next_tick_node.data; switch (builtin.os.tag) { - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.kevent); const empty_kevs = &[0]os.Kevent{}; _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { @@ -633,7 +640,7 @@ pub const Loop = struct { if (!builtin.single_threaded) { switch (builtin.os.tag) { .linux, - .macosx, + .macos, .freebsd, .netbsd, .dragonfly, @@ -646,6 +653,10 @@ pub const Loop = struct { for (self.extra_threads) |extra_thread| { extra_thread.wait(); } + + @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst); + self.delay_queue.event.set(); + self.delay_queue.thread.wait(); } /// Runs the provided function asynchronously. The function's frame is allocated @@ -726,7 +737,7 @@ pub const Loop = struct { } return; }, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { const final_kevent = @as(*const [1]os.Kevent, &self.os_data.final_kevent); const empty_kevs = &[0]os.Kevent{}; // cannot fail because we already added it and this just enables it @@ -749,6 +760,128 @@ pub const Loop = struct { } } + pub fn sleep(self: *Loop, nanoseconds: u64) void { + if (std.builtin.single_threaded) + @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded"); + + suspend { + const now = self.delay_queue.timer.read(); + + var entry: DelayQueue.Waiters.Entry = undefined; + entry.init(@frame(), now + nanoseconds); + self.delay_queue.waiters.insert(&entry); + + // Speculatively wake up the timer thread when we add a new entry. + // If the timer thread is sleeping on a longer entry, we need to + // interrupt it so that our entry can be expired in time. + self.delay_queue.event.set(); + } + } + + const DelayQueue = struct { + timer: std.time.Timer, + waiters: Waiters, + thread: *std.Thread, + event: std.AutoResetEvent, + is_running: bool, + + /// Initialize the delay queue by spawning the timer thread + /// and starting any timer resources. + fn init(self: *DelayQueue) !void { + self.* = DelayQueue{ + .timer = try std.time.Timer.start(), + .waiters = DelayQueue.Waiters{ + .entries = std.atomic.Queue(anyframe).init(), + }, + .thread = try std.Thread.spawn(self, DelayQueue.run), + .event = std.AutoResetEvent{}, + .is_running = true, + }; + } + + /// Entry point for the timer thread + /// which waits for timer entries to expire and reschedules them. + fn run(self: *DelayQueue) void { + const loop = @fieldParentPtr(Loop, "delay_queue", self); + + while (@atomicLoad(bool, &self.is_running, .SeqCst)) { + const now = self.timer.read(); + + if (self.waiters.popExpired(now)) |entry| { + loop.onNextTick(&entry.node); + continue; + } + + if (self.waiters.nextExpire()) |expires| { + if (now >= expires) + continue; + self.event.timedWait(expires - now) catch {}; + } else { + self.event.wait(); + } + } + } + + // TODO: use a tickless heirarchical timer wheel: + // https://github.com/wahern/timeout/ + const Waiters = struct { + entries: std.atomic.Queue(anyframe), + + const Entry = struct { + node: NextTickNode, + expires: u64, + + fn init(self: *Entry, frame: anyframe, expires: u64) void { + self.node.data = frame; + self.expires = expires; + } + }; + + /// Registers the entry into the queue of waiting frames + fn insert(self: *Waiters, entry: *Entry) void { + self.entries.put(&entry.node); + } + + /// Dequeues one expired event relative to `now` + fn popExpired(self: *Waiters, now: u64) ?*Entry { + const entry = self.peekExpiringEntry() orelse return null; + if (entry.expires > now) + return null; + + assert(self.entries.remove(&entry.node)); + return entry; + } + + /// Returns an estimate for the amount of time + /// to wait until the next waiting entry expires. + fn nextExpire(self: *Waiters) ?u64 { + const entry = self.peekExpiringEntry() orelse return null; + return entry.expires; + } + + fn peekExpiringEntry(self: *Waiters) ?*Entry { + const held = self.entries.mutex.acquire(); + defer held.release(); + + // starting from the head + var head = self.entries.head orelse return null; + + // traverse the list of waiting entires to + // find the Node with the smallest `expires` field + var min = head; + while (head.next) |node| { + const minEntry = @fieldParentPtr(Entry, "node", min); + const nodeEntry = @fieldParentPtr(Entry, "node", node); + if (nodeEntry.expires < minEntry.expires) + min = node; + head = node; + } + + return @fieldParentPtr(Entry, "node", min); + } + }; + }; + /// ------- I/0 APIs ------- pub fn accept( self: *Loop, @@ -1219,7 +1352,7 @@ pub const Loop = struct { } } }, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => { + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => { var eventlist: [1]os.Kevent = undefined; const empty_kevs = &[0]os.Kevent{}; const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable; @@ -1345,7 +1478,7 @@ pub const Loop = struct { const OsData = switch (builtin.os.tag) { .linux => LinuxOsData, - .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => KEventData, + .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventData, .windows => struct { io_port: windows.HANDLE, extra_thread_count: usize, @@ -1551,3 +1684,27 @@ test "std.event.Loop - runDetached" { fn testRunDetached() void { testRunDetachedData += 1; } + +test "std.event.Loop - sleep" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + if (!std.io.is_async) return error.SkipZigTest; + + const frames = try testing.allocator.alloc(@Frame(testSleep), 10); + defer testing.allocator.free(frames); + + const wait_time = 100 * std.time.ns_per_ms; + var sleep_count: usize = 0; + + for (frames) |*frame| + frame.* = async testSleep(wait_time, &sleep_count); + for (frames) |*frame| + await frame; + + testing.expect(sleep_count == frames.len); +} + +fn testSleep(wait_ns: u64, sleep_count: *usize) void { + Loop.instance.?.sleep(wait_ns); + _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst); +} |
