aboutsummaryrefslogtreecommitdiff
path: root/lib/std/event/loop.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2021-09-15 14:46:31 -0700
committerAndrew Kelley <andrew@ziglang.org>2021-09-15 14:51:08 -0700
commitf3ebfcae3882c03da84821abed40167ea07a8c78 (patch)
treef1b759c94cba5b020a9ffb141fc62cb686fc5f04 /lib/std/event/loop.zig
parent111a2dcf3ad53c0c8ad2c9e7c9bd042b81e90c82 (diff)
parent0395b35cee8d4082cc40b0dcd0298f797f42309d (diff)
downloadzig-f3ebfcae3882c03da84821abed40167ea07a8c78.tar.gz
zig-f3ebfcae3882c03da84821abed40167ea07a8c78.zip
Merge remote-tracking branch 'origin/master' into llvm13
Conflicts: * cmake/Findclang.cmake * cmake/Findlld.cmake * cmake/Findllvm.cmake In master branch, more search paths were added to these files with "12" in the path. In this commit I updated them to "13". * src/stage1/codegen.cpp * src/zig_llvm.cpp * src/zig_llvm.h In master branch, ZigLLVMBuildCmpXchg is improved to add `is_single_threaded`. However, the LLVM 13 C API has this already, and in the llvm13 branch, ZigLLVMBuildCmpXchg is deleted in favor of the C API. In this commit I updated stage2 to use the LLVM 13 C API rather than depending on an improved ZigLLVMBuildCmpXchg. Additionally, src/target.zig largestAtomicBits needed to be updated to include the new m68k ISA.
Diffstat (limited to 'lib/std/event/loop.zig')
-rw-r--r--lib/std/event/loop.zig145
1 files changed, 104 insertions, 41 deletions
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index 50fb76ec9d..e7fe2da33d 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -222,27 +222,27 @@ pub const Loop = struct {
.handle = undefined,
.overlapped = ResumeNode.overlapped_init,
},
- .eventfd = try os.eventfd(1, os.EFD_CLOEXEC | os.EFD_NONBLOCK),
- .epoll_op = os.EPOLL_CTL_ADD,
+ .eventfd = try os.eventfd(1, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK),
+ .epoll_op = os.linux.EPOLL.CTL_ADD,
},
.next = undefined,
};
self.available_eventfd_resume_nodes.push(eventfd_node);
}
- self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC);
+ self.os_data.epollfd = try os.epoll_create1(os.linux.EPOLL.CLOEXEC);
errdefer os.close(self.os_data.epollfd);
- self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK);
+ self.os_data.final_eventfd = try os.eventfd(0, os.linux.EFD.CLOEXEC | os.linux.EFD.NONBLOCK);
errdefer os.close(self.os_data.final_eventfd);
- self.os_data.final_eventfd_event = os.epoll_event{
- .events = os.EPOLLIN,
- .data = os.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
+ self.os_data.final_eventfd_event = os.linux.epoll_event{
+ .events = os.linux.EPOLL.IN,
+ .data = os.linux.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
};
try os.epoll_ctl(
self.os_data.epollfd,
- os.EPOLL_CTL_ADD,
+ os.linux.EPOLL.CTL_ADD,
self.os_data.final_eventfd,
&self.os_data.final_eventfd_event,
);
@@ -266,7 +266,72 @@ pub const Loop = struct {
self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
}
},
- .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
+ .macos, .freebsd, .netbsd, .dragonfly => {
+ self.os_data.kqfd = try os.kqueue();
+ errdefer os.close(self.os_data.kqfd);
+
+ const empty_kevs = &[0]os.Kevent{};
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ // this one is for sending events
+ .kevent = os.Kevent{
+ .ident = i,
+ .filter = os.system.EVFILT_USER,
+ .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ },
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
+ _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
+ eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
+ eventfd_node.data.kevent.fflags = os.system.NOTE_TRIGGER;
+ }
+
+ // Pre-add so that we cannot get error.SystemResources
+ // later when we try to activate it.
+ self.os_data.final_kevent = os.Kevent{
+ .ident = extra_thread_count,
+ .filter = os.system.EVFILT_USER,
+ .flags = os.system.EV_ADD | os.system.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&self.final_resume_node),
+ };
+ const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
+ _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
+ self.os_data.final_kevent.flags = os.system.EV_ENABLE;
+ self.os_data.final_kevent.fflags = os.system.NOTE_TRIGGER;
+
+ if (builtin.single_threaded) {
+ assert(extra_thread_count == 0);
+ return;
+ }
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].join();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
+ }
+ },
+ .openbsd => {
self.os_data.kqfd = try os.kqueue();
errdefer os.close(self.os_data.kqfd);
@@ -283,8 +348,8 @@ pub const Loop = struct {
// this one is for sending events
.kevent = os.Kevent{
.ident = i,
- .filter = os.EVFILT_USER,
- .flags = os.EV_CLEAR | os.EV_ADD | os.EV_DISABLE,
+ .filter = os.system.EVFILT_TIMER,
+ .flags = os.system.EV_CLEAR | os.system.EV_ADD | os.system.EV_DISABLE | os.system.EV_ONESHOT,
.fflags = 0,
.data = 0,
.udata = @ptrToInt(&eventfd_node.data.base),
@@ -295,24 +360,22 @@ pub const Loop = struct {
self.available_eventfd_resume_nodes.push(eventfd_node);
const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
_ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
- eventfd_node.data.kevent.flags = os.EV_CLEAR | os.EV_ENABLE;
- eventfd_node.data.kevent.fflags = os.NOTE_TRIGGER;
+ eventfd_node.data.kevent.flags = os.system.EV_CLEAR | os.system.EV_ENABLE;
}
// Pre-add so that we cannot get error.SystemResources
// later when we try to activate it.
self.os_data.final_kevent = os.Kevent{
.ident = extra_thread_count,
- .filter = os.EVFILT_USER,
- .flags = os.EV_ADD | os.EV_DISABLE,
+ .filter = os.system.EVFILT_TIMER,
+ .flags = os.system.EV_ADD | os.system.EV_ONESHOT | os.system.EV_DISABLE,
.fflags = 0,
.data = 0,
.udata = @ptrToInt(&self.final_resume_node),
};
const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
_ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
- self.os_data.final_kevent.flags = os.EV_ENABLE;
- self.os_data.final_kevent.fflags = os.NOTE_TRIGGER;
+ self.os_data.final_kevent.flags = os.system.EV_ENABLE;
if (builtin.single_threaded) {
assert(extra_thread_count == 0);
@@ -404,19 +467,19 @@ pub const Loop = struct {
/// resume_node must live longer than the anyframe that it holds a reference to.
/// flags must contain EPOLLET
pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void {
- assert(flags & os.EPOLLET == os.EPOLLET);
+ assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
self.beginOneEvent();
errdefer self.finishOneEvent();
try self.linuxModFd(
fd,
- os.EPOLL_CTL_ADD,
+ os.linux.EPOLL.CTL_ADD,
flags,
resume_node,
);
}
pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void {
- assert(flags & os.EPOLLET == os.EPOLLET);
+ assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
var ev = os.linux.epoll_event{
.events = flags,
.data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
@@ -425,13 +488,13 @@ pub const Loop = struct {
}
pub fn linuxRemoveFd(self: *Loop, fd: i32) void {
- os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, null) catch {};
+ os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL.CTL_DEL, fd, null) catch {};
self.finishOneEvent();
}
pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) void {
- assert(flags & os.EPOLLET == os.EPOLLET);
- assert(flags & os.EPOLLONESHOT == os.EPOLLONESHOT);
+ assert(flags & os.linux.EPOLL.ET == os.linux.EPOLL.ET);
+ assert(flags & os.linux.EPOLL.ONESHOT == os.linux.EPOLL.ONESHOT);
var resume_node = ResumeNode.Basic{
.base = ResumeNode{
.id = .Basic,
@@ -457,8 +520,8 @@ pub const Loop = struct {
// Fall back to a blocking poll(). Ideally this codepath is never hit, since
// epoll should be just fine. But this is better than incorrect behavior.
var poll_flags: i16 = 0;
- if ((flags & os.EPOLLIN) != 0) poll_flags |= os.POLLIN;
- if ((flags & os.EPOLLOUT) != 0) poll_flags |= os.POLLOUT;
+ if ((flags & os.linux.EPOLL.IN) != 0) poll_flags |= os.POLL.IN;
+ if ((flags & os.linux.EPOLL.OUT) != 0) poll_flags |= os.POLL.OUT;
var pfd = [1]os.pollfd{os.pollfd{
.fd = fd,
.events = poll_flags,
@@ -484,10 +547,10 @@ pub const Loop = struct {
pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) void {
switch (builtin.os.tag) {
.linux => {
- self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLIN);
+ self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN);
},
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_READ, os.EV_ONESHOT);
+ self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_READ, os.system.EV_ONESHOT);
},
else => @compileError("Unsupported OS"),
}
@@ -496,10 +559,10 @@ pub const Loop = struct {
pub fn waitUntilFdWritable(self: *Loop, fd: os.fd_t) void {
switch (builtin.os.tag) {
.linux => {
- self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT);
+ self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT);
},
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_WRITE, os.EV_ONESHOT);
+ self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
},
else => @compileError("Unsupported OS"),
}
@@ -508,11 +571,11 @@ pub const Loop = struct {
pub fn waitUntilFdWritableOrReadable(self: *Loop, fd: os.fd_t) void {
switch (builtin.os.tag) {
.linux => {
- self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLONESHOT | os.EPOLLOUT | os.EPOLLIN);
+ self.linuxWaitFd(fd, os.linux.EPOLL.ET | os.linux.EPOLL.ONESHOT | os.linux.EPOLL.OUT | os.linux.EPOLL.IN);
},
.macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
- self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_READ, os.EV_ONESHOT);
- self.bsdWaitKev(@intCast(usize, fd), os.EVFILT_WRITE, os.EV_ONESHOT);
+ self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_READ, os.system.EV_ONESHOT);
+ self.bsdWaitKev(@intCast(usize, fd), os.system.EVFILT_WRITE, os.system.EV_ONESHOT);
},
else => @compileError("Unsupported OS"),
}
@@ -530,7 +593,7 @@ pub const Loop = struct {
defer {
// If the kevent was set to be ONESHOT, it doesn't need to be deleted manually.
- if (flags & os.EV_ONESHOT != 0) {
+ if (flags & os.system.EV_ONESHOT != 0) {
self.bsdRemoveKev(ident, filter);
}
}
@@ -547,7 +610,7 @@ pub const Loop = struct {
var kev = [1]os.Kevent{os.Kevent{
.ident = ident,
.filter = filter,
- .flags = os.EV_ADD | os.EV_ENABLE | os.EV_CLEAR | flags,
+ .flags = os.system.EV_ADD | os.system.EV_ENABLE | os.system.EV_CLEAR | flags,
.fflags = 0,
.data = 0,
.udata = @ptrToInt(&resume_node.base),
@@ -560,7 +623,7 @@ pub const Loop = struct {
var kev = [1]os.Kevent{os.Kevent{
.ident = ident,
.filter = filter,
- .flags = os.EV_DELETE,
+ .flags = os.system.EV_DELETE,
.fflags = 0,
.data = 0,
.udata = 0,
@@ -590,8 +653,8 @@ pub const Loop = struct {
},
.linux => {
// the pending count is already accounted for
- const epoll_events = os.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT |
- os.linux.EPOLLET;
+ const epoll_events = os.linux.EPOLL.ONESHOT | os.linux.EPOLL.IN | os.linux.EPOLL.OUT |
+ os.linux.EPOLL.ET;
self.linuxModFd(
eventfd_node.eventfd,
eventfd_node.epoll_op,
@@ -903,12 +966,12 @@ pub const Loop = struct {
/// will return a value greater than was supplied to the call.
addr_size: *os.socklen_t,
/// The following values can be bitwise ORed in flags to obtain different behavior:
- /// * `SOCK_CLOEXEC` - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor. See the
- /// description of the `O_CLOEXEC` flag in `open` for reasons why this may be useful.
+ /// * `SOCK.CLOEXEC` - Set the close-on-exec (`FD_CLOEXEC`) flag on the new file descriptor. See the
+ /// description of the `O.CLOEXEC` flag in `open` for reasons why this may be useful.
flags: u32,
) os.AcceptError!os.socket_t {
while (true) {
- return os.accept(sockfd, addr, addr_size, flags | os.SOCK_NONBLOCK) catch |err| switch (err) {
+ return os.accept(sockfd, addr, addr_size, flags | os.SOCK.NONBLOCK) catch |err| switch (err) {
error.WouldBlock => {
self.waitUntilFdReadable(sockfd);
continue;
@@ -1344,7 +1407,7 @@ pub const Loop = struct {
.Stop => return,
.EventFd => {
const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
- event_fd_node.epoll_op = os.EPOLL_CTL_MOD;
+ event_fd_node.epoll_op = os.linux.EPOLL.CTL_MOD;
const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
self.available_eventfd_resume_nodes.push(stack_node);
},