diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2020-10-14 21:49:45 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-14 21:49:45 -0400 |
| commit | 3b4432d9a6ae7f01f66e5ddcc15bf579d2c47460 (patch) | |
| tree | d24b87f33bf868ad6b4857bb500a80441744cc4a /lib | |
| parent | 2f52f95b928c1dcb42afb22243c27fb9661cbd1b (diff) | |
| parent | 12508025a4b3a841819b913e4ed88810ca04ba79 (diff) | |
| download | zig-3b4432d9a6ae7f01f66e5ddcc15bf579d2c47460.tar.gz zig-3b4432d9a6ae7f01f66e5ddcc15bf579d2c47460.zip | |
Merge pull request #6655 from kprotty/timers
Integrate std.time.sleep with the event loop
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/std/auto_reset_event.zig | 229 | ||||
| -rw-r--r-- | lib/std/event/loop.zig | 157 | ||||
| -rw-r--r-- | lib/std/std.zig | 1 | ||||
| -rw-r--r-- | lib/std/time.zig | 5 |
4 files changed, 391 insertions, 1 deletions
diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig new file mode 100644 index 0000000000..df528e40a5 --- /dev/null +++ b/lib/std/auto_reset_event.zig @@ -0,0 +1,229 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2020 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. +const std = @import("std.zig"); +const builtin = @import("builtin"); +const testing = std.testing; +const assert = std.debug.assert; + +/// Similar to std.ResetEvent but on `set()` it also (atomically) does `reset()`. +/// Unlike std.ResetEvent, `wait()` can only be called by one thread (MPSC-like). +pub const AutoResetEvent = struct { + // AutoResetEvent has 3 possible states: + // - UNSET: the AutoResetEvent is currently unset + // - SET: the AutoResetEvent was notified before a wait() was called + // - <std.ResetEvent pointer>: there is an active waiter waiting for a notification. + // + // When attempting to wait: + // if the event is unset, it registers a ResetEvent pointer to be notified when the event is set + // if the event is already set, then it consumes the notification and resets the event. + // + // When attempting to notify: + // if the event is unset, then we set the event + // if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent + // + // This ensures that the event is automatically reset after a wait() has been issued + // and avoids the race condition when using std.ResetEvent in the following scenario: + // thread 1 | thread 2 + // std.ResetEvent.wait() | + // | std.ResetEvent.set() + // | std.ResetEvent.set() + // std.ResetEvent.reset() | + // std.ResetEvent.wait() | (missed the second .set() notification above) + + + state: usize = UNSET, + + const UNSET = 0; + const SET = 1; + + // the minimum alignment for the `*std.ResetEvent` created by wait*() + const event_align = std.math.max(@alignOf(std.ResetEvent), 2); + + pub fn wait(self: *AutoResetEvent) void { + self.waitFor(null) catch unreachable; + } + + pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { + return self.waitFor(timeout); + } + + fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { + // lazily initialized std.ResetEvent + var reset_event: std.ResetEvent align(event_align) = undefined; + var has_reset_event = false; + defer if (has_reset_event) { + reset_event.deinit(); + }; + + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // consume a notification if there is any + if (state == SET) { + @atomicStore(usize, &self.state, UNSET, .SeqCst); + return; + } + + // check if theres currently a pending ResetEvent pointer already registered + if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent + } + + // lazily initialize the ResetEvent if it hasn't been already + if (!has_reset_event) { + has_reset_event = true; + reset_event = std.ResetEvent.init(); + } + + // Since the AutoResetEvent currently isnt set, + // try to register our ResetEvent on it to wait + // for a set() call from another thread. + if (@cmpxchgWeak( + usize, + &self.state, + UNSET, + @ptrToInt(&reset_event), + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + // if no timeout was specified, then just wait forever + const timeout_ns = timeout orelse { + reset_event.wait(); + return; + }; + + // wait with a timeout and return if signalled via set() + if (reset_event.timedWait(timeout_ns)) |_| { + return; + } else |timed_out| {} + + // If we timed out, we need to transition the AutoResetEvent back to UNSET. + // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. + state = @cmpxchgStrong( + usize, + &self.state, + @ptrToInt(&reset_event), + UNSET, + .SeqCst, + .SeqCst, + ) orelse return error.TimedOut; + + // We didn't manage to unregister ourselves from the state. + if (state == SET) { + unreachable; // AutoResetEvent notified without waking up the waiting thread + } else if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out + } + + // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. + // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. + // We don't return error.TimedOut here as it technically notified us while we were "timing out". + reset_event.wait(); + return; + } + } + + pub fn set(self: *AutoResetEvent) void { + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // If the AutoResetEvent is already set, there is nothing else left to do + if (state == SET) { + return; + } + + // If the AutoResetEvent isn't set, + // then try to leave a notification for the wait() thread that we set() it. + if (state == UNSET) { + state = @cmpxchgWeak( + usize, + &self.state, + UNSET, + SET, + .SeqCst, + .SeqCst, + ) orelse return; + continue; + } + + // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. + // Try to acquire ownership of it so that we can wake it up. + // This also resets the AutoResetEvent so that there is no race condition as defined above. + if (@cmpxchgWeak( + usize, + &self.state, + state, + UNSET, + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + const reset_event = @intToPtr(*align(event_align) std.ResetEvent, state); + reset_event.set(); + return; + } + } +}; + +test "std.AutoResetEvent" { + // test local code paths + { + var event = AutoResetEvent{}; + testing.expectError(error.TimedOut, event.timedWait(1)); + event.set(); + event.wait(); + } + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + value: u128 = 0, + in: AutoResetEvent = AutoResetEvent{}, + out: AutoResetEvent = AutoResetEvent{}, + + const Self = @This(); + + fn sender(self: *Self) void { + testing.expect(self.value == 0); + self.value = 1; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 2); + self.value = 3; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 4); + } + + fn receiver(self: *Self) void { + self.out.wait(); + testing.expect(self.value == 1); + self.value = 2; + self.in.set(); + + self.out.wait(); + testing.expect(self.value == 3); + self.value = 4; + self.in.set(); + } + }; + + var context = Context{}; + const send_thread = try std.Thread.spawn(&context, Context.sender); + const recv_thread = try std.Thread.spawn(&context, Context.receiver); + + send_thread.wait(); + recv_thread.wait(); +}
\ No newline at end of file diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 5b10335535..89c3e5a8b8 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -35,6 +35,9 @@ pub const Loop = struct { /// This is only used by `Loop` for the thread pool and associated resources. arena: std.heap.ArenaAllocator, + /// State which manages frames that are sleeping on timers + delay_queue: DelayQueue, + /// Pre-allocated eventfds. All permanently active. /// This is how `Loop` sends promises to be resumed on other threads. available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), @@ -162,6 +165,7 @@ pub const Loop = struct { .fs_queue = std.atomic.Queue(Request).init(), .fs_thread = undefined, .fs_thread_wakeup = std.ResetEvent.init(), + .delay_queue = undefined, }; errdefer self.fs_thread_wakeup.deinit(); errdefer self.arena.deinit(); @@ -186,6 +190,9 @@ pub const Loop = struct { self.posixFsRequest(&self.fs_end_request); self.fs_thread.wait(); }; + + if (!std.builtin.single_threaded) + try self.delay_queue.init(); } pub fn deinit(self: *Loop) void { @@ -645,6 +652,10 @@ pub const Loop = struct { for (self.extra_threads) |extra_thread| { extra_thread.wait(); } + + @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst); + self.delay_queue.event.set(); + self.delay_queue.thread.wait(); } /// Runs the provided function asynchronously. The function's frame is allocated @@ -748,6 +759,128 @@ pub const Loop = struct { } } + pub fn sleep(self: *Loop, nanoseconds: u64) void { + if (std.builtin.single_threaded) + @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded"); + + suspend { + const now = self.delay_queue.timer.read(); + + var entry: DelayQueue.Waiters.Entry = undefined; + entry.init(@frame(), now + nanoseconds); + self.delay_queue.waiters.insert(&entry); + + // Speculatively wake up the timer thread when we add a new entry. + // If the timer thread is sleeping on a longer entry, we need to + // interrupt it so that our entry can be expired in time. + self.delay_queue.event.set(); + } + } + + const DelayQueue = struct { + timer: std.time.Timer, + waiters: Waiters, + thread: *std.Thread, + event: std.AutoResetEvent, + is_running: bool, + + /// Initialize the delay queue by spawning the timer thread + /// and starting any timer resources. + fn init(self: *DelayQueue) !void { + self.* = DelayQueue{ + .timer = try std.time.Timer.start(), + .waiters = DelayQueue.Waiters{ + .entries = std.atomic.Queue(anyframe).init(), + }, + .thread = try std.Thread.spawn(self, DelayQueue.run), + .event = std.AutoResetEvent{}, + .is_running = true, + }; + } + + /// Entry point for the timer thread + /// which waits for timer entries to expire and reschedules them. + fn run(self: *DelayQueue) void { + const loop = @fieldParentPtr(Loop, "delay_queue", self); + + while (@atomicLoad(bool, &self.is_running, .SeqCst)) { + const now = self.timer.read(); + + if (self.waiters.popExpired(now)) |entry| { + loop.onNextTick(&entry.node); + continue; + } + + if (self.waiters.nextExpire()) |expires| { + if (now >= expires) + continue; + self.event.timedWait(expires - now) catch {}; + } else { + self.event.wait(); + } + } + } + + // TODO: use a tickless heirarchical timer wheel: + // https://github.com/wahern/timeout/ + const Waiters = struct { + entries: std.atomic.Queue(anyframe), + + const Entry = struct { + node: NextTickNode, + expires: u64, + + fn init(self: *Entry, frame: anyframe, expires: u64) void { + self.node.data = frame; + self.expires = expires; + } + }; + + /// Registers the entry into the queue of waiting frames + fn insert(self: *Waiters, entry: *Entry) void { + self.entries.put(&entry.node); + } + + /// Dequeues one expired event relative to `now` + fn popExpired(self: *Waiters, now: u64) ?*Entry { + const entry = self.peekExpiringEntry() orelse return null; + if (entry.expires > now) + return null; + + assert(self.entries.remove(&entry.node)); + return entry; + } + + /// Returns an estimate for the amount of time + /// to wait until the next waiting entry expires. + fn nextExpire(self: *Waiters) ?u64 { + const entry = self.peekExpiringEntry() orelse return null; + return entry.expires; + } + + fn peekExpiringEntry(self: *Waiters) ?*Entry { + const held = self.entries.mutex.acquire(); + defer held.release(); + + // starting from the head + var head = self.entries.head orelse return null; + + // traverse the list of waiting entires to + // find the Node with the smallest `expires` field + var min = head; + while (head.next) |node| { + const minEntry = @fieldParentPtr(Entry, "node", min); + const nodeEntry = @fieldParentPtr(Entry, "node", node); + if (nodeEntry.expires < minEntry.expires) + min = node; + head = node; + } + + return @fieldParentPtr(Entry, "node", min); + } + }; + }; + /// ------- I/0 APIs ------- pub fn accept( self: *Loop, @@ -1550,3 +1683,27 @@ test "std.event.Loop - runDetached" { fn testRunDetached() void { testRunDetachedData += 1; } + +test "std.event.Loop - sleep" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + if (!std.io.is_async) return error.SkipZigTest; + + const frames = try testing.allocator.alloc(@Frame(testSleep), 10); + defer testing.allocator.free(frames); + + const wait_time = 100 * std.time.ns_per_ms; + var sleep_count: usize = 0; + + for (frames) |*frame| + frame.* = async testSleep(wait_time, &sleep_count); + for (frames) |*frame| + await frame; + + testing.expect(sleep_count == frames.len); +} + +fn testSleep(wait_ns: u64, sleep_count: *usize) void { + Loop.instance.?.sleep(wait_ns); + _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst); +} diff --git a/lib/std/std.zig b/lib/std/std.zig index 62f7f21f4e..f4cca0f9f4 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -14,6 +14,7 @@ pub const AutoArrayHashMap = array_hash_map.AutoArrayHashMap; pub const AutoArrayHashMapUnmanaged = array_hash_map.AutoArrayHashMapUnmanaged; pub const AutoHashMap = hash_map.AutoHashMap; pub const AutoHashMapUnmanaged = hash_map.AutoHashMapUnmanaged; +pub const AutoResetEvent = @import("auto_reset_event.zig").AutoResetEvent; pub const BufMap = @import("buf_map.zig").BufMap; pub const BufSet = @import("buf_set.zig").BufSet; pub const ChildProcess = @import("child_process.zig").ChildProcess; diff --git a/lib/std/time.zig b/lib/std/time.zig index 344f192784..e537071e1b 100644 --- a/lib/std/time.zig +++ b/lib/std/time.zig @@ -14,8 +14,11 @@ const is_windows = std.Target.current.os.tag == .windows; pub const epoch = @import("time/epoch.zig"); /// Spurious wakeups are possible and no precision of timing is guaranteed. -/// TODO integrate with evented I/O pub fn sleep(nanoseconds: u64) void { + // TODO: opting out of async sleeping? + if (std.io.is_async) + return std.event.Loop.instance.?.sleep(nanoseconds); + if (is_windows) { const big_ms_from_ns = nanoseconds / ns_per_ms; const ms = math.cast(os.windows.DWORD, big_ms_from_ns) catch math.maxInt(os.windows.DWORD); |
