diff options
Diffstat (limited to 'lib/std/event/loop.zig')
| -rw-r--r-- | lib/std/event/loop.zig | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 5b10335535..89c3e5a8b8 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -35,6 +35,9 @@ pub const Loop = struct { /// This is only used by `Loop` for the thread pool and associated resources. arena: std.heap.ArenaAllocator, + /// State which manages frames that are sleeping on timers + delay_queue: DelayQueue, + /// Pre-allocated eventfds. All permanently active. /// This is how `Loop` sends promises to be resumed on other threads. available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), @@ -162,6 +165,7 @@ pub const Loop = struct { .fs_queue = std.atomic.Queue(Request).init(), .fs_thread = undefined, .fs_thread_wakeup = std.ResetEvent.init(), + .delay_queue = undefined, }; errdefer self.fs_thread_wakeup.deinit(); errdefer self.arena.deinit(); @@ -186,6 +190,9 @@ pub const Loop = struct { self.posixFsRequest(&self.fs_end_request); self.fs_thread.wait(); }; + + if (!std.builtin.single_threaded) + try self.delay_queue.init(); } pub fn deinit(self: *Loop) void { @@ -645,6 +652,10 @@ pub const Loop = struct { for (self.extra_threads) |extra_thread| { extra_thread.wait(); } + + @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst); + self.delay_queue.event.set(); + self.delay_queue.thread.wait(); } /// Runs the provided function asynchronously. The function's frame is allocated @@ -748,6 +759,128 @@ pub const Loop = struct { } } + pub fn sleep(self: *Loop, nanoseconds: u64) void { + if (std.builtin.single_threaded) + @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded"); + + suspend { + const now = self.delay_queue.timer.read(); + + var entry: DelayQueue.Waiters.Entry = undefined; + entry.init(@frame(), now + nanoseconds); + self.delay_queue.waiters.insert(&entry); + + // Speculatively wake up the timer thread when we add a new entry. + // If the timer thread is sleeping on a longer entry, we need to + // interrupt it so that our entry can be expired in time. + self.delay_queue.event.set(); + } + } + + const DelayQueue = struct { + timer: std.time.Timer, + waiters: Waiters, + thread: *std.Thread, + event: std.AutoResetEvent, + is_running: bool, + + /// Initialize the delay queue by spawning the timer thread + /// and starting any timer resources. + fn init(self: *DelayQueue) !void { + self.* = DelayQueue{ + .timer = try std.time.Timer.start(), + .waiters = DelayQueue.Waiters{ + .entries = std.atomic.Queue(anyframe).init(), + }, + .thread = try std.Thread.spawn(self, DelayQueue.run), + .event = std.AutoResetEvent{}, + .is_running = true, + }; + } + + /// Entry point for the timer thread + /// which waits for timer entries to expire and reschedules them. + fn run(self: *DelayQueue) void { + const loop = @fieldParentPtr(Loop, "delay_queue", self); + + while (@atomicLoad(bool, &self.is_running, .SeqCst)) { + const now = self.timer.read(); + + if (self.waiters.popExpired(now)) |entry| { + loop.onNextTick(&entry.node); + continue; + } + + if (self.waiters.nextExpire()) |expires| { + if (now >= expires) + continue; + self.event.timedWait(expires - now) catch {}; + } else { + self.event.wait(); + } + } + } + + // TODO: use a tickless heirarchical timer wheel: + // https://github.com/wahern/timeout/ + const Waiters = struct { + entries: std.atomic.Queue(anyframe), + + const Entry = struct { + node: NextTickNode, + expires: u64, + + fn init(self: *Entry, frame: anyframe, expires: u64) void { + self.node.data = frame; + self.expires = expires; + } + }; + + /// Registers the entry into the queue of waiting frames + fn insert(self: *Waiters, entry: *Entry) void { + self.entries.put(&entry.node); + } + + /// Dequeues one expired event relative to `now` + fn popExpired(self: *Waiters, now: u64) ?*Entry { + const entry = self.peekExpiringEntry() orelse return null; + if (entry.expires > now) + return null; + + assert(self.entries.remove(&entry.node)); + return entry; + } + + /// Returns an estimate for the amount of time + /// to wait until the next waiting entry expires. + fn nextExpire(self: *Waiters) ?u64 { + const entry = self.peekExpiringEntry() orelse return null; + return entry.expires; + } + + fn peekExpiringEntry(self: *Waiters) ?*Entry { + const held = self.entries.mutex.acquire(); + defer held.release(); + + // starting from the head + var head = self.entries.head orelse return null; + + // traverse the list of waiting entires to + // find the Node with the smallest `expires` field + var min = head; + while (head.next) |node| { + const minEntry = @fieldParentPtr(Entry, "node", min); + const nodeEntry = @fieldParentPtr(Entry, "node", node); + if (nodeEntry.expires < minEntry.expires) + min = node; + head = node; + } + + return @fieldParentPtr(Entry, "node", min); + } + }; + }; + /// ------- I/0 APIs ------- pub fn accept( self: *Loop, @@ -1550,3 +1683,27 @@ test "std.event.Loop - runDetached" { fn testRunDetached() void { testRunDetachedData += 1; } + +test "std.event.Loop - sleep" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + if (!std.io.is_async) return error.SkipZigTest; + + const frames = try testing.allocator.alloc(@Frame(testSleep), 10); + defer testing.allocator.free(frames); + + const wait_time = 100 * std.time.ns_per_ms; + var sleep_count: usize = 0; + + for (frames) |*frame| + frame.* = async testSleep(wait_time, &sleep_count); + for (frames) |*frame| + await frame; + + testing.expect(sleep_count == frames.len); +} + +fn testSleep(wait_ns: u64, sleep_count: *usize) void { + Loop.instance.?.sleep(wait_ns); + _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst); +} |
