diff options
| author | hryx <codroid@gmail.com> | 2019-05-27 17:24:21 -0700 |
|---|---|---|
| committer | hryx <codroid@gmail.com> | 2019-05-27 17:24:21 -0700 |
| commit | e1f3eec9cc05535b3f3b81f2fb7cd65dd4d1e841 (patch) | |
| tree | 5f408ed68a686491eaf759f9cbba02beac829b38 /std/event/loop.zig | |
| parent | 2aa1c5da5dded6b1b346c3a1b57443f2c459ebe9 (diff) | |
| parent | 3fccc0747903f0726d6cc8ee73832cb62f1304bb (diff) | |
| download | zig-e1f3eec9cc05535b3f3b81f2fb7cd65dd4d1e841.tar.gz zig-e1f3eec9cc05535b3f3b81f2fb7cd65dd4d1e841.zip | |
Merge branch 'master' into translate-c-userland
Diffstat (limited to 'std/event/loop.zig')
| -rw-r--r-- | std/event/loop.zig | 268 |
1 files changed, 134 insertions, 134 deletions
diff --git a/std/event/loop.zig b/std/event/loop.zig index 76b1f6455b..61732d78f5 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -7,9 +7,9 @@ const AtomicRmwOp = builtin.AtomicRmwOp; const AtomicOrder = builtin.AtomicOrder; const fs = std.event.fs; const os = std.os; -const posix = os.posix; const windows = os.windows; const maxInt = std.math.maxInt; +const Thread = std.Thread; pub const Loop = struct { allocator: *mem.Allocator, @@ -17,7 +17,7 @@ pub const Loop = struct { os_data: OsData, final_resume_node: ResumeNode, pending_event_count: usize, - extra_threads: []*os.Thread, + extra_threads: []*Thread, // pre-allocated eventfds. all permanently active. // this is how we send promises to be resumed on other threads. @@ -32,7 +32,7 @@ pub const Loop = struct { overlapped: Overlapped, pub const overlapped_init = switch (builtin.os) { - builtin.Os.windows => windows.OVERLAPPED{ + .windows => windows.OVERLAPPED{ .Internal = 0, .InternalHigh = 0, .Offset = 0, @@ -50,13 +50,13 @@ pub const Loop = struct { }; pub const EventFd = switch (builtin.os) { - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventFd, - builtin.Os.linux => struct { + .macosx, .freebsd, .netbsd => KEventFd, + .linux => struct { base: ResumeNode, epoll_op: u32, eventfd: i32, }, - builtin.Os.windows => struct { + .windows => struct { base: ResumeNode, completion_key: usize, }, @@ -65,15 +65,15 @@ pub const Loop = struct { const KEventFd = struct { base: ResumeNode, - kevent: posix.Kevent, + kevent: os.Kevent, }; pub const Basic = switch (builtin.os) { - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventBasic, - builtin.Os.linux => struct { + .macosx, .freebsd, .netbsd => KEventBasic, + .linux => struct { base: ResumeNode, }, - builtin.Os.windows => struct { + .windows => struct { base: ResumeNode, }, else => @compileError("unsupported OS"), @@ -81,7 +81,7 @@ pub const Loop = struct { const KEventBasic = struct { base: ResumeNode, - kev: posix.Kevent, + kev: os.Kevent, }; }; @@ -99,7 +99,7 @@ pub const Loop = struct { /// have the correct pointer value. pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode"); - const core_count = try os.cpuCount(allocator); + const core_count = try Thread.cpuCount(); return self.initInternal(allocator, core_count); } @@ -127,7 +127,7 @@ pub const Loop = struct { ); errdefer self.allocator.free(self.eventfd_resume_nodes); - self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count); + self.extra_threads = try self.allocator.alloc(*Thread, extra_thread_count); errdefer self.allocator.free(self.extra_threads); try self.initOsData(extra_thread_count); @@ -139,15 +139,15 @@ pub const Loop = struct { self.allocator.free(self.extra_threads); } - const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError || - os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError || - os.WindowsCreateIoCompletionPortError; + const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError || + Thread.SpawnError || os.EpollCtlError || os.KEventError || + windows.CreateIoCompletionPortError; const wakeup_bytes = []u8{0x1} ** 8; fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { switch (builtin.os) { - builtin.Os.linux => { + .linux => { self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); self.os_data.fs_queue_item = 0; // we need another thread for the file system because Linux does not have an async @@ -172,32 +172,32 @@ pub const Loop = struct { .handle = undefined, .overlapped = ResumeNode.overlapped_init, }, - .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), - .epoll_op = posix.EPOLL_CTL_ADD, + .eventfd = try os.eventfd(1, os.EFD_CLOEXEC | os.EFD_NONBLOCK), + .epoll_op = os.EPOLL_CTL_ADD, }, .next = undefined, }; self.available_eventfd_resume_nodes.push(eventfd_node); } - self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC); + self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC); errdefer os.close(self.os_data.epollfd); - self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK); errdefer os.close(self.os_data.final_eventfd); - self.os_data.final_eventfd_event = posix.epoll_event{ - .events = posix.EPOLLIN, - .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + self.os_data.final_eventfd_event = os.epoll_event{ + .events = os.EPOLLIN, + .data = os.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, }; - try os.linuxEpollCtl( + try os.epoll_ctl( self.os_data.epollfd, - posix.EPOLL_CTL_ADD, + os.EPOLL_CTL_ADD, self.os_data.final_eventfd, &self.os_data.final_eventfd_event, ); - self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); errdefer { self.posixFsRequest(&self.os_data.fs_end_request); self.os_data.fs_thread.wait(); @@ -211,21 +211,21 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { // writing 8 bytes to an eventfd cannot fail - os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); } }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - self.os_data.kqfd = try os.bsdKQueue(); + .macosx, .freebsd, .netbsd => { + self.os_data.kqfd = try os.kqueue(); errdefer os.close(self.os_data.kqfd); - self.os_data.fs_kqfd = try os.bsdKQueue(); + self.os_data.fs_kqfd = try os.kqueue(); errdefer os.close(self.os_data.fs_kqfd); self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); @@ -240,7 +240,7 @@ pub const Loop = struct { }, }; - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; for (self.eventfd_resume_nodes) |*eventfd_node, i| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -251,10 +251,10 @@ pub const Loop = struct { .overlapped = ResumeNode.overlapped_init, }, // this one is for sending events - .kevent = posix.Kevent{ + .kevent = os.Kevent{ .ident = i, - .filter = posix.EVFILT_USER, - .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE, + .filter = os.EVFILT_USER, + .flags = os.EV_CLEAR | os.EV_ADD | os.EV_DISABLE, .fflags = 0, .data = 0, .udata = @ptrToInt(&eventfd_node.data.base), @@ -263,46 +263,46 @@ pub const Loop = struct { .next = undefined, }; self.available_eventfd_resume_nodes.push(eventfd_node); - const kevent_array = (*const [1]posix.Kevent)(&eventfd_node.data.kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); - eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; - eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; + const kevent_array = (*const [1]os.Kevent)(&eventfd_node.data.kevent); + _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null); + eventfd_node.data.kevent.flags = os.EV_CLEAR | os.EV_ENABLE; + eventfd_node.data.kevent.fflags = os.NOTE_TRIGGER; } // Pre-add so that we cannot get error.SystemResources // later when we try to activate it. - self.os_data.final_kevent = posix.Kevent{ + self.os_data.final_kevent = os.Kevent{ .ident = extra_thread_count, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD | posix.EV_DISABLE, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_DISABLE, .fflags = 0, .data = 0, .udata = @ptrToInt(&self.final_resume_node), }; - const final_kev_arr = (*const [1]posix.Kevent)(&self.os_data.final_kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); - self.os_data.final_kevent.flags = posix.EV_ENABLE; - self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; + const final_kev_arr = (*const [1]os.Kevent)(&self.os_data.final_kevent); + _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); + self.os_data.final_kevent.flags = os.EV_ENABLE; + self.os_data.final_kevent.fflags = os.NOTE_TRIGGER; - self.os_data.fs_kevent_wake = posix.Kevent{ + self.os_data.fs_kevent_wake = os.Kevent{ .ident = 0, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD | posix.EV_ENABLE, - .fflags = posix.NOTE_TRIGGER, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_ENABLE, + .fflags = os.NOTE_TRIGGER, .data = 0, .udata = undefined, }; - self.os_data.fs_kevent_wait = posix.Kevent{ + self.os_data.fs_kevent_wait = os.Kevent{ .ident = 0, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD | posix.EV_CLEAR, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_CLEAR, .fflags = 0, .data = 0, .udata = undefined, }; - self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); errdefer { self.posixFsRequest(&self.os_data.fs_end_request); self.os_data.fs_thread.wait(); @@ -315,24 +315,24 @@ pub const Loop = struct { var extra_thread_index: usize = 0; errdefer { - _ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; + _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); } }, - builtin.Os.windows => { - self.os_data.io_port = try os.windowsCreateIoCompletionPort( + .windows => { + self.os_data.io_port = try windows.CreateIoCompletionPort( windows.INVALID_HANDLE_VALUE, null, undefined, maxInt(windows.DWORD), ); - errdefer os.close(self.os_data.io_port); + errdefer windows.CloseHandle(self.os_data.io_port); for (self.eventfd_resume_nodes) |*eventfd_node, i| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -361,7 +361,7 @@ pub const Loop = struct { while (i < extra_thread_index) : (i += 1) { while (true) { const overlapped = &self.final_resume_node.overlapped; - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; + windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; break; } } @@ -371,7 +371,7 @@ pub const Loop = struct { } } while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); } }, else => {}, @@ -380,18 +380,18 @@ pub const Loop = struct { fn deinitOsData(self: *Loop) void { switch (builtin.os) { - builtin.Os.linux => { + .linux => { os.close(self.os_data.final_eventfd); while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); os.close(self.os_data.epollfd); self.allocator.free(self.eventfd_resume_nodes); }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { + .macosx, .freebsd, .netbsd => { os.close(self.os_data.kqfd); os.close(self.os_data.fs_kqfd); }, - builtin.Os.windows => { - os.close(self.os_data.io_port); + .windows => { + windows.CloseHandle(self.os_data.io_port); }, else => {}, } @@ -400,28 +400,28 @@ pub const Loop = struct { /// resume_node must live longer than the promise that it holds a reference to. /// flags must contain EPOLLET pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void { - assert(flags & posix.EPOLLET == posix.EPOLLET); + assert(flags & os.EPOLLET == os.EPOLLET); self.beginOneEvent(); errdefer self.finishOneEvent(); try self.linuxModFd( fd, - posix.EPOLL_CTL_ADD, + os.EPOLL_CTL_ADD, flags, resume_node, ); } pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void { - assert(flags & posix.EPOLLET == posix.EPOLLET); + assert(flags & os.EPOLLET == os.EPOLLET); var ev = os.linux.epoll_event{ .events = flags, .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, }; - try os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev); + try os.epoll_ctl(self.os_data.epollfd, op, fd, &ev); } pub fn linuxRemoveFd(self: *Loop, fd: i32) void { - os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; + os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; self.finishOneEvent(); } @@ -440,7 +440,7 @@ pub const Loop = struct { } } - pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !posix.Kevent { + pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent { // TODO #1194 suspend { resume @handle(); @@ -464,31 +464,31 @@ pub const Loop = struct { pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, fflags: u32) !void { self.beginOneEvent(); errdefer self.finishOneEvent(); - var kev = posix.Kevent{ + var kev = os.Kevent{ .ident = ident, .filter = filter, - .flags = posix.EV_ADD | posix.EV_ENABLE | posix.EV_CLEAR, + .flags = os.EV_ADD | os.EV_ENABLE | os.EV_CLEAR, .fflags = fflags, .data = 0, .udata = @ptrToInt(&resume_node.base), }; - const kevent_array = (*const [1]posix.Kevent)(&kev); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); + const kevent_array = (*const [1]os.Kevent)(&kev); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null); } pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void { - var kev = posix.Kevent{ + var kev = os.Kevent{ .ident = ident, .filter = filter, - .flags = posix.EV_DELETE, + .flags = os.EV_DELETE, .fflags = 0, .data = 0, .udata = 0, }; - const kevent_array = (*const [1]posix.Kevent)(&kev); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined; + const kevent_array = (*const [1]os.Kevent)(&kev); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined; self.finishOneEvent(); } @@ -501,18 +501,18 @@ pub const Loop = struct { const eventfd_node = &resume_stack_node.data; eventfd_node.base.handle = next_tick_node.data; switch (builtin.os) { - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - const kevent_array = (*const [1]posix.Kevent)(&eventfd_node.kevent); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { + .macosx, .freebsd, .netbsd => { + const kevent_array = (*const [1]os.Kevent)(&eventfd_node.kevent); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { self.next_tick_queue.unget(next_tick_node); self.available_eventfd_resume_nodes.push(resume_stack_node); return; }; }, - builtin.Os.linux => { + .linux => { // the pending count is already accounted for - const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | + const epoll_events = os.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | os.linux.EPOLLET; self.linuxModFd( eventfd_node.eventfd, @@ -525,8 +525,8 @@ pub const Loop = struct { return; }; }, - builtin.Os.windows => { - os.windowsPostQueuedCompletionStatus( + .windows => { + windows.PostQueuedCompletionStatus( self.os_data.io_port, undefined, undefined, @@ -623,26 +623,26 @@ pub const Loop = struct { if (prev == 1) { // cause all the threads to stop switch (builtin.os) { - builtin.Os.linux => { + .linux => { self.posixFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail - os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { + .macosx, .freebsd, .netbsd => { self.posixFsRequest(&self.os_data.fs_end_request); - const final_kevent = (*const [1]posix.Kevent)(&self.os_data.final_kevent); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + const final_kevent = (*const [1]os.Kevent)(&self.os_data.final_kevent); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; // cannot fail because we already added it and this just enables it - _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; + _ = os.kevent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; return; }, - builtin.Os.windows => { + .windows => { var i: usize = 0; while (i < self.extra_threads.len + 1) : (i += 1) { while (true) { const overlapped = &self.final_resume_node.overlapped; - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; + windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; break; } } @@ -663,10 +663,10 @@ pub const Loop = struct { } switch (builtin.os) { - builtin.Os.linux => { + .linux => { // only process 1 event so we don't steal from other threads var events: [1]os.linux.epoll_event = undefined; - const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + const count = os.epoll_wait(self.os_data.epollfd, events[0..], -1); for (events[0..count]) |ev| { const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); const handle = resume_node.handle; @@ -676,7 +676,7 @@ pub const Loop = struct { ResumeNode.Id.Stop => return, ResumeNode.Id.EventFd => { const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); - event_fd_node.epoll_op = posix.EPOLL_CTL_MOD; + event_fd_node.epoll_op = os.EPOLL_CTL_MOD; const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); self.available_eventfd_resume_nodes.push(stack_node); }, @@ -687,10 +687,10 @@ pub const Loop = struct { } } }, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - var eventlist: [1]posix.Kevent = undefined; - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - const count = os.bsdKEvent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable; + .macosx, .freebsd, .netbsd => { + var eventlist: [1]os.Kevent = undefined; + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable; for (eventlist[0..count]) |ev| { const resume_node = @intToPtr(*ResumeNode, ev.udata); const handle = resume_node.handle; @@ -713,16 +713,16 @@ pub const Loop = struct { } } }, - builtin.Os.windows => { + .windows => { var completion_key: usize = undefined; const overlapped = while (true) { var nbytes: windows.DWORD = undefined; var overlapped: ?*windows.OVERLAPPED = undefined; - switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { - os.WindowsWaitResult.Aborted => return, - os.WindowsWaitResult.Normal => {}, - os.WindowsWaitResult.EOF => {}, - os.WindowsWaitResult.Cancelled => continue, + switch (windows.GetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { + .Aborted => return, + .Normal => {}, + .EOF => {}, + .Cancelled => continue, } if (overlapped) |o| break o; } else unreachable; // TODO else unreachable should not be necessary @@ -751,16 +751,16 @@ pub const Loop = struct { self.os_data.fs_queue.put(request_node); switch (builtin.os) { builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - const fs_kevs = (*const [1]posix.Kevent)(&self.os_data.fs_kevent_wake); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; + const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wake); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; }, builtin.Os.linux => { _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1); switch (os.linux.getErrno(rc)) { 0 => {}, - posix.EINVAL => unreachable, + os.EINVAL => unreachable, else => unreachable, } }, @@ -783,24 +783,24 @@ pub const Loop = struct { switch (node.data.msg) { @TagType(fs.Request.Msg).End => return, @TagType(fs.Request.Msg).PWriteV => |*msg| { - msg.result = os.posix_pwritev(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + msg.result = os.pwritev(msg.fd, msg.iov, msg.offset); }, @TagType(fs.Request.Msg).PReadV => |*msg| { - msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + msg.result = os.preadv(msg.fd, msg.iov, msg.offset); }, @TagType(fs.Request.Msg).Open => |*msg| { - msg.result = os.posixOpenC(msg.path.ptr, msg.flags, msg.mode); + msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode); }, @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { - const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | - posix.O_CLOEXEC | posix.O_TRUNC; - const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| { + const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | + os.O_CLOEXEC | os.O_TRUNC; + const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| { msg.result = err; break :blk; }; defer os.close(fd); - msg.result = os.posixWrite(fd, msg.contents); + msg.result = os.write(fd, msg.contents); }, } switch (node.data.finish) { @@ -816,14 +816,14 @@ pub const Loop = struct { builtin.Os.linux => { const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null); switch (os.linux.getErrno(rc)) { - 0, posix.EINTR, posix.EAGAIN => continue, + 0, os.EINTR, os.EAGAIN => continue, else => unreachable, } }, builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => { - const fs_kevs = (*const [1]posix.Kevent)(&self.os_data.fs_kevent_wait); - var out_kevs: [1]posix.Kevent = undefined; - _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; + const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wait); + var out_kevs: [1]os.Kevent = undefined; + _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; }, else => @compileError("Unsupported OS"), } @@ -831,9 +831,9 @@ pub const Loop = struct { } const OsData = switch (builtin.os) { - builtin.Os.linux => LinuxOsData, - builtin.Os.macosx, builtin.Os.freebsd, builtin.Os.netbsd => KEventData, - builtin.Os.windows => struct { + .linux => LinuxOsData, + .macosx, .freebsd, .netbsd => KEventData, + .windows => struct { io_port: windows.HANDLE, extra_thread_count: usize, }, @@ -842,10 +842,10 @@ pub const Loop = struct { const KEventData = struct { kqfd: i32, - final_kevent: posix.Kevent, - fs_kevent_wake: posix.Kevent, - fs_kevent_wait: posix.Kevent, - fs_thread: *os.Thread, + final_kevent: os.Kevent, + fs_kevent_wake: os.Kevent, + fs_kevent_wait: os.Kevent, + fs_thread: *Thread, fs_kqfd: i32, fs_queue: std.atomic.Queue(fs.Request), fs_end_request: fs.RequestNode, @@ -855,7 +855,7 @@ pub const Loop = struct { epollfd: i32, final_eventfd: i32, final_eventfd_event: os.linux.epoll_event, - fs_thread: *os.Thread, + fs_thread: *Thread, fs_queue_item: i32, fs_queue: std.atomic.Queue(fs.Request), fs_end_request: fs.RequestNode, |
