path: root/lib/std/event/loop.zig
author     Sebastien Marie <semarie@users.noreply.github.com>  2020-10-17 17:38:23 +0200
committer  GitHub <noreply@github.com>  2020-10-17 17:38:23 +0200
commit     35a7247a2c9ab208d96fdc9a7c0c63f4c852666c (patch)
tree       d90d927b2c153ef2212dfcb6a0b47346e8896eb5  /lib/std/event/loop.zig
parent     161eb4a000923c28d152781dcc8a080905c7ad32 (diff)
parent     245d98d32dd29e80de9732f415a4731748008acf (diff)
Merge branch 'master' into openbsd-minimal
Diffstat (limited to 'lib/std/event/loop.zig')
-rw-r--r--  lib/std/event/loop.zig  181
1 file changed, 169 insertions(+), 12 deletions(-)
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index 7fa6fee398..80dc94d184 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -35,6 +35,9 @@ pub const Loop = struct {
/// This is only used by `Loop` for the thread pool and associated resources.
arena: std.heap.ArenaAllocator,
+ /// State which manages frames that are sleeping on timers
+ delay_queue: DelayQueue,
+
/// Pre-allocated eventfds. All permanently active.
/// This is how `Loop` sends promises to be resumed on other threads.
available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
@@ -66,7 +69,7 @@ pub const Loop = struct {
};
pub const EventFd = switch (builtin.os.tag) {
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => KEventFd,
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventFd,
.linux => struct {
base: ResumeNode,
epoll_op: u32,
@@ -85,7 +88,7 @@ pub const Loop = struct {
};
pub const Basic = switch (builtin.os.tag) {
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => KEventBasic,
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventBasic,
.linux => struct {
base: ResumeNode,
},
@@ -162,6 +165,7 @@ pub const Loop = struct {
.fs_queue = std.atomic.Queue(Request).init(),
.fs_thread = undefined,
.fs_thread_wakeup = std.ResetEvent.init(),
+ .delay_queue = undefined,
};
errdefer self.fs_thread_wakeup.deinit();
errdefer self.arena.deinit();
@@ -186,6 +190,9 @@ pub const Loop = struct {
self.posixFsRequest(&self.fs_end_request);
self.fs_thread.wait();
};
+
+ if (!std.builtin.single_threaded)
+ try self.delay_queue.init();
}
pub fn deinit(self: *Loop) void {
@@ -259,7 +266,7 @@ pub const Loop = struct {
self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun);
}
},
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
self.os_data.kqfd = try os.kqueue();
errdefer os.close(self.os_data.kqfd);
@@ -384,7 +391,7 @@ pub const Loop = struct {
while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
os.close(self.os_data.epollfd);
},
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
os.close(self.os_data.kqfd);
},
.windows => {
@@ -478,7 +485,7 @@ pub const Loop = struct {
.linux => {
self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLIN);
},
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_READ, os.EV_ONESHOT);
},
else => @compileError("Unsupported OS"),
@@ -490,7 +497,7 @@ pub const Loop = struct {
.linux => {
self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT);
},
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_WRITE, os.EV_ONESHOT);
},
else => @compileError("Unsupported OS"),
@@ -502,7 +509,7 @@ pub const Loop = struct {
.linux => {
self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT | os.EPOLLIN);
},
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_READ, os.EV_ONESHOT);
self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_WRITE, os.EV_ONESHOT);
},
@@ -571,7 +578,7 @@ pub const Loop = struct {
const eventfd_node = &resume_stack_node.data;
eventfd_node.base.handle = next_tick_node.data;
switch (builtin.os.tag) {
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.kevent);
const empty_kevs = &[0]os.Kevent{};
_ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch {
@@ -633,7 +640,7 @@ pub const Loop = struct {
if (!builtin.single_threaded) {
switch (builtin.os.tag) {
.linux,
- .macosx,
+ .macos,
.freebsd,
.netbsd,
.dragonfly,
@@ -646,6 +653,10 @@ pub const Loop = struct {
for (self.extra_threads) |extra_thread| {
extra_thread.wait();
}
+
+ @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
+ self.delay_queue.event.set();
+ self.delay_queue.thread.wait();
}
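Worth noting about the shutdown order above (a reading of the code, not text from the commit): `is_running` must be cleared before `event.set()`. If the timer thread were woken first, it could re-check a still-true flag and park again in `event.wait()` with nobody left to wake it, leaving `thread.wait()` blocked forever.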
/// Runs the provided function asynchronously. The function's frame is allocated
@@ -726,7 +737,7 @@ pub const Loop = struct {
}
return;
},
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
const final_kevent = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
const empty_kevs = &[0]os.Kevent{};
// cannot fail because we already added it and this just enables it
@@ -749,6 +760,128 @@ pub const Loop = struct {
}
}
+ pub fn sleep(self: *Loop, nanoseconds: u64) void {
+ if (std.builtin.single_threaded)
+ @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded");
+
+ suspend {
+ const now = self.delay_queue.timer.read();
+
+ var entry: DelayQueue.Waiters.Entry = undefined;
+ entry.init(@frame(), now + nanoseconds);
+ self.delay_queue.waiters.insert(&entry);
+
+ // Speculatively wake up the timer thread when we add a new entry.
+ // If the timer thread is sleeping on a longer entry, we need to
+ // interrupt it so that our entry can be expired in time.
+ self.delay_queue.event.set();
+ }
+ }
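For contrast with `std.time.sleep`, which parks the whole OS thread: a minimal caller-side sketch (not part of the commit), assuming evented I/O is enabled so that `Loop.instance` is non-null.

    const std = @import("std");
    const Loop = std.event.Loop;

    fn stepWithDelay() void {
        // Suspends only this frame; the worker thread that was running it
        // picks up other ready frames until the delay queue's timer thread
        // re-schedules the frame via onNextTick.
        Loop.instance.?.sleep(100 * std.time.ns_per_ms);
        // Resumed here roughly 100ms later.
    }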
+
+ const DelayQueue = struct {
+ timer: std.time.Timer,
+ waiters: Waiters,
+ thread: *std.Thread,
+ event: std.AutoResetEvent,
+ is_running: bool,
+
+ /// Initialize the delay queue by spawning the timer thread
+ /// and starting any timer resources.
+ fn init(self: *DelayQueue) !void {
+ self.* = DelayQueue{
+ .timer = try std.time.Timer.start(),
+ .waiters = DelayQueue.Waiters{
+ .entries = std.atomic.Queue(anyframe).init(),
+ },
+ .thread = try std.Thread.spawn(self, DelayQueue.run),
+ .event = std.AutoResetEvent{},
+ .is_running = true,
+ };
+ }
+
+ /// Entry point for the timer thread
+ /// which waits for timer entries to expire and reschedules them.
+ fn run(self: *DelayQueue) void {
+ const loop = @fieldParentPtr(Loop, "delay_queue", self);
+
+ while (@atomicLoad(bool, &self.is_running, .SeqCst)) {
+ const now = self.timer.read();
+
+ if (self.waiters.popExpired(now)) |entry| {
+ loop.onNextTick(&entry.node);
+ continue;
+ }
+
+ if (self.waiters.nextExpire()) |expires| {
+ if (now >= expires)
+ continue;
+ self.event.timedWait(expires - now) catch {};
+ } else {
+ self.event.wait();
+ }
+ }
+ }
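A worked trace of one iteration of this loop (illustrative numbers, not from the commit), with pending entries expiring at 10ms and 500ms and `now == 0`:

    // popExpired(0)  -> null     nothing is due yet
    // nextExpire()   -> 10ms     earliest pending entry
    // timedWait(10ms)            park until expiry or event.set()

If `sleep` inserts a 2ms entry meanwhile, its `event.set()` wakes the thread early and the loop restarts from the top, so the new entry is seen well before the 10ms wait would have elapsed.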
+
+ // TODO: use a tickless hierarchical timer wheel:
+ // https://github.com/wahern/timeout/
+ const Waiters = struct {
+ entries: std.atomic.Queue(anyframe),
+
+ const Entry = struct {
+ node: NextTickNode,
+ expires: u64,
+
+ fn init(self: *Entry, frame: anyframe, expires: u64) void {
+ self.node.data = frame;
+ self.expires = expires;
+ }
+ };
+
+ /// Registers the entry into the queue of waiting frames
+ fn insert(self: *Waiters, entry: *Entry) void {
+ self.entries.put(&entry.node);
+ }
+
+ /// Dequeues one expired event relative to `now`
+ fn popExpired(self: *Waiters, now: u64) ?*Entry {
+ const entry = self.peekExpiringEntry() orelse return null;
+ if (entry.expires > now)
+ return null;
+
+ assert(self.entries.remove(&entry.node));
+ return entry;
+ }
+
+ /// Returns an estimate for the amount of time
+ /// to wait until the next waiting entry expires.
+ fn nextExpire(self: *Waiters) ?u64 {
+ const entry = self.peekExpiringEntry() orelse return null;
+ return entry.expires;
+ }
+
+ fn peekExpiringEntry(self: *Waiters) ?*Entry {
+ const held = self.entries.mutex.acquire();
+ defer held.release();
+
+ // starting from the head
+ var head = self.entries.head orelse return null;
+
+ // traverse the list of waiting entries to
+ // find the Node with the smallest `expires` field
+ var min = head;
+ while (head.next) |node| {
+ const minEntry = @fieldParentPtr(Entry, "node", min);
+ const nodeEntry = @fieldParentPtr(Entry, "node", node);
+ if (nodeEntry.expires < minEntry.expires)
+ min = node;
+ head = node;
+ }
+
+ return @fieldParentPtr(Entry, "node", min);
+ }
+ };
+ };
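On the TODO above: `peekExpiringEntry` rescans the whole unsorted list under the queue mutex, so each timer-thread iteration costs O(n) in the number of sleepers. A hierarchical timer wheel (the linked wahern/timeout design) hashes expiry times into slots, making insert and expiry O(1) amortized, with higher levels covering expiries beyond one wheel rotation. A single-level sketch of the slot math only (hypothetical names, not the linked implementation):

    const slot_count = 64; // one wheel level; must be a power of two
    const tick_ns = std.time.ns_per_ms; // resolution of one slot

    // An entry expiring at `expires` lands in a fixed slot, so the timer
    // thread scans only the slot for the current tick rather than every
    // waiter in the queue.
    fn slotFor(expires: u64) usize {
        return @intCast(usize, (expires / tick_ns) % slot_count);
    }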
+
+ /// ------- I/O APIs -------
pub fn accept(
self: *Loop,
@@ -1219,7 +1352,7 @@ pub const Loop = struct {
}
}
},
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
var eventlist: [1]os.Kevent = undefined;
const empty_kevs = &[0]os.Kevent{};
const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable;
@@ -1345,7 +1478,7 @@ pub const Loop = struct {
const OsData = switch (builtin.os.tag) {
.linux => LinuxOsData,
- .macosx, .freebsd, .netbsd, .dragonfly, .openbsd => KEventData,
+ .macos, .freebsd, .netbsd, .dragonfly, .openbsd => KEventData,
.windows => struct {
io_port: windows.HANDLE,
extra_thread_count: usize,
@@ -1551,3 +1684,27 @@ test "std.event.Loop - runDetached" {
fn testRunDetached() void {
testRunDetachedData += 1;
}
+
+test "std.event.Loop - sleep" {
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+ if (!std.io.is_async) return error.SkipZigTest;
+
+ const frames = try testing.allocator.alloc(@Frame(testSleep), 10);
+ defer testing.allocator.free(frames);
+
+ const wait_time = 100 * std.time.ns_per_ms;
+ var sleep_count: usize = 0;
+
+ for (frames) |*frame|
+ frame.* = async testSleep(wait_time, &sleep_count);
+ for (frames) |*frame|
+ await frame;
+
+ testing.expect(sleep_count == frames.len);
+}
+
+fn testSleep(wait_ns: u64, sleep_count: *usize) void {
+ Loop.instance.?.sleep(wait_ns);
+ _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst);
+}
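As an aside (not from the diff): both guards on the new test fire under a default build, since `std.io.is_async` is false unless evented I/O is enabled, so a plain `zig test` skips it. At the time of this commit the test runner's switch for that was `--test-evented-io`:

    zig test lib/std/event/loop.zig --test-evented-io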