diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-09-16 10:51:58 -0400 |
|---|---|---|
| committer | Andrew Kelley <superjoe30@gmail.com> | 2018-09-16 10:51:58 -0400 |
| commit | a2abdb185f9e47b663edce1bdfa3fa525502f321 (patch) | |
| tree | 9027e6f6886937afa463563dae176e5757cf006e /std/event | |
| parent | a6bf37f8ca5a2eabc7cacb22696d2a2c622a993d (diff) | |
| parent | 780e5674467ebac4534cd3d3f2199ccaf1d0922c (diff) | |
| download | zig-a2abdb185f9e47b663edce1bdfa3fa525502f321.tar.gz zig-a2abdb185f9e47b663edce1bdfa3fa525502f321.zip | |
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/event')
| -rw-r--r-- | std/event/channel.zig | 2 | ||||
| -rw-r--r-- | std/event/fs.zig | 88 | ||||
| -rw-r--r-- | std/event/future.zig | 2 | ||||
| -rw-r--r-- | std/event/group.zig | 2 | ||||
| -rw-r--r-- | std/event/locked.zig | 2 | ||||
| -rw-r--r-- | std/event/loop.zig | 44 | ||||
| -rw-r--r-- | std/event/rwlocked.zig | 2 | ||||
| -rw-r--r-- | std/event/tcp.zig | 3 |
8 files changed, 79 insertions, 66 deletions
diff --git a/std/event/channel.zig b/std/event/channel.zig index 9ea75a2dd8..133ce1c69c 100644 --- a/std/event/channel.zig +++ b/std/event/channel.zig @@ -25,7 +25,7 @@ pub fn Channel(comptime T: type) type { buffer_index: usize, buffer_len: usize, - const SelfChannel = this; + const SelfChannel = @This(); const GetNode = struct { tick_node: *Loop.NextTickNode, data: Data, diff --git a/std/event/fs.zig b/std/event/fs.zig index 5e7e24ff43..bde5a306b6 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -109,30 +109,28 @@ pub async fn pwriteWindows(loop: *Loop, fd: os.FileHandle, data: []const u8, off .base = Loop.ResumeNode{ .id = Loop.ResumeNode.Id.Basic, .handle = @handle(), + .overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, offset), + .OffsetHigh = @truncate(u32, offset >> 32), + .hEvent = null, + }, }, }; - const completion_key = @ptrToInt(&resume_node.base); - // TODO support concurrent async ops on the file handle - // we can do this by ignoring completion key and using @fieldParentPtr with the *Overlapped - _ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, completion_key, undefined); - var overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = @truncate(u32, offset), - .OffsetHigh = @truncate(u32, offset >> 32), - .hEvent = null, - }; + // TODO only call create io completion port once per fd + _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined); loop.beginOneEvent(); errdefer loop.finishOneEvent(); errdefer { - _ = windows.CancelIoEx(fd, &overlapped); + _ = windows.CancelIoEx(fd, &resume_node.base.overlapped); } suspend { - _ = windows.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &overlapped); + _ = windows.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); } var bytes_transferred: windows.DWORD = undefined; - if (windows.GetOverlappedResult(fd, &overlapped, &bytes_transferred, windows.FALSE) == 0) { + if (windows.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { const err = windows.GetLastError(); return switch (err) { windows.ERROR.IO_PENDING => unreachable, @@ -243,37 +241,36 @@ pub async fn preadWindows(loop: *Loop, fd: os.FileHandle, data: []u8, offset: u6 .base = Loop.ResumeNode{ .id = Loop.ResumeNode.Id.Basic, .handle = @handle(), + .overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, offset), + .OffsetHigh = @truncate(u32, offset >> 32), + .hEvent = null, + }, }, }; - const completion_key = @ptrToInt(&resume_node.base); - // TODO support concurrent async ops on the file handle - // we can do this by ignoring completion key and using @fieldParentPtr with the *Overlapped - _ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, completion_key, undefined); - var overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = @truncate(u32, offset), - .OffsetHigh = @truncate(u32, offset >> 32), - .hEvent = null, - }; + // TODO only call create io completion port once per fd + _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined); loop.beginOneEvent(); errdefer loop.finishOneEvent(); errdefer { - _ = windows.CancelIoEx(fd, &overlapped); + _ = windows.CancelIoEx(fd, &resume_node.base.overlapped); } suspend { - _ = windows.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &overlapped); + _ = windows.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); } var bytes_transferred: windows.DWORD = undefined; - if (windows.GetOverlappedResult(fd, &overlapped, &bytes_transferred, windows.FALSE) == 0) { + if (windows.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { const err = windows.GetLastError(); - return switch (err) { + switch (err) { windows.ERROR.IO_PENDING => unreachable, - windows.ERROR.OPERATION_ABORTED => error.OperationAborted, - windows.ERROR.BROKEN_PIPE => error.BrokenPipe, - else => os.unexpectedErrorWindows(err), - }; + windows.ERROR.OPERATION_ABORTED => return error.OperationAborted, + windows.ERROR.BROKEN_PIPE => return error.BrokenPipe, + windows.ERROR.HANDLE_EOF => return usize(bytes_transferred), + else => return os.unexpectedErrorWindows(err), + } } return usize(bytes_transferred); } @@ -727,7 +724,7 @@ pub fn Watch(comptime V: type) type { const FileToHandle = std.AutoHashMap([]const u8, promise); - const Self = this; + const Self = @This(); pub const Event = struct { id: Id, @@ -1074,23 +1071,22 @@ pub fn Watch(comptime V: type) type { .base = Loop.ResumeNode{ .id = Loop.ResumeNode.Id.Basic, .handle = @handle(), + .overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = 0, + .OffsetHigh = 0, + .hEvent = null, + }, }, }; - const completion_key = @ptrToInt(&resume_node.base); - var overlapped = windows.OVERLAPPED{ - .Internal = 0, - .InternalHigh = 0, - .Offset = 0, - .OffsetHigh = 0, - .hEvent = null, - }; var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; // TODO handle this error not in the channel but in the setup _ = os.windowsCreateIoCompletionPort( dir_handle, self.channel.loop.os_data.io_port, - completion_key, + undefined, undefined, ) catch |err| { await (async self.channel.put(err) catch unreachable); @@ -1103,7 +1099,7 @@ pub fn Watch(comptime V: type) type { self.channel.loop.beginOneEvent(); errdefer self.channel.loop.finishOneEvent(); errdefer { - _ = windows.CancelIoEx(dir_handle, &overlapped); + _ = windows.CancelIoEx(dir_handle, &resume_node.base.overlapped); } suspend { _ = windows.ReadDirectoryChangesW( @@ -1116,13 +1112,13 @@ pub fn Watch(comptime V: type) type { windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, null, // number of bytes transferred (unused for async) - &overlapped, + &resume_node.base.overlapped, null, // completion routine - unused because we use IOCP ); } } var bytes_transferred: windows.DWORD = undefined; - if (windows.GetOverlappedResult(dir_handle, &overlapped, &bytes_transferred, windows.FALSE) == 0) { + if (windows.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { const errno = windows.GetLastError(); const err = switch (errno) { else => os.unexpectedErrorWindows(errno), diff --git a/std/event/future.zig b/std/event/future.zig index 8abdce7d02..d61768b198 100644 --- a/std/event/future.zig +++ b/std/event/future.zig @@ -21,7 +21,7 @@ pub fn Future(comptime T: type) type { /// 2 - finished available: u8, - const Self = this; + const Self = @This(); const Queue = std.atomic.Queue(promise); pub fn init(loop: *Loop) Self { diff --git a/std/event/group.zig b/std/event/group.zig index 2b5a517b2f..0bb3298cf8 100644 --- a/std/event/group.zig +++ b/std/event/group.zig @@ -13,7 +13,7 @@ pub fn Group(comptime ReturnType: type) type { alloc_stack: Stack, lock: Lock, - const Self = this; + const Self = @This(); const Error = switch (@typeInfo(ReturnType)) { builtin.TypeId.ErrorUnion => |payload| payload.error_set, diff --git a/std/event/locked.zig b/std/event/locked.zig index 7df56c4571..6718b1bf9c 100644 --- a/std/event/locked.zig +++ b/std/event/locked.zig @@ -10,7 +10,7 @@ pub fn Locked(comptime T: type) type { lock: Lock, private_data: T, - const Self = this; + const Self = @This(); pub const HeldLock = struct { value: *T, diff --git a/std/event/loop.zig b/std/event/loop.zig index 733112549d..e87e928049 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -27,6 +27,19 @@ pub const Loop = struct { pub const ResumeNode = struct { id: Id, handle: promise, + overlapped: Overlapped, + + pub const overlapped_init = switch (builtin.os) { + builtin.Os.windows => windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = 0, + .OffsetHigh = 0, + .hEvent = null, + }, + else => {}, + }; + pub const Overlapped = @typeOf(overlapped_init); pub const Id = enum { Basic, @@ -101,6 +114,7 @@ pub const Loop = struct { .final_resume_node = ResumeNode{ .id = ResumeNode.Id.Stop, .handle = undefined, + .overlapped = ResumeNode.overlapped_init, }, }; const extra_thread_count = thread_count - 1; @@ -153,6 +167,7 @@ pub const Loop = struct { .base = ResumeNode{ .id = ResumeNode.Id.EventFd, .handle = undefined, + .overlapped = ResumeNode.overlapped_init, }, .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), .epoll_op = posix.EPOLL_CTL_ADD, @@ -225,6 +240,7 @@ pub const Loop = struct { .base = ResumeNode{ .id = ResumeNode.Id.EventFd, .handle = undefined, + .overlapped = ResumeNode.overlapped_init, }, // this one is for sending events .kevent = posix.Kevent{ @@ -311,6 +327,7 @@ pub const Loop = struct { .base = ResumeNode{ .id = ResumeNode.Id.EventFd, .handle = undefined, + .overlapped = ResumeNode.overlapped_init, }, // this one is for sending events .completion_key = @ptrToInt(&eventfd_node.data.base), @@ -325,8 +342,8 @@ pub const Loop = struct { var i: usize = 0; while (i < extra_thread_index) : (i += 1) { while (true) { - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + const overlapped = &self.final_resume_node.overlapped; + os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; break; } } @@ -398,6 +415,7 @@ pub const Loop = struct { .base = ResumeNode{ .id = ResumeNode.Id.Basic, .handle = @handle(), + .overlapped = ResumeNode.overlapped_init, }, }; try self.linuxAddFd(fd, &resume_node.base, flags); @@ -413,6 +431,7 @@ pub const Loop = struct { .base = ResumeNode{ .id = ResumeNode.Id.Basic, .handle = @handle(), + .overlapped = ResumeNode.overlapped_init, }, .kev = undefined, }; @@ -489,15 +508,11 @@ pub const Loop = struct { }; }, builtin.Os.windows => { - // this value is never dereferenced but we need it to be non-null so that - // the consumer code can decide whether to read the completion key. - // it has to do this for normal I/O, so we match that behavior here. - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); os.windowsPostQueuedCompletionStatus( self.os_data.io_port, undefined, - eventfd_node.completion_key, - overlapped, + undefined, + &eventfd_node.base.overlapped, ) catch { self.next_tick_queue.unget(next_tick_node); self.available_eventfd_resume_nodes.push(resume_stack_node); @@ -606,8 +621,8 @@ pub const Loop = struct { var i: usize = 0; while (i < self.extra_threads.len + 1) : (i += 1) { while (true) { - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + const overlapped = &self.final_resume_node.overlapped; + os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; break; } } @@ -680,17 +695,18 @@ pub const Loop = struct { }, builtin.Os.windows => { var completion_key: usize = undefined; - while (true) { + const overlapped = while (true) { var nbytes: windows.DWORD = undefined; var overlapped: ?*windows.OVERLAPPED = undefined; switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { os.WindowsWaitResult.Aborted => return, os.WindowsWaitResult.Normal => {}, + os.WindowsWaitResult.EOF => {}, os.WindowsWaitResult.Cancelled => continue, } - if (overlapped != null) break; - } - const resume_node = @intToPtr(*ResumeNode, completion_key); + if (overlapped) |o| break o; + } else unreachable; // TODO else unreachable should not be necessary + const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped); const handle = resume_node.handle; const resume_node_id = resume_node.id; switch (resume_node_id) { diff --git a/std/event/rwlocked.zig b/std/event/rwlocked.zig index 1a6e77c27a..d305b1791e 100644 --- a/std/event/rwlocked.zig +++ b/std/event/rwlocked.zig @@ -10,7 +10,7 @@ pub fn RwLocked(comptime T: type) type { lock: RwLock, locked_data: T, - const Self = this; + const Self = @This(); pub const HeldReadLock = struct { value: *const T, diff --git a/std/event/tcp.zig b/std/event/tcp.zig index 491acab39d..5715e46a62 100644 --- a/std/event/tcp.zig +++ b/std/event/tcp.zig @@ -32,6 +32,7 @@ pub const Server = struct { .listen_resume_node = event.Loop.ResumeNode{ .id = event.Loop.ResumeNode.Id.Basic, .handle = undefined, + .overlapped = event.Loop.ResumeNode.overlapped_init, }, }; } @@ -131,7 +132,7 @@ test "listen on a port, send bytes, receive bytes" { const MyServer = struct { tcp_server: Server, - const Self = this; + const Self = @This(); async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: *const std.os.File) void { const self = @fieldParentPtr(Self, "tcp_server", tcp_server); var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733 |
