aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2020-10-14 21:49:45 -0400
committerGitHub <noreply@github.com>2020-10-14 21:49:45 -0400
commit3b4432d9a6ae7f01f66e5ddcc15bf579d2c47460 (patch)
treed24b87f33bf868ad6b4857bb500a80441744cc4a /lib
parent2f52f95b928c1dcb42afb22243c27fb9661cbd1b (diff)
parent12508025a4b3a841819b913e4ed88810ca04ba79 (diff)
downloadzig-3b4432d9a6ae7f01f66e5ddcc15bf579d2c47460.tar.gz
zig-3b4432d9a6ae7f01f66e5ddcc15bf579d2c47460.zip
Merge pull request #6655 from kprotty/timers
Integrate std.time.sleep with the event loop
Diffstat (limited to 'lib')
-rw-r--r--lib/std/auto_reset_event.zig229
-rw-r--r--lib/std/event/loop.zig157
-rw-r--r--lib/std/std.zig1
-rw-r--r--lib/std/time.zig5
4 files changed, 391 insertions, 1 deletions
diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig
new file mode 100644
index 0000000000..df528e40a5
--- /dev/null
+++ b/lib/std/auto_reset_event.zig
@@ -0,0 +1,229 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2020 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+const std = @import("std.zig");
+const builtin = @import("builtin");
+const testing = std.testing;
+const assert = std.debug.assert;
+
+/// Similar to std.ResetEvent but on `set()` it also (atomically) does `reset()`.
+/// Unlike std.ResetEvent, `wait()` can only be called by one thread (MPSC-like).
+pub const AutoResetEvent = struct {
+ // AutoResetEvent has 3 possible states:
+ // - UNSET: the AutoResetEvent is currently unset
+ // - SET: the AutoResetEvent was notified before a wait() was called
+ // - <std.ResetEvent pointer>: there is an active waiter waiting for a notification.
+ //
+ // When attempting to wait:
+ // if the event is unset, it registers a ResetEvent pointer to be notified when the event is set
+ // if the event is already set, then it consumes the notification and resets the event.
+ //
+ // When attempting to notify:
+ // if the event is unset, then we set the event
+ // if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent
+ //
+ // This ensures that the event is automatically reset after a wait() has been issued
+ // and avoids the race condition when using std.ResetEvent in the following scenario:
+ // thread 1 | thread 2
+ // std.ResetEvent.wait() |
+ // | std.ResetEvent.set()
+ // | std.ResetEvent.set()
+ // std.ResetEvent.reset() |
+ // std.ResetEvent.wait() | (missed the second .set() notification above)
+
+
+ state: usize = UNSET,
+
+ const UNSET = 0;
+ const SET = 1;
+
+ // the minimum alignment for the `*std.ResetEvent` created by wait*()
+ const event_align = std.math.max(@alignOf(std.ResetEvent), 2);
+
+ pub fn wait(self: *AutoResetEvent) void {
+ self.waitFor(null) catch unreachable;
+ }
+
+ pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void {
+ return self.waitFor(timeout);
+ }
+
+ fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
+ // lazily initialized std.ResetEvent
+ var reset_event: std.ResetEvent align(event_align) = undefined;
+ var has_reset_event = false;
+ defer if (has_reset_event) {
+ reset_event.deinit();
+ };
+
+ var state = @atomicLoad(usize, &self.state, .SeqCst);
+ while (true) {
+ // consume a notification if there is any
+ if (state == SET) {
+ @atomicStore(usize, &self.state, UNSET, .SeqCst);
+ return;
+ }
+
+ // check if theres currently a pending ResetEvent pointer already registered
+ if (state != UNSET) {
+ unreachable; // multiple waiting threads on the same AutoResetEvent
+ }
+
+ // lazily initialize the ResetEvent if it hasn't been already
+ if (!has_reset_event) {
+ has_reset_event = true;
+ reset_event = std.ResetEvent.init();
+ }
+
+ // Since the AutoResetEvent currently isnt set,
+ // try to register our ResetEvent on it to wait
+ // for a set() call from another thread.
+ if (@cmpxchgWeak(
+ usize,
+ &self.state,
+ UNSET,
+ @ptrToInt(&reset_event),
+ .SeqCst,
+ .SeqCst,
+ )) |new_state| {
+ state = new_state;
+ continue;
+ }
+
+ // if no timeout was specified, then just wait forever
+ const timeout_ns = timeout orelse {
+ reset_event.wait();
+ return;
+ };
+
+ // wait with a timeout and return if signalled via set()
+ if (reset_event.timedWait(timeout_ns)) |_| {
+ return;
+ } else |timed_out| {}
+
+ // If we timed out, we need to transition the AutoResetEvent back to UNSET.
+ // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent.
+ state = @cmpxchgStrong(
+ usize,
+ &self.state,
+ @ptrToInt(&reset_event),
+ UNSET,
+ .SeqCst,
+ .SeqCst,
+ ) orelse return error.TimedOut;
+
+ // We didn't manage to unregister ourselves from the state.
+ if (state == SET) {
+ unreachable; // AutoResetEvent notified without waking up the waiting thread
+ } else if (state != UNSET) {
+ unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out
+ }
+
+ // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up.
+ // We need to wait for it to wake up our ResetEvent before we can return and invalidate it.
+ // We don't return error.TimedOut here as it technically notified us while we were "timing out".
+ reset_event.wait();
+ return;
+ }
+ }
+
+ pub fn set(self: *AutoResetEvent) void {
+ var state = @atomicLoad(usize, &self.state, .SeqCst);
+ while (true) {
+ // If the AutoResetEvent is already set, there is nothing else left to do
+ if (state == SET) {
+ return;
+ }
+
+ // If the AutoResetEvent isn't set,
+ // then try to leave a notification for the wait() thread that we set() it.
+ if (state == UNSET) {
+ state = @cmpxchgWeak(
+ usize,
+ &self.state,
+ UNSET,
+ SET,
+ .SeqCst,
+ .SeqCst,
+ ) orelse return;
+ continue;
+ }
+
+ // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting.
+ // Try to acquire ownership of it so that we can wake it up.
+ // This also resets the AutoResetEvent so that there is no race condition as defined above.
+ if (@cmpxchgWeak(
+ usize,
+ &self.state,
+ state,
+ UNSET,
+ .SeqCst,
+ .SeqCst,
+ )) |new_state| {
+ state = new_state;
+ continue;
+ }
+
+ const reset_event = @intToPtr(*align(event_align) std.ResetEvent, state);
+ reset_event.set();
+ return;
+ }
+ }
+};
+
+test "std.AutoResetEvent" {
+ // test local code paths
+ {
+ var event = AutoResetEvent{};
+ testing.expectError(error.TimedOut, event.timedWait(1));
+ event.set();
+ event.wait();
+ }
+
+ // test cross-thread signaling
+ if (builtin.single_threaded)
+ return;
+
+ const Context = struct {
+ value: u128 = 0,
+ in: AutoResetEvent = AutoResetEvent{},
+ out: AutoResetEvent = AutoResetEvent{},
+
+ const Self = @This();
+
+ fn sender(self: *Self) void {
+ testing.expect(self.value == 0);
+ self.value = 1;
+ self.out.set();
+
+ self.in.wait();
+ testing.expect(self.value == 2);
+ self.value = 3;
+ self.out.set();
+
+ self.in.wait();
+ testing.expect(self.value == 4);
+ }
+
+ fn receiver(self: *Self) void {
+ self.out.wait();
+ testing.expect(self.value == 1);
+ self.value = 2;
+ self.in.set();
+
+ self.out.wait();
+ testing.expect(self.value == 3);
+ self.value = 4;
+ self.in.set();
+ }
+ };
+
+ var context = Context{};
+ const send_thread = try std.Thread.spawn(&context, Context.sender);
+ const recv_thread = try std.Thread.spawn(&context, Context.receiver);
+
+ send_thread.wait();
+ recv_thread.wait();
+} \ No newline at end of file
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index 5b10335535..89c3e5a8b8 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -35,6 +35,9 @@ pub const Loop = struct {
/// This is only used by `Loop` for the thread pool and associated resources.
arena: std.heap.ArenaAllocator,
+ /// State which manages frames that are sleeping on timers
+ delay_queue: DelayQueue,
+
/// Pre-allocated eventfds. All permanently active.
/// This is how `Loop` sends promises to be resumed on other threads.
available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
@@ -162,6 +165,7 @@ pub const Loop = struct {
.fs_queue = std.atomic.Queue(Request).init(),
.fs_thread = undefined,
.fs_thread_wakeup = std.ResetEvent.init(),
+ .delay_queue = undefined,
};
errdefer self.fs_thread_wakeup.deinit();
errdefer self.arena.deinit();
@@ -186,6 +190,9 @@ pub const Loop = struct {
self.posixFsRequest(&self.fs_end_request);
self.fs_thread.wait();
};
+
+ if (!std.builtin.single_threaded)
+ try self.delay_queue.init();
}
pub fn deinit(self: *Loop) void {
@@ -645,6 +652,10 @@ pub const Loop = struct {
for (self.extra_threads) |extra_thread| {
extra_thread.wait();
}
+
+ @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
+ self.delay_queue.event.set();
+ self.delay_queue.thread.wait();
}
/// Runs the provided function asynchronously. The function's frame is allocated
@@ -748,6 +759,128 @@ pub const Loop = struct {
}
}
+ pub fn sleep(self: *Loop, nanoseconds: u64) void {
+ if (std.builtin.single_threaded)
+ @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded");
+
+ suspend {
+ const now = self.delay_queue.timer.read();
+
+ var entry: DelayQueue.Waiters.Entry = undefined;
+ entry.init(@frame(), now + nanoseconds);
+ self.delay_queue.waiters.insert(&entry);
+
+ // Speculatively wake up the timer thread when we add a new entry.
+ // If the timer thread is sleeping on a longer entry, we need to
+ // interrupt it so that our entry can be expired in time.
+ self.delay_queue.event.set();
+ }
+ }
+
+ const DelayQueue = struct {
+ timer: std.time.Timer,
+ waiters: Waiters,
+ thread: *std.Thread,
+ event: std.AutoResetEvent,
+ is_running: bool,
+
+ /// Initialize the delay queue by spawning the timer thread
+ /// and starting any timer resources.
+ fn init(self: *DelayQueue) !void {
+ self.* = DelayQueue{
+ .timer = try std.time.Timer.start(),
+ .waiters = DelayQueue.Waiters{
+ .entries = std.atomic.Queue(anyframe).init(),
+ },
+ .thread = try std.Thread.spawn(self, DelayQueue.run),
+ .event = std.AutoResetEvent{},
+ .is_running = true,
+ };
+ }
+
+ /// Entry point for the timer thread
+ /// which waits for timer entries to expire and reschedules them.
+ fn run(self: *DelayQueue) void {
+ const loop = @fieldParentPtr(Loop, "delay_queue", self);
+
+ while (@atomicLoad(bool, &self.is_running, .SeqCst)) {
+ const now = self.timer.read();
+
+ if (self.waiters.popExpired(now)) |entry| {
+ loop.onNextTick(&entry.node);
+ continue;
+ }
+
+ if (self.waiters.nextExpire()) |expires| {
+ if (now >= expires)
+ continue;
+ self.event.timedWait(expires - now) catch {};
+ } else {
+ self.event.wait();
+ }
+ }
+ }
+
+ // TODO: use a tickless heirarchical timer wheel:
+ // https://github.com/wahern/timeout/
+ const Waiters = struct {
+ entries: std.atomic.Queue(anyframe),
+
+ const Entry = struct {
+ node: NextTickNode,
+ expires: u64,
+
+ fn init(self: *Entry, frame: anyframe, expires: u64) void {
+ self.node.data = frame;
+ self.expires = expires;
+ }
+ };
+
+ /// Registers the entry into the queue of waiting frames
+ fn insert(self: *Waiters, entry: *Entry) void {
+ self.entries.put(&entry.node);
+ }
+
+ /// Dequeues one expired event relative to `now`
+ fn popExpired(self: *Waiters, now: u64) ?*Entry {
+ const entry = self.peekExpiringEntry() orelse return null;
+ if (entry.expires > now)
+ return null;
+
+ assert(self.entries.remove(&entry.node));
+ return entry;
+ }
+
+ /// Returns an estimate for the amount of time
+ /// to wait until the next waiting entry expires.
+ fn nextExpire(self: *Waiters) ?u64 {
+ const entry = self.peekExpiringEntry() orelse return null;
+ return entry.expires;
+ }
+
+ fn peekExpiringEntry(self: *Waiters) ?*Entry {
+ const held = self.entries.mutex.acquire();
+ defer held.release();
+
+ // starting from the head
+ var head = self.entries.head orelse return null;
+
+ // traverse the list of waiting entires to
+ // find the Node with the smallest `expires` field
+ var min = head;
+ while (head.next) |node| {
+ const minEntry = @fieldParentPtr(Entry, "node", min);
+ const nodeEntry = @fieldParentPtr(Entry, "node", node);
+ if (nodeEntry.expires < minEntry.expires)
+ min = node;
+ head = node;
+ }
+
+ return @fieldParentPtr(Entry, "node", min);
+ }
+ };
+ };
+
/// ------- I/0 APIs -------
pub fn accept(
self: *Loop,
@@ -1550,3 +1683,27 @@ test "std.event.Loop - runDetached" {
fn testRunDetached() void {
testRunDetachedData += 1;
}
+
+test "std.event.Loop - sleep" {
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+ if (!std.io.is_async) return error.SkipZigTest;
+
+ const frames = try testing.allocator.alloc(@Frame(testSleep), 10);
+ defer testing.allocator.free(frames);
+
+ const wait_time = 100 * std.time.ns_per_ms;
+ var sleep_count: usize = 0;
+
+ for (frames) |*frame|
+ frame.* = async testSleep(wait_time, &sleep_count);
+ for (frames) |*frame|
+ await frame;
+
+ testing.expect(sleep_count == frames.len);
+}
+
+fn testSleep(wait_ns: u64, sleep_count: *usize) void {
+ Loop.instance.?.sleep(wait_ns);
+ _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst);
+}
diff --git a/lib/std/std.zig b/lib/std/std.zig
index 62f7f21f4e..f4cca0f9f4 100644
--- a/lib/std/std.zig
+++ b/lib/std/std.zig
@@ -14,6 +14,7 @@ pub const AutoArrayHashMap = array_hash_map.AutoArrayHashMap;
pub const AutoArrayHashMapUnmanaged = array_hash_map.AutoArrayHashMapUnmanaged;
pub const AutoHashMap = hash_map.AutoHashMap;
pub const AutoHashMapUnmanaged = hash_map.AutoHashMapUnmanaged;
+pub const AutoResetEvent = @import("auto_reset_event.zig").AutoResetEvent;
pub const BufMap = @import("buf_map.zig").BufMap;
pub const BufSet = @import("buf_set.zig").BufSet;
pub const ChildProcess = @import("child_process.zig").ChildProcess;
diff --git a/lib/std/time.zig b/lib/std/time.zig
index 344f192784..e537071e1b 100644
--- a/lib/std/time.zig
+++ b/lib/std/time.zig
@@ -14,8 +14,11 @@ const is_windows = std.Target.current.os.tag == .windows;
pub const epoch = @import("time/epoch.zig");
/// Spurious wakeups are possible and no precision of timing is guaranteed.
-/// TODO integrate with evented I/O
pub fn sleep(nanoseconds: u64) void {
+ // TODO: opting out of async sleeping?
+ if (std.io.is_async)
+ return std.event.Loop.instance.?.sleep(nanoseconds);
+
if (is_windows) {
const big_ms_from_ns = nanoseconds / ns_per_ms;
const ms = math.cast(os.windows.DWORD, big_ms_from_ns) catch math.maxInt(os.windows.DWORD);