diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2019-09-26 01:54:45 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-09-26 01:54:45 -0400 |
| commit | 68bb3945708c43109c48bda3664176307d45b62c (patch) | |
| tree | afb9731e10cef9d192560b52cd9ae2cf179775c4 /lib/std/event | |
| parent | 6128bc728d1e1024a178c16c2149f5b1a167a013 (diff) | |
| parent | 4637e8f9699af9c3c6cf4df50ef5bb67c7a318a4 (diff) | |
| download | zig-68bb3945708c43109c48bda3664176307d45b62c.tar.gz zig-68bb3945708c43109c48bda3664176307d45b62c.zip | |
Merge pull request #3315 from ziglang/mv-std-lib
Move std/ to lib/std/
Diffstat (limited to 'lib/std/event')
| -rw-r--r-- | lib/std/event/channel.zig | 349 | ||||
| -rw-r--r-- | lib/std/event/fs.zig | 1431 | ||||
| -rw-r--r-- | lib/std/event/future.zig | 121 | ||||
| -rw-r--r-- | lib/std/event/group.zig | 131 | ||||
| -rw-r--r-- | lib/std/event/lock.zig | 186 | ||||
| -rw-r--r-- | lib/std/event/locked.zig | 43 | ||||
| -rw-r--r-- | lib/std/event/loop.zig | 923 | ||||
| -rw-r--r-- | lib/std/event/net.zig | 358 | ||||
| -rw-r--r-- | lib/std/event/rwlock.zig | 296 | ||||
| -rw-r--r-- | lib/std/event/rwlocked.zig | 58 |
10 files changed, 3896 insertions, 0 deletions
diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig new file mode 100644 index 0000000000..2f211d21e2 --- /dev/null +++ b/lib/std/event/channel.zig @@ -0,0 +1,349 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const testing = std.testing; +const Loop = std.event.Loop; + +/// many producer, many consumer, thread-safe, runtime configurable buffer size +/// when buffer is empty, consumers suspend and are resumed by producers +/// when buffer is full, producers suspend and are resumed by consumers +pub fn Channel(comptime T: type) type { + return struct { + loop: *Loop, + + getters: std.atomic.Queue(GetNode), + or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node), + putters: std.atomic.Queue(PutNode), + get_count: usize, + put_count: usize, + dispatch_lock: u8, // TODO make this a bool + need_dispatch: u8, // TODO make this a bool + + // simple fixed size ring buffer + buffer_nodes: []T, + buffer_index: usize, + buffer_len: usize, + + const SelfChannel = @This(); + const GetNode = struct { + tick_node: *Loop.NextTickNode, + data: Data, + + const Data = union(enum) { + Normal: Normal, + OrNull: OrNull, + }; + + const Normal = struct { + ptr: *T, + }; + + const OrNull = struct { + ptr: *?T, + or_null: *std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node, + }; + }; + const PutNode = struct { + data: T, + tick_node: *Loop.NextTickNode, + }; + + /// call destroy when done + pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { + const buffer_nodes = try loop.allocator.alloc(T, capacity); + errdefer loop.allocator.free(buffer_nodes); + + const self = try loop.allocator.create(SelfChannel); + self.* = SelfChannel{ + .loop = loop, + .buffer_len = 0, + .buffer_nodes = buffer_nodes, + .buffer_index = 0, + .dispatch_lock = 0, + .need_dispatch = 0, + .getters = std.atomic.Queue(GetNode).init(), + .putters = std.atomic.Queue(PutNode).init(), + .or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(), + .get_count = 0, + .put_count = 0, + }; + errdefer loop.allocator.destroy(self); + + return self; + } + + /// must be called when all calls to put and get have suspended and no more calls occur + pub fn destroy(self: *SelfChannel) void { + while (self.getters.get()) |get_node| { + resume get_node.data.tick_node.data; + } + while (self.putters.get()) |put_node| { + resume put_node.data.tick_node.data; + } + self.loop.allocator.free(self.buffer_nodes); + self.loop.allocator.destroy(self); + } + + /// puts a data item in the channel. The function returns when the value has been added to the + /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter. + /// Or when the channel is destroyed. + pub fn put(self: *SelfChannel, data: T) void { + var my_tick_node = Loop.NextTickNode.init(@frame()); + var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{ + .tick_node = &my_tick_node, + .data = data, + }); + + // TODO test canceling a put() + errdefer { + _ = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst); + const need_dispatch = !self.putters.remove(&queue_node); + self.loop.cancelOnNextTick(&my_tick_node); + if (need_dispatch) { + // oops we made the put_count incorrect for a period of time. fix by dispatching. + _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst); + self.dispatch(); + } + } + suspend { + self.putters.put(&queue_node); + _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst); + + self.dispatch(); + } + } + + /// await this function to get an item from the channel. If the buffer is empty, the frame will + /// complete when the next item is put in the channel. + pub async fn get(self: *SelfChannel) T { + // TODO https://github.com/ziglang/zig/issues/2765 + var result: T = undefined; + var my_tick_node = Loop.NextTickNode.init(@frame()); + var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{ + .tick_node = &my_tick_node, + .data = GetNode.Data{ + .Normal = GetNode.Normal{ .ptr = &result }, + }, + }); + + // TODO test canceling a get() + errdefer { + _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); + const need_dispatch = !self.getters.remove(&queue_node); + self.loop.cancelOnNextTick(&my_tick_node); + if (need_dispatch) { + // oops we made the get_count incorrect for a period of time. fix by dispatching. + _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); + self.dispatch(); + } + } + + suspend { + self.getters.put(&queue_node); + _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); + + self.dispatch(); + } + return result; + } + + //pub async fn select(comptime EnumUnion: type, channels: ...) EnumUnion { + // assert(@memberCount(EnumUnion) == channels.len); // enum union and channels mismatch + // assert(channels.len != 0); // enum unions cannot have 0 fields + // if (channels.len == 1) { + // const result = await (async channels[0].get() catch unreachable); + // return @unionInit(EnumUnion, @memberName(EnumUnion, 0), result); + // } + //} + + /// Await this function to get an item from the channel. If the buffer is empty and there are no + /// puts waiting, this returns null. + /// Await is necessary for locking purposes. The function will be resumed after checking the channel + /// for data and will not wait for data to be available. + pub async fn getOrNull(self: *SelfChannel) ?T { + // TODO integrate this function with named return values + // so we can get rid of this extra result copy + var result: ?T = null; + var my_tick_node = Loop.NextTickNode.init(@frame()); + var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node.init(undefined); + var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{ + .tick_node = &my_tick_node, + .data = GetNode.Data{ + .OrNull = GetNode.OrNull{ + .ptr = &result, + .or_null = &or_null_node, + }, + }, + }); + or_null_node.data = &queue_node; + + // TODO test canceling getOrNull + errdefer { + _ = self.or_null_queue.remove(&or_null_node); + _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); + const need_dispatch = !self.getters.remove(&queue_node); + self.loop.cancelOnNextTick(&my_tick_node); + if (need_dispatch) { + // oops we made the get_count incorrect for a period of time. fix by dispatching. + _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); + self.dispatch(); + } + } + + suspend { + self.getters.put(&queue_node); + _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); + self.or_null_queue.put(&or_null_node); + + self.dispatch(); + } + return result; + } + + fn dispatch(self: *SelfChannel) void { + // set the "need dispatch" flag + _ = @atomicRmw(u8, &self.need_dispatch, .Xchg, 1, .SeqCst); + + lock: while (true) { + // set the lock flag + const prev_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 1, .SeqCst); + if (prev_lock != 0) return; + + // clear the need_dispatch flag since we're about to do it + _ = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst); + + while (true) { + one_dispatch: { + // later we correct these extra subtractions + var get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); + var put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst); + + // transfer self.buffer to self.getters + while (self.buffer_len != 0) { + if (get_count == 0) break :one_dispatch; + + const get_node = &self.getters.get().?.data; + switch (get_node.data) { + GetNode.Data.Normal => |info| { + info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len]; + }, + GetNode.Data.OrNull => |info| { + _ = self.or_null_queue.remove(info.or_null); + info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len]; + }, + } + self.loop.onNextTick(get_node.tick_node); + self.buffer_len -= 1; + + get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); + } + + // direct transfer self.putters to self.getters + while (get_count != 0 and put_count != 0) { + const get_node = &self.getters.get().?.data; + const put_node = &self.putters.get().?.data; + + switch (get_node.data) { + GetNode.Data.Normal => |info| { + info.ptr.* = put_node.data; + }, + GetNode.Data.OrNull => |info| { + _ = self.or_null_queue.remove(info.or_null); + info.ptr.* = put_node.data; + }, + } + self.loop.onNextTick(get_node.tick_node); + self.loop.onNextTick(put_node.tick_node); + + get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); + put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst); + } + + // transfer self.putters to self.buffer + while (self.buffer_len != self.buffer_nodes.len and put_count != 0) { + const put_node = &self.putters.get().?.data; + + self.buffer_nodes[self.buffer_index] = put_node.data; + self.loop.onNextTick(put_node.tick_node); + self.buffer_index +%= 1; + self.buffer_len += 1; + + put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst); + } + } + + // undo the extra subtractions + _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); + _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst); + + // All the "get or null" functions should resume now. + var remove_count: usize = 0; + while (self.or_null_queue.get()) |or_null_node| { + remove_count += @boolToInt(self.getters.remove(or_null_node.data)); + self.loop.onNextTick(or_null_node.data.data.tick_node); + } + if (remove_count != 0) { + _ = @atomicRmw(usize, &self.get_count, .Sub, remove_count, .SeqCst); + } + + // clear need-dispatch flag + const need_dispatch = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst); + if (need_dispatch != 0) continue; + + const my_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 0, .SeqCst); + assert(my_lock != 0); + + // we have to check again now that we unlocked + if (@atomicLoad(u8, &self.need_dispatch, .SeqCst) != 0) continue :lock; + + return; + } + } + } + }; +} + +test "std.event.Channel" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + // https://github.com/ziglang/zig/issues/3251 + if (std.os.freebsd.is_the_target) return error.SkipZigTest; + + var loop: Loop = undefined; + // TODO make a multi threaded test + try loop.initSingleThreaded(std.heap.direct_allocator); + defer loop.deinit(); + + const channel = try Channel(i32).create(&loop, 0); + defer channel.destroy(); + + const handle = async testChannelGetter(&loop, channel); + const putter = async testChannelPutter(channel); + + loop.run(); +} + +async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void { + const value1 = channel.get(); + testing.expect(value1 == 1234); + + const value2 = channel.get(); + testing.expect(value2 == 4567); + + const value3 = channel.getOrNull(); + testing.expect(value3 == null); + + var last_put = async testPut(channel, 4444); + const value4 = channel.getOrNull(); + testing.expect(value4.? == 4444); + await last_put; +} + +async fn testChannelPutter(channel: *Channel(i32)) void { + channel.put(1234); + channel.put(4567); +} + +async fn testPut(channel: *Channel(i32), value: i32) void { + channel.put(value); +} diff --git a/lib/std/event/fs.zig b/lib/std/event/fs.zig new file mode 100644 index 0000000000..4490e1deae --- /dev/null +++ b/lib/std/event/fs.zig @@ -0,0 +1,1431 @@ +const builtin = @import("builtin"); +const std = @import("../std.zig"); +const event = std.event; +const assert = std.debug.assert; +const testing = std.testing; +const os = std.os; +const mem = std.mem; +const windows = os.windows; +const Loop = event.Loop; +const fd_t = os.fd_t; +const File = std.fs.File; + +pub const RequestNode = std.atomic.Queue(Request).Node; + +pub const Request = struct { + msg: Msg, + finish: Finish, + + pub const Finish = union(enum) { + TickNode: Loop.NextTickNode, + DeallocCloseOperation: *CloseOperation, + NoAction, + }; + + pub const Msg = union(enum) { + WriteV: WriteV, + PWriteV: PWriteV, + PReadV: PReadV, + Open: Open, + Close: Close, + WriteFile: WriteFile, + End, // special - means the fs thread should exit + + pub const WriteV = struct { + fd: fd_t, + iov: []const os.iovec_const, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PWriteV = struct { + fd: fd_t, + iov: []const os.iovec_const, + offset: usize, + result: Error!void, + + pub const Error = os.WriteError; + }; + + pub const PReadV = struct { + fd: fd_t, + iov: []const os.iovec, + offset: usize, + result: Error!usize, + + pub const Error = os.ReadError; + }; + + pub const Open = struct { + /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 + path: []const u8, + flags: u32, + mode: File.Mode, + result: Error!fd_t, + + pub const Error = File.OpenError; + }; + + pub const WriteFile = struct { + /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265 + path: []const u8, + contents: []const u8, + mode: File.Mode, + result: Error!void, + + pub const Error = File.OpenError || File.WriteError; + }; + + pub const Close = struct { + fd: fd_t, + }; + }; +}; + +pub const PWriteVError = error{OutOfMemory} || File.WriteError; + +/// data - just the inner references - must live until pwritev frame completes. +pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void { + switch (builtin.os) { + .macosx, + .linux, + .freebsd, + .netbsd, + => { + const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); + defer loop.allocator.free(iovecs); + + for (data) |buf, i| { + iovecs[i] = os.iovec_const{ + .iov_base = buf.ptr, + .iov_len = buf.len, + }; + } + + return pwritevPosix(loop, fd, iovecs, offset); + }, + .windows => { + const data_copy = try std.mem.dupe(loop.allocator, []const u8, data); + defer loop.allocator.free(data_copy); + return pwritevWindows(loop, fd, data, offset); + }, + else => @compileError("Unsupported OS"), + } +} + +/// data must outlive the returned frame +pub fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void { + if (data.len == 0) return; + if (data.len == 1) return pwriteWindows(loop, fd, data[0], offset); + + // TODO do these in parallel + var off = offset; + for (data) |buf| { + try pwriteWindows(loop, fd, buf, off); + off += buf.len; + } +} + +pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void { + var resume_node = Loop.ResumeNode.Basic{ + .base = Loop.ResumeNode{ + .id = Loop.ResumeNode.Id.Basic, + .handle = @frame(), + .overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, offset), + .OffsetHigh = @truncate(u32, offset >> 32), + .hEvent = null, + }, + }, + }; + // TODO only call create io completion port once per fd + _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined); + loop.beginOneEvent(); + errdefer loop.finishOneEvent(); + + errdefer { + _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); + } + suspend { + _ = windows.kernel32.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); + } + var bytes_transferred: windows.DWORD = undefined; + if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + switch (windows.kernel32.GetLastError()) { + windows.ERROR.IO_PENDING => unreachable, + windows.ERROR.INVALID_USER_BUFFER => return error.SystemResources, + windows.ERROR.NOT_ENOUGH_MEMORY => return error.SystemResources, + windows.ERROR.OPERATION_ABORTED => return error.OperationAborted, + windows.ERROR.NOT_ENOUGH_QUOTA => return error.SystemResources, + windows.ERROR.BROKEN_PIPE => return error.BrokenPipe, + else => |err| return windows.unexpectedError(err), + } + } +} + +/// iovecs must live until pwritev frame completes. +pub fn pwritevPosix( + loop: *Loop, + fd: fd_t, + iovecs: []const os.iovec_const, + offset: usize, +) os.WriteError!void { + var req_node = RequestNode{ + .prev = null, + .next = null, + .data = Request{ + .msg = Request.Msg{ + .PWriteV = Request.Msg.PWriteV{ + .fd = fd, + .iov = iovecs, + .offset = offset, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = Loop.NextTickNode{ + .prev = null, + .next = null, + .data = @frame(), + }, + }, + }, + }; + + errdefer loop.posixFsCancel(&req_node); + + suspend { + loop.posixFsRequest(&req_node); + } + + return req_node.data.msg.PWriteV.result; +} + +/// iovecs must live until pwritev frame completes. +pub fn writevPosix( + loop: *Loop, + fd: fd_t, + iovecs: []const os.iovec_const, +) os.WriteError!void { + var req_node = RequestNode{ + .prev = null, + .next = null, + .data = Request{ + .msg = Request.Msg{ + .WriteV = Request.Msg.WriteV{ + .fd = fd, + .iov = iovecs, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = Loop.NextTickNode{ + .prev = null, + .next = null, + .data = @frame(), + }, + }, + }, + }; + + suspend { + loop.posixFsRequest(&req_node); + } + + return req_node.data.msg.WriteV.result; +} + +pub const PReadVError = error{OutOfMemory} || File.ReadError; + +/// data - just the inner references - must live until preadv frame completes. +pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize { + assert(data.len != 0); + switch (builtin.os) { + .macosx, + .linux, + .freebsd, + .netbsd, + => { + const iovecs = try loop.allocator.alloc(os.iovec, data.len); + defer loop.allocator.free(iovecs); + + for (data) |buf, i| { + iovecs[i] = os.iovec{ + .iov_base = buf.ptr, + .iov_len = buf.len, + }; + } + + return preadvPosix(loop, fd, iovecs, offset); + }, + .windows => { + const data_copy = try std.mem.dupe(loop.allocator, []u8, data); + defer loop.allocator.free(data_copy); + return preadvWindows(loop, fd, data_copy, offset); + }, + else => @compileError("Unsupported OS"), + } +} + +/// data must outlive the returned frame +pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize { + assert(data.len != 0); + if (data.len == 1) return preadWindows(loop, fd, data[0], offset); + + // TODO do these in parallel? + var off: usize = 0; + var iov_i: usize = 0; + var inner_off: usize = 0; + while (true) { + const v = data[iov_i]; + const amt_read = try preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off); + off += amt_read; + inner_off += amt_read; + if (inner_off == v.len) { + iov_i += 1; + inner_off = 0; + if (iov_i == data.len) { + return off; + } + } + if (amt_read == 0) return off; // EOF + } +} + +pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize { + var resume_node = Loop.ResumeNode.Basic{ + .base = Loop.ResumeNode{ + .id = Loop.ResumeNode.Id.Basic, + .handle = @frame(), + .overlapped = windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = @truncate(u32, offset), + .OffsetHigh = @truncate(u32, offset >> 32), + .hEvent = null, + }, + }, + }; + // TODO only call create io completion port once per fd + _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined; + loop.beginOneEvent(); + errdefer loop.finishOneEvent(); + + errdefer { + _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped); + } + suspend { + _ = windows.kernel32.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped); + } + var bytes_transferred: windows.DWORD = undefined; + if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { + switch (windows.kernel32.GetLastError()) { + windows.ERROR.IO_PENDING => unreachable, + windows.ERROR.OPERATION_ABORTED => return error.OperationAborted, + windows.ERROR.BROKEN_PIPE => return error.BrokenPipe, + windows.ERROR.HANDLE_EOF => return usize(bytes_transferred), + else => |err| return windows.unexpectedError(err), + } + } + return usize(bytes_transferred); +} + +/// iovecs must live until preadv frame completes +pub fn preadvPosix( + loop: *Loop, + fd: fd_t, + iovecs: []const os.iovec, + offset: usize, +) os.ReadError!usize { + var req_node = RequestNode{ + .prev = null, + .next = null, + .data = Request{ + .msg = Request.Msg{ + .PReadV = Request.Msg.PReadV{ + .fd = fd, + .iov = iovecs, + .offset = offset, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = Loop.NextTickNode{ + .prev = null, + .next = null, + .data = @frame(), + }, + }, + }, + }; + + errdefer loop.posixFsCancel(&req_node); + + suspend { + loop.posixFsRequest(&req_node); + } + + return req_node.data.msg.PReadV.result; +} + +pub fn openPosix( + loop: *Loop, + path: []const u8, + flags: u32, + mode: File.Mode, +) File.OpenError!fd_t { + const path_c = try std.os.toPosixPath(path); + + var req_node = RequestNode{ + .prev = null, + .next = null, + .data = Request{ + .msg = Request.Msg{ + .Open = Request.Msg.Open{ + .path = path_c[0..path.len], + .flags = flags, + .mode = mode, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = Loop.NextTickNode{ + .prev = null, + .next = null, + .data = @frame(), + }, + }, + }, + }; + + errdefer loop.posixFsCancel(&req_node); + + suspend { + loop.posixFsRequest(&req_node); + } + + return req_node.data.msg.Open.result; +} + +pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t { + switch (builtin.os) { + .macosx, .linux, .freebsd, .netbsd => { + const flags = os.O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC; + return openPosix(loop, path, flags, File.default_mode); + }, + + .windows => return windows.CreateFile( + path, + windows.GENERIC_READ, + windows.FILE_SHARE_READ, + null, + windows.OPEN_EXISTING, + windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, + ), + + else => @compileError("Unsupported OS"), + } +} + +/// Creates if does not exist. Truncates the file if it exists. +/// Uses the default mode. +pub fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t { + return openWriteMode(loop, path, File.default_mode); +} + +/// Creates if does not exist. Truncates the file if it exists. +pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t { + switch (builtin.os) { + .macosx, + .linux, + .freebsd, + .netbsd, + => { + const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC; + return openPosix(loop, path, flags, File.default_mode); + }, + .windows => return windows.CreateFile( + path, + windows.GENERIC_WRITE, + windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + null, + windows.CREATE_ALWAYS, + windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, + ), + else => @compileError("Unsupported OS"), + } +} + +/// Creates if does not exist. Does not truncate. +pub fn openReadWrite( + loop: *Loop, + path: []const u8, + mode: File.Mode, +) File.OpenError!fd_t { + switch (builtin.os) { + .macosx, .linux, .freebsd, .netbsd => { + const flags = os.O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC; + return openPosix(loop, path, flags, mode); + }, + + .windows => return windows.CreateFile( + path, + windows.GENERIC_WRITE | windows.GENERIC_READ, + windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + null, + windows.OPEN_ALWAYS, + windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, + ), + + else => @compileError("Unsupported OS"), + } +} + +/// This abstraction helps to close file handles in defer expressions +/// without the possibility of failure and without the use of suspend points. +/// Start a `CloseOperation` before opening a file, so that you can defer +/// `CloseOperation.finish`. +/// If you call `setHandle` then finishing will close the fd; otherwise finishing +/// will deallocate the `CloseOperation`. +pub const CloseOperation = struct { + loop: *Loop, + os_data: OsData, + + const OsData = switch (builtin.os) { + .linux, .macosx, .freebsd, .netbsd => OsDataPosix, + + .windows => struct { + handle: ?fd_t, + }, + + else => @compileError("Unsupported OS"), + }; + + const OsDataPosix = struct { + have_fd: bool, + close_req_node: RequestNode, + }; + + pub fn start(loop: *Loop) (error{OutOfMemory}!*CloseOperation) { + const self = try loop.allocator.create(CloseOperation); + self.* = CloseOperation{ + .loop = loop, + .os_data = switch (builtin.os) { + .linux, .macosx, .freebsd, .netbsd => initOsDataPosix(self), + .windows => OsData{ .handle = null }, + else => @compileError("Unsupported OS"), + }, + }; + return self; + } + + fn initOsDataPosix(self: *CloseOperation) OsData { + return OsData{ + .have_fd = false, + .close_req_node = RequestNode{ + .prev = null, + .next = null, + .data = Request{ + .msg = Request.Msg{ + .Close = Request.Msg.Close{ .fd = undefined }, + }, + .finish = Request.Finish{ .DeallocCloseOperation = self }, + }, + }, + }; + } + + /// Defer this after creating. + pub fn finish(self: *CloseOperation) void { + switch (builtin.os) { + .linux, + .macosx, + .freebsd, + .netbsd, + => { + if (self.os_data.have_fd) { + self.loop.posixFsRequest(&self.os_data.close_req_node); + } else { + self.loop.allocator.destroy(self); + } + }, + .windows => { + if (self.os_data.handle) |handle| { + os.close(handle); + } + self.loop.allocator.destroy(self); + }, + else => @compileError("Unsupported OS"), + } + } + + pub fn setHandle(self: *CloseOperation, handle: fd_t) void { + switch (builtin.os) { + .linux, + .macosx, + .freebsd, + .netbsd, + => { + self.os_data.close_req_node.data.msg.Close.fd = handle; + self.os_data.have_fd = true; + }, + .windows => { + self.os_data.handle = handle; + }, + else => @compileError("Unsupported OS"), + } + } + + /// Undo a `setHandle`. + pub fn clearHandle(self: *CloseOperation) void { + switch (builtin.os) { + .linux, + .macosx, + .freebsd, + .netbsd, + => { + self.os_data.have_fd = false; + }, + .windows => { + self.os_data.handle = null; + }, + else => @compileError("Unsupported OS"), + } + } + + pub fn getHandle(self: *CloseOperation) fd_t { + switch (builtin.os) { + .linux, + .macosx, + .freebsd, + .netbsd, + => { + assert(self.os_data.have_fd); + return self.os_data.close_req_node.data.msg.Close.fd; + }, + .windows => { + return self.os_data.handle.?; + }, + else => @compileError("Unsupported OS"), + } + } +}; + +/// contents must remain alive until writeFile completes. +/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate +pub fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void { + return writeFileMode(loop, path, contents, File.default_mode); +} + +/// contents must remain alive until writeFile completes. +pub fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void { + switch (builtin.os) { + .linux, + .macosx, + .freebsd, + .netbsd, + => return writeFileModeThread(loop, path, contents, mode), + .windows => return writeFileWindows(loop, path, contents), + else => @compileError("Unsupported OS"), + } +} + +fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void { + const handle = try windows.CreateFile( + path, + windows.GENERIC_WRITE, + windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE, + null, + windows.CREATE_ALWAYS, + windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED, + null, + ); + defer os.close(handle); + + try pwriteWindows(loop, handle, contents, 0); +} + +fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void { + const path_with_null = try std.cstr.addNullByte(loop.allocator, path); + defer loop.allocator.free(path_with_null); + + var req_node = RequestNode{ + .prev = null, + .next = null, + .data = Request{ + .msg = Request.Msg{ + .WriteFile = Request.Msg.WriteFile{ + .path = path_with_null[0..path.len], + .contents = contents, + .mode = mode, + .result = undefined, + }, + }, + .finish = Request.Finish{ + .TickNode = Loop.NextTickNode{ + .prev = null, + .next = null, + .data = @frame(), + }, + }, + }, + }; + + errdefer loop.posixFsCancel(&req_node); + + suspend { + loop.posixFsRequest(&req_node); + } + + return req_node.data.msg.WriteFile.result; +} + +/// The frame resumes when the last data has been confirmed written, but before the file handle +/// is closed. +/// Caller owns returned memory. +pub fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 { + var close_op = try CloseOperation.start(loop); + defer close_op.finish(); + + const fd = try openRead(loop, file_path); + close_op.setHandle(fd); + + var list = std.ArrayList(u8).init(loop.allocator); + defer list.deinit(); + + while (true) { + try list.ensureCapacity(list.len + mem.page_size); + const buf = list.items[list.len..]; + const buf_array = [_][]u8{buf}; + const amt = try preadv(loop, fd, buf_array, list.len); + list.len += amt; + if (list.len > max_size) { + return error.FileTooBig; + } + if (amt < buf.len) { + return list.toOwnedSlice(); + } + } +} + +pub const WatchEventId = enum { + CloseWrite, + Delete, +}; + +fn eqlString(a: []const u16, b: []const u16) bool { + if (a.len != b.len) return false; + if (a.ptr == b.ptr) return true; + return mem.compare(u16, a, b) == .Equal; +} + +fn hashString(s: []const u16) u32 { + return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s))); +} + +//pub const WatchEventError = error{ +// UserResourceLimitReached, +// SystemResources, +// AccessDenied, +// Unexpected, // TODO remove this possibility +//}; +// +//pub fn Watch(comptime V: type) type { +// return struct { +// channel: *event.Channel(Event.Error!Event), +// os_data: OsData, +// +// const OsData = switch (builtin.os) { +// .macosx, .freebsd, .netbsd => struct { +// file_table: FileTable, +// table_lock: event.Lock, +// +// const FileTable = std.StringHashmap(*Put); +// const Put = struct { +// putter: anyframe, +// value_ptr: *V, +// }; +// }, +// +// .linux => LinuxOsData, +// .windows => WindowsOsData, +// +// else => @compileError("Unsupported OS"), +// }; +// +// const WindowsOsData = struct { +// table_lock: event.Lock, +// dir_table: DirTable, +// all_putters: std.atomic.Queue(anyframe), +// ref_count: std.atomic.Int(usize), +// +// const DirTable = std.StringHashMap(*Dir); +// const FileTable = std.HashMap([]const u16, V, hashString, eqlString); +// +// const Dir = struct { +// putter: anyframe, +// file_table: FileTable, +// table_lock: event.Lock, +// }; +// }; +// +// const LinuxOsData = struct { +// putter: anyframe, +// inotify_fd: i32, +// wd_table: WdTable, +// table_lock: event.Lock, +// +// const WdTable = std.AutoHashMap(i32, Dir); +// const FileTable = std.StringHashMap(V); +// +// const Dir = struct { +// dirname: []const u8, +// file_table: FileTable, +// }; +// }; +// +// const FileToHandle = std.StringHashMap(anyframe); +// +// const Self = @This(); +// +// pub const Event = struct { +// id: Id, +// data: V, +// +// pub const Id = WatchEventId; +// pub const Error = WatchEventError; +// }; +// +// pub fn create(loop: *Loop, event_buf_count: usize) !*Self { +// const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count); +// errdefer channel.destroy(); +// +// switch (builtin.os) { +// .linux => { +// const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC); +// errdefer os.close(inotify_fd); +// +// var result: *Self = undefined; +// _ = try async<loop.allocator> linuxEventPutter(inotify_fd, channel, &result); +// return result; +// }, +// +// .windows => { +// const self = try loop.allocator.create(Self); +// errdefer loop.allocator.destroy(self); +// self.* = Self{ +// .channel = channel, +// .os_data = OsData{ +// .table_lock = event.Lock.init(loop), +// .dir_table = OsData.DirTable.init(loop.allocator), +// .ref_count = std.atomic.Int(usize).init(1), +// .all_putters = std.atomic.Queue(anyframe).init(), +// }, +// }; +// return self; +// }, +// +// .macosx, .freebsd, .netbsd => { +// const self = try loop.allocator.create(Self); +// errdefer loop.allocator.destroy(self); +// +// self.* = Self{ +// .channel = channel, +// .os_data = OsData{ +// .table_lock = event.Lock.init(loop), +// .file_table = OsData.FileTable.init(loop.allocator), +// }, +// }; +// return self; +// }, +// else => @compileError("Unsupported OS"), +// } +// } +// +// /// All addFile calls and removeFile calls must have completed. +// pub fn destroy(self: *Self) void { +// switch (builtin.os) { +// .macosx, .freebsd, .netbsd => { +// // TODO we need to cancel the frames before destroying the lock +// self.os_data.table_lock.deinit(); +// var it = self.os_data.file_table.iterator(); +// while (it.next()) |entry| { +// cancel entry.value.putter; +// self.channel.loop.allocator.free(entry.key); +// } +// self.channel.destroy(); +// }, +// .linux => cancel self.os_data.putter, +// .windows => { +// while (self.os_data.all_putters.get()) |putter_node| { +// cancel putter_node.data; +// } +// self.deref(); +// }, +// else => @compileError("Unsupported OS"), +// } +// } +// +// fn ref(self: *Self) void { +// _ = self.os_data.ref_count.incr(); +// } +// +// fn deref(self: *Self) void { +// if (self.os_data.ref_count.decr() == 1) { +// const allocator = self.channel.loop.allocator; +// self.os_data.table_lock.deinit(); +// var it = self.os_data.dir_table.iterator(); +// while (it.next()) |entry| { +// allocator.free(entry.key); +// allocator.destroy(entry.value); +// } +// self.os_data.dir_table.deinit(); +// self.channel.destroy(); +// allocator.destroy(self); +// } +// } +// +// pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V { +// switch (builtin.os) { +// .macosx, .freebsd, .netbsd => return await (async addFileKEvent(self, file_path, value) catch unreachable), +// .linux => return await (async addFileLinux(self, file_path, value) catch unreachable), +// .windows => return await (async addFileWindows(self, file_path, value) catch unreachable), +// else => @compileError("Unsupported OS"), +// } +// } +// +// async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V { +// const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path}); +// var resolved_path_consumed = false; +// defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path); +// +// var close_op = try CloseOperation.start(self.channel.loop); +// var close_op_consumed = false; +// defer if (!close_op_consumed) close_op.finish(); +// +// const flags = if (os.darwin.is_the_target) os.O_SYMLINK | os.O_EVTONLY else 0; +// const mode = 0; +// const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable); +// close_op.setHandle(fd); +// +// var put_data: *OsData.Put = undefined; +// const putter = try async self.kqPutEvents(close_op, value, &put_data); +// close_op_consumed = true; +// errdefer cancel putter; +// +// const result = blk: { +// const held = await (async self.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const gop = try self.os_data.file_table.getOrPut(resolved_path); +// if (gop.found_existing) { +// const prev_value = gop.kv.value.value_ptr.*; +// cancel gop.kv.value.putter; +// gop.kv.value = put_data; +// break :blk prev_value; +// } else { +// resolved_path_consumed = true; +// gop.kv.value = put_data; +// break :blk null; +// } +// }; +// +// return result; +// } +// +// async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void { +// var value_copy = value; +// var put = OsData.Put{ +// .putter = @frame(), +// .value_ptr = &value_copy, +// }; +// out_put.* = &put; +// self.channel.loop.beginOneEvent(); +// +// defer { +// close_op.finish(); +// self.channel.loop.finishOneEvent(); +// } +// +// while (true) { +// if (await (async self.channel.loop.bsdWaitKev( +// @intCast(usize, close_op.getHandle()), +// os.EVFILT_VNODE, +// os.NOTE_WRITE | os.NOTE_DELETE, +// ) catch unreachable)) |kev| { +// // TODO handle EV_ERROR +// if (kev.fflags & os.NOTE_DELETE != 0) { +// await (async self.channel.put(Self.Event{ +// .id = Event.Id.Delete, +// .data = value_copy, +// }) catch unreachable); +// } else if (kev.fflags & os.NOTE_WRITE != 0) { +// await (async self.channel.put(Self.Event{ +// .id = Event.Id.CloseWrite, +// .data = value_copy, +// }) catch unreachable); +// } +// } else |err| switch (err) { +// error.EventNotFound => unreachable, +// error.ProcessNotFound => unreachable, +// error.Overflow => unreachable, +// error.AccessDenied, error.SystemResources => |casted_err| { +// await (async self.channel.put(casted_err) catch unreachable); +// }, +// } +// } +// } +// +// async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V { +// const value_copy = value; +// +// const dirname = std.fs.path.dirname(file_path) orelse "."; +// const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname); +// var dirname_with_null_consumed = false; +// defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null); +// +// const basename = std.fs.path.basename(file_path); +// const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename); +// var basename_with_null_consumed = false; +// defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null); +// +// const wd = try os.inotify_add_watchC( +// self.os_data.inotify_fd, +// dirname_with_null.ptr, +// os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK, +// ); +// // wd is either a newly created watch or an existing one. +// +// const held = await (async self.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const gop = try self.os_data.wd_table.getOrPut(wd); +// if (!gop.found_existing) { +// gop.kv.value = OsData.Dir{ +// .dirname = dirname_with_null, +// .file_table = OsData.FileTable.init(self.channel.loop.allocator), +// }; +// dirname_with_null_consumed = true; +// } +// const dir = &gop.kv.value; +// +// const file_table_gop = try dir.file_table.getOrPut(basename_with_null); +// if (file_table_gop.found_existing) { +// const prev_value = file_table_gop.kv.value; +// file_table_gop.kv.value = value_copy; +// return prev_value; +// } else { +// file_table_gop.kv.value = value_copy; +// basename_with_null_consumed = true; +// return null; +// } +// } +// +// async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V { +// const value_copy = value; +// // TODO we might need to convert dirname and basename to canonical file paths ("short"?) +// +// const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse "."); +// var dirname_consumed = false; +// defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname); +// +// const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname); +// defer self.channel.loop.allocator.free(dirname_utf16le); +// +// // TODO https://github.com/ziglang/zig/issues/265 +// const basename = std.fs.path.basename(file_path); +// const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename); +// var basename_utf16le_null_consumed = false; +// defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null); +// const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1]; +// +// const dir_handle = try windows.CreateFileW( +// dirname_utf16le.ptr, +// windows.FILE_LIST_DIRECTORY, +// windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE, +// null, +// windows.OPEN_EXISTING, +// windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED, +// null, +// ); +// var dir_handle_consumed = false; +// defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle); +// +// const held = await (async self.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const gop = try self.os_data.dir_table.getOrPut(dirname); +// if (gop.found_existing) { +// const dir = gop.kv.value; +// const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable); +// defer held_dir_lock.release(); +// +// const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null); +// if (file_gop.found_existing) { +// const prev_value = file_gop.kv.value; +// file_gop.kv.value = value_copy; +// return prev_value; +// } else { +// file_gop.kv.value = value_copy; +// basename_utf16le_null_consumed = true; +// return null; +// } +// } else { +// errdefer _ = self.os_data.dir_table.remove(dirname); +// const dir = try self.channel.loop.allocator.create(OsData.Dir); +// errdefer self.channel.loop.allocator.destroy(dir); +// +// dir.* = OsData.Dir{ +// .file_table = OsData.FileTable.init(self.channel.loop.allocator), +// .table_lock = event.Lock.init(self.channel.loop), +// .putter = undefined, +// }; +// gop.kv.value = dir; +// assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null); +// basename_utf16le_null_consumed = true; +// +// dir.putter = try async self.windowsDirReader(dir_handle, dir); +// dir_handle_consumed = true; +// +// dirname_consumed = true; +// +// return null; +// } +// } +// +// async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void { +// self.ref(); +// defer self.deref(); +// +// defer os.close(dir_handle); +// +// var putter_node = std.atomic.Queue(anyframe).Node{ +// .data = @frame(), +// .prev = null, +// .next = null, +// }; +// self.os_data.all_putters.put(&putter_node); +// defer _ = self.os_data.all_putters.remove(&putter_node); +// +// var resume_node = Loop.ResumeNode.Basic{ +// .base = Loop.ResumeNode{ +// .id = Loop.ResumeNode.Id.Basic, +// .handle = @frame(), +// .overlapped = windows.OVERLAPPED{ +// .Internal = 0, +// .InternalHigh = 0, +// .Offset = 0, +// .OffsetHigh = 0, +// .hEvent = null, +// }, +// }, +// }; +// var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined; +// +// // TODO handle this error not in the channel but in the setup +// _ = windows.CreateIoCompletionPort( +// dir_handle, +// self.channel.loop.os_data.io_port, +// undefined, +// undefined, +// ) catch |err| { +// await (async self.channel.put(err) catch unreachable); +// return; +// }; +// +// while (true) { +// { +// // TODO only 1 beginOneEvent for the whole function +// self.channel.loop.beginOneEvent(); +// errdefer self.channel.loop.finishOneEvent(); +// errdefer { +// _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped); +// } +// suspend { +// _ = windows.kernel32.ReadDirectoryChangesW( +// dir_handle, +// &event_buf, +// @intCast(windows.DWORD, event_buf.len), +// windows.FALSE, // watch subtree +// windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME | +// windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE | +// windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS | +// windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY, +// null, // number of bytes transferred (unused for async) +// &resume_node.base.overlapped, +// null, // completion routine - unused because we use IOCP +// ); +// } +// } +// var bytes_transferred: windows.DWORD = undefined; +// if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) { +// const err = switch (windows.kernel32.GetLastError()) { +// else => |err| windows.unexpectedError(err), +// }; +// await (async self.channel.put(err) catch unreachable); +// } else { +// // can't use @bytesToSlice because of the special variable length name field +// var ptr = event_buf[0..].ptr; +// const end_ptr = ptr + bytes_transferred; +// var ev: *windows.FILE_NOTIFY_INFORMATION = undefined; +// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) { +// ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr); +// const emit = switch (ev.Action) { +// windows.FILE_ACTION_REMOVED => WatchEventId.Delete, +// windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite, +// else => null, +// }; +// if (emit) |id| { +// const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2]; +// const user_value = blk: { +// const held = await (async dir.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// if (dir.file_table.get(basename_utf16le)) |entry| { +// break :blk entry.value; +// } else { +// break :blk null; +// } +// }; +// if (user_value) |v| { +// await (async self.channel.put(Event{ +// .id = id, +// .data = v, +// }) catch unreachable); +// } +// } +// if (ev.NextEntryOffset == 0) break; +// } +// } +// } +// } +// +// pub async fn removeFile(self: *Self, file_path: []const u8) ?V { +// @panic("TODO"); +// } +// +// async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void { +// const loop = channel.loop; +// +// var watch = Self{ +// .channel = channel, +// .os_data = OsData{ +// .putter = @frame(), +// .inotify_fd = inotify_fd, +// .wd_table = OsData.WdTable.init(loop.allocator), +// .table_lock = event.Lock.init(loop), +// }, +// }; +// out_watch.* = &watch; +// +// loop.beginOneEvent(); +// +// defer { +// watch.os_data.table_lock.deinit(); +// var wd_it = watch.os_data.wd_table.iterator(); +// while (wd_it.next()) |wd_entry| { +// var file_it = wd_entry.value.file_table.iterator(); +// while (file_it.next()) |file_entry| { +// loop.allocator.free(file_entry.key); +// } +// loop.allocator.free(wd_entry.value.dirname); +// } +// loop.finishOneEvent(); +// os.close(inotify_fd); +// channel.destroy(); +// } +// +// var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined; +// +// while (true) { +// const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len); +// const errno = os.linux.getErrno(rc); +// switch (errno) { +// 0 => { +// // can't use @bytesToSlice because of the special variable length name field +// var ptr = event_buf[0..].ptr; +// const end_ptr = ptr + event_buf.len; +// var ev: *os.linux.inotify_event = undefined; +// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) { +// ev = @ptrCast(*os.linux.inotify_event, ptr); +// if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) { +// const basename_ptr = ptr + @sizeOf(os.linux.inotify_event); +// const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1]; +// const user_value = blk: { +// const held = await (async watch.os_data.table_lock.acquire() catch unreachable); +// defer held.release(); +// +// const dir = &watch.os_data.wd_table.get(ev.wd).?.value; +// if (dir.file_table.get(basename_with_null)) |entry| { +// break :blk entry.value; +// } else { +// break :blk null; +// } +// }; +// if (user_value) |v| { +// await (async channel.put(Event{ +// .id = WatchEventId.CloseWrite, +// .data = v, +// }) catch unreachable); +// } +// } +// } +// }, +// os.linux.EINTR => continue, +// os.linux.EINVAL => unreachable, +// os.linux.EFAULT => unreachable, +// os.linux.EAGAIN => { +// (await (async loop.linuxWaitFd( +// inotify_fd, +// os.linux.EPOLLET | os.linux.EPOLLIN, +// ) catch unreachable)) catch |err| { +// const transformed_err = switch (err) { +// error.FileDescriptorAlreadyPresentInSet => unreachable, +// error.OperationCausesCircularLoop => unreachable, +// error.FileDescriptorNotRegistered => unreachable, +// error.FileDescriptorIncompatibleWithEpoll => unreachable, +// error.Unexpected => unreachable, +// else => |e| e, +// }; +// await (async channel.put(transformed_err) catch unreachable); +// }; +// }, +// else => unreachable, +// } +// } +// } +// }; +//} + +const test_tmp_dir = "std_event_fs_test"; + +// TODO this test is disabled until the async function rewrite is finished. +//test "write a file, watch it, write it again" { +// return error.SkipZigTest; +// const allocator = std.heap.direct_allocator; +// +// // TODO move this into event loop too +// try os.makePath(allocator, test_tmp_dir); +// defer os.deleteTree(allocator, test_tmp_dir) catch {}; +// +// var loop: Loop = undefined; +// try loop.initMultiThreaded(allocator); +// defer loop.deinit(); +// +// var result: anyerror!void = error.ResultNeverWritten; +// const handle = try async<allocator> testFsWatchCantFail(&loop, &result); +// defer cancel handle; +// +// loop.run(); +// return result; +//} + +fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void { + result.* = testFsWatch(loop); +} + +fn testFsWatch(loop: *Loop) !void { + const file_path = try std.fs.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" }); + defer loop.allocator.free(file_path); + + const contents = + \\line 1 + \\line 2 + ; + const line2_offset = 7; + + // first just write then read the file + try writeFile(loop, file_path, contents); + + const read_contents = try readFile(loop, file_path, 1024 * 1024); + testing.expectEqualSlices(u8, contents, read_contents); + + // now watch the file + var watch = try Watch(void).create(loop, 0); + defer watch.destroy(); + + testing.expect((try watch.addFile(file_path, {})) == null); + + const ev = async watch.channel.get(); + var ev_consumed = false; + defer if (!ev_consumed) await ev; + + // overwrite line 2 + const fd = try await openReadWrite(loop, file_path, File.default_mode); + { + defer os.close(fd); + + try pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset); + } + + ev_consumed = true; + switch ((try await ev).id) { + WatchEventId.CloseWrite => {}, + WatchEventId.Delete => @panic("wrong event"), + } + const contents_updated = try readFile(loop, file_path, 1024 * 1024); + testing.expectEqualSlices(u8, + \\line 1 + \\lorem ipsum + , contents_updated); + + // TODO test deleting the file and then re-adding it. we should get events for both +} + +pub const OutStream = struct { + fd: fd_t, + stream: Stream, + loop: *Loop, + offset: usize, + + pub const Error = File.WriteError; + pub const Stream = event.io.OutStream(Error); + + pub fn init(loop: *Loop, fd: fd_t, offset: usize) OutStream { + return OutStream{ + .fd = fd, + .loop = loop, + .offset = offset, + .stream = Stream{ .writeFn = writeFn }, + }; + } + + fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { + const self = @fieldParentPtr(OutStream, "stream", out_stream); + const offset = self.offset; + self.offset += bytes.len; + return pwritev(self.loop, self.fd, [][]const u8{bytes}, offset); + } +}; + +pub const InStream = struct { + fd: fd_t, + stream: Stream, + loop: *Loop, + offset: usize, + + pub const Error = PReadVError; // TODO make this not have OutOfMemory + pub const Stream = event.io.InStream(Error); + + pub fn init(loop: *Loop, fd: fd_t, offset: usize) InStream { + return InStream{ + .fd = fd, + .loop = loop, + .offset = offset, + .stream = Stream{ .readFn = readFn }, + }; + } + + fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { + const self = @fieldParentPtr(InStream, "stream", in_stream); + const amt = try preadv(self.loop, self.fd, [][]u8{bytes}, self.offset); + self.offset += amt; + return amt; + } +}; diff --git a/lib/std/event/future.zig b/lib/std/event/future.zig new file mode 100644 index 0000000000..1e3508de41 --- /dev/null +++ b/lib/std/event/future.zig @@ -0,0 +1,121 @@ +const std = @import("../std.zig"); +const assert = std.debug.assert; +const testing = std.testing; +const builtin = @import("builtin"); +const Lock = std.event.Lock; +const Loop = std.event.Loop; + +/// This is a value that starts out unavailable, until resolve() is called +/// While it is unavailable, functions suspend when they try to get() it, +/// and then are resumed when resolve() is called. +/// At this point the value remains forever available, and another resolve() is not allowed. +pub fn Future(comptime T: type) type { + return struct { + lock: Lock, + data: T, + + /// TODO make this an enum + /// 0 - not started + /// 1 - started + /// 2 - finished + available: u8, + + const Self = @This(); + const Queue = std.atomic.Queue(anyframe); + + pub fn init(loop: *Loop) Self { + return Self{ + .lock = Lock.initLocked(loop), + .available = 0, + .data = undefined, + }; + } + + /// Obtain the value. If it's not available, wait until it becomes + /// available. + /// Thread-safe. + pub async fn get(self: *Self) *T { + if (@atomicLoad(u8, &self.available, .SeqCst) == 2) { + return &self.data; + } + const held = self.lock.acquire(); + held.release(); + + return &self.data; + } + + /// Gets the data without waiting for it. If it's available, a pointer is + /// returned. Otherwise, null is returned. + pub fn getOrNull(self: *Self) ?*T { + if (@atomicLoad(u8, &self.available, .SeqCst) == 2) { + return &self.data; + } else { + return null; + } + } + + /// If someone else has started working on the data, wait for them to complete + /// and return a pointer to the data. Otherwise, return null, and the caller + /// should start working on the data. + /// It's not required to call start() before resolve() but it can be useful since + /// this method is thread-safe. + pub async fn start(self: *Self) ?*T { + const state = @cmpxchgStrong(u8, &self.available, 0, 1, .SeqCst, .SeqCst) orelse return null; + switch (state) { + 1 => { + const held = self.lock.acquire(); + held.release(); + return &self.data; + }, + 2 => return &self.data, + else => unreachable, + } + } + + /// Make the data become available. May be called only once. + /// Before calling this, modify the `data` property. + pub fn resolve(self: *Self) void { + const prev = @atomicRmw(u8, &self.available, .Xchg, 2, .SeqCst); + assert(prev == 0 or prev == 1); // resolve() called twice + Lock.Held.release(Lock.Held{ .lock = &self.lock }); + } + }; +} + +test "std.event.Future" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + // https://github.com/ziglang/zig/issues/3251 + if (std.os.freebsd.is_the_target) return error.SkipZigTest; + + const allocator = std.heap.direct_allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + const handle = async testFuture(&loop); + + loop.run(); +} + +fn testFuture(loop: *Loop) void { + var future = Future(i32).init(loop); + + var a = async waitOnFuture(&future); + var b = async waitOnFuture(&future); + resolveFuture(&future); + + const result = (await a) + (await b); + + testing.expect(result == 12); +} + +fn waitOnFuture(future: *Future(i32)) i32 { + return future.get().*; +} + +fn resolveFuture(future: *Future(i32)) void { + future.data = 6; + future.resolve(); +} diff --git a/lib/std/event/group.zig b/lib/std/event/group.zig new file mode 100644 index 0000000000..f96b938f80 --- /dev/null +++ b/lib/std/event/group.zig @@ -0,0 +1,131 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const Lock = std.event.Lock; +const Loop = std.event.Loop; +const testing = std.testing; + +/// ReturnType must be `void` or `E!void` +pub fn Group(comptime ReturnType: type) type { + return struct { + frame_stack: Stack, + alloc_stack: Stack, + lock: Lock, + + const Self = @This(); + + const Error = switch (@typeInfo(ReturnType)) { + .ErrorUnion => |payload| payload.error_set, + else => void, + }; + const Stack = std.atomic.Stack(anyframe->ReturnType); + + pub fn init(loop: *Loop) Self { + return Self{ + .frame_stack = Stack.init(), + .alloc_stack = Stack.init(), + .lock = Lock.init(loop), + }; + } + + /// Add a frame to the group. Thread-safe. + pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) { + const node = try self.lock.loop.allocator.create(Stack.Node); + node.* = Stack.Node{ + .next = undefined, + .data = handle, + }; + self.alloc_stack.push(node); + } + + /// Add a node to the group. Thread-safe. Cannot fail. + /// `node.data` should be the frame handle to add to the group. + /// The node's memory should be in the function frame of + /// the handle that is in the node, or somewhere guaranteed to live + /// at least as long. + pub fn addNode(self: *Self, node: *Stack.Node) void { + self.frame_stack.push(node); + } + + /// Wait for all the calls and promises of the group to complete. + /// Thread-safe. + /// Safe to call any number of times. + pub async fn wait(self: *Self) ReturnType { + const held = self.lock.acquire(); + defer held.release(); + + var result: ReturnType = {}; + + while (self.frame_stack.pop()) |node| { + if (Error == void) { + await node.data; + } else { + (await node.data) catch |err| { + result = err; + }; + } + } + while (self.alloc_stack.pop()) |node| { + const handle = node.data; + self.lock.loop.allocator.destroy(node); + if (Error == void) { + await handle; + } else { + (await handle) catch |err| { + result = err; + }; + } + } + return result; + } + }; +} + +test "std.event.Group" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + + const allocator = std.heap.direct_allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + const handle = async testGroup(&loop); + + loop.run(); +} + +async fn testGroup(loop: *Loop) void { + var count: usize = 0; + var group = Group(void).init(loop); + var sleep_a_little_frame = async sleepALittle(&count); + group.add(&sleep_a_little_frame) catch @panic("memory"); + var increase_by_ten_frame = async increaseByTen(&count); + group.add(&increase_by_ten_frame) catch @panic("memory"); + group.wait(); + testing.expect(count == 11); + + var another = Group(anyerror!void).init(loop); + var something_else_frame = async somethingElse(); + another.add(&something_else_frame) catch @panic("memory"); + var something_that_fails_frame = async doSomethingThatFails(); + another.add(&something_that_fails_frame) catch @panic("memory"); + testing.expectError(error.ItBroke, another.wait()); +} + +async fn sleepALittle(count: *usize) void { + std.time.sleep(1 * std.time.millisecond); + _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); +} + +async fn increaseByTen(count: *usize) void { + var i: usize = 0; + while (i < 10) : (i += 1) { + _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); + } +} + +async fn doSomethingThatFails() anyerror!void {} +async fn somethingElse() anyerror!void { + return error.ItBroke; +} diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig new file mode 100644 index 0000000000..a0b1fd3e50 --- /dev/null +++ b/lib/std/event/lock.zig @@ -0,0 +1,186 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const testing = std.testing; +const mem = std.mem; +const Loop = std.event.Loop; + +/// Thread-safe async/await lock. +/// Functions which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +/// Allows only one actor to hold the lock. +pub const Lock = struct { + loop: *Loop, + shared_bit: u8, // TODO make this a bool + queue: Queue, + queue_empty_bit: u8, // TODO make this a bool + + const Queue = std.atomic.Queue(anyframe); + + pub const Held = struct { + lock: *Lock, + + pub fn release(self: Held) void { + // Resume the next item from the queue. + if (self.lock.queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // We need to release the lock. + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst); + + // There might be a queue item. If we know the queue is empty, we can be done, + // because the other actor will try to obtain the lock. + // But if there's a queue item, we are the actor which must loop and attempt + // to grab the lock again. + if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) { + return; + } + + while (true) { + const old_bit = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 1, .SeqCst); + if (old_bit != 0) { + // We did not obtain the lock. Great, the queue is someone else's problem. + return; + } + + // Resume the next item from the queue. + if (self.lock.queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // Release the lock again. + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst); + + // Find out if we can be done. + if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) { + return; + } + } + } + }; + + pub fn init(loop: *Loop) Lock { + return Lock{ + .loop = loop, + .shared_bit = 0, + .queue = Queue.init(), + .queue_empty_bit = 1, + }; + } + + pub fn initLocked(loop: *Loop) Lock { + return Lock{ + .loop = loop, + .shared_bit = 1, + .queue = Queue.init(), + .queue_empty_bit = 1, + }; + } + + /// Must be called when not locked. Not thread safe. + /// All calls to acquire() and release() must complete before calling deinit(). + pub fn deinit(self: *Lock) void { + assert(self.shared_bit == 0); + while (self.queue.get()) |node| resume node.data; + } + + pub async fn acquire(self: *Lock) Held { + var my_tick_node = Loop.NextTickNode.init(@frame()); + + errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire + suspend { + self.queue.put(&my_tick_node); + + // At this point, we are in the queue, so we might have already been resumed. + + // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor + // will attempt to grab the lock. + _ = @atomicRmw(u8, &self.queue_empty_bit, .Xchg, 0, .SeqCst); + + const old_bit = @atomicRmw(u8, &self.shared_bit, .Xchg, 1, .SeqCst); + if (old_bit == 0) { + if (self.queue.get()) |node| { + // Whether this node is us or someone else, we tail resume it. + resume node.data; + } + } + } + + return Held{ .lock = self }; + } +}; + +test "std.event.Lock" { + // TODO https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + // TODO https://github.com/ziglang/zig/issues/3251 + if (std.os.freebsd.is_the_target) return error.SkipZigTest; + + const allocator = std.heap.direct_allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var lock = Lock.init(&loop); + defer lock.deinit(); + + _ = async testLock(&loop, &lock); + loop.run(); + + testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data); +} + +async fn testLock(loop: *Loop, lock: *Lock) void { + var handle1 = async lockRunner(lock); + var tick_node1 = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = &handle1, + }; + loop.onNextTick(&tick_node1); + + var handle2 = async lockRunner(lock); + var tick_node2 = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = &handle2, + }; + loop.onNextTick(&tick_node2); + + var handle3 = async lockRunner(lock); + var tick_node3 = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = &handle3, + }; + loop.onNextTick(&tick_node3); + + await handle1; + await handle2; + await handle3; +} + +var shared_test_data = [1]i32{0} ** 10; +var shared_test_index: usize = 0; + +async fn lockRunner(lock: *Lock) void { + suspend; // resumed by onNextTick + + var i: usize = 0; + while (i < shared_test_data.len) : (i += 1) { + var lock_frame = async lock.acquire(); + const handle = await lock_frame; + defer handle.release(); + + shared_test_index = 0; + while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) { + shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1; + } + } +} diff --git a/lib/std/event/locked.zig b/lib/std/event/locked.zig new file mode 100644 index 0000000000..aeedf3558a --- /dev/null +++ b/lib/std/event/locked.zig @@ -0,0 +1,43 @@ +const std = @import("../std.zig"); +const Lock = std.event.Lock; +const Loop = std.event.Loop; + +/// Thread-safe async/await lock that protects one piece of data. +/// Functions which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +pub fn Locked(comptime T: type) type { + return struct { + lock: Lock, + private_data: T, + + const Self = @This(); + + pub const HeldLock = struct { + value: *T, + held: Lock.Held, + + pub fn release(self: HeldLock) void { + self.held.release(); + } + }; + + pub fn init(loop: *Loop, data: T) Self { + return Self{ + .lock = Lock.init(loop), + .private_data = data, + }; + } + + pub fn deinit(self: *Self) void { + self.lock.deinit(); + } + + pub async fn acquire(self: *Self) HeldLock { + return HeldLock{ + // TODO guaranteed allocation elision + .held = await (async self.lock.acquire() catch unreachable), + .value = &self.private_data, + }; + } + }; +} diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig new file mode 100644 index 0000000000..d0d36abc0c --- /dev/null +++ b/lib/std/event/loop.zig @@ -0,0 +1,923 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const root = @import("root"); +const assert = std.debug.assert; +const testing = std.testing; +const mem = std.mem; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const fs = std.event.fs; +const os = std.os; +const windows = os.windows; +const maxInt = std.math.maxInt; +const Thread = std.Thread; + +pub const Loop = struct { + allocator: *mem.Allocator, + next_tick_queue: std.atomic.Queue(anyframe), + os_data: OsData, + final_resume_node: ResumeNode, + pending_event_count: usize, + extra_threads: []*Thread, + + // pre-allocated eventfds. all permanently active. + // this is how we send promises to be resumed on other threads. + available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), + eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, + + pub const NextTickNode = std.atomic.Queue(anyframe).Node; + + pub const ResumeNode = struct { + id: Id, + handle: anyframe, + overlapped: Overlapped, + + pub const overlapped_init = switch (builtin.os) { + .windows => windows.OVERLAPPED{ + .Internal = 0, + .InternalHigh = 0, + .Offset = 0, + .OffsetHigh = 0, + .hEvent = null, + }, + else => {}, + }; + pub const Overlapped = @typeOf(overlapped_init); + + pub const Id = enum { + Basic, + Stop, + EventFd, + }; + + pub const EventFd = switch (builtin.os) { + .macosx, .freebsd, .netbsd => KEventFd, + .linux => struct { + base: ResumeNode, + epoll_op: u32, + eventfd: i32, + }, + .windows => struct { + base: ResumeNode, + completion_key: usize, + }, + else => @compileError("unsupported OS"), + }; + + const KEventFd = struct { + base: ResumeNode, + kevent: os.Kevent, + }; + + pub const Basic = switch (builtin.os) { + .macosx, .freebsd, .netbsd => KEventBasic, + .linux => struct { + base: ResumeNode, + }, + .windows => struct { + base: ResumeNode, + }, + else => @compileError("unsupported OS"), + }; + + const KEventBasic = struct { + base: ResumeNode, + kev: os.Kevent, + }; + }; + + var global_instance_state: Loop = undefined; + const default_instance: ?*Loop = switch (std.io.mode) { + .blocking => null, + .evented => &global_instance_state, + }; + pub const instance: ?*Loop = if (@hasDecl(root, "event_loop")) root.event_loop else default_instance; + + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 + pub fn init(self: *Loop, allocator: *mem.Allocator) !void { + if (builtin.single_threaded) { + return self.initSingleThreaded(allocator); + } else { + return self.initMultiThreaded(allocator); + } + } + + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 + pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { + return self.initInternal(allocator, 1); + } + + /// The allocator must be thread-safe because we use it for multiplexing + /// async functions onto kernel threads. + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 + pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { + if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode"); + const core_count = try Thread.cpuCount(); + return self.initInternal(allocator, core_count); + } + + /// Thread count is the total thread count. The thread pool size will be + /// max(thread_count - 1, 0) + fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + self.* = Loop{ + .pending_event_count = 1, + .allocator = allocator, + .os_data = undefined, + .next_tick_queue = std.atomic.Queue(anyframe).init(), + .extra_threads = undefined, + .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), + .eventfd_resume_nodes = undefined, + .final_resume_node = ResumeNode{ + .id = ResumeNode.Id.Stop, + .handle = undefined, + .overlapped = ResumeNode.overlapped_init, + }, + }; + // We need at least one of these in case the fs thread wants to use onNextTick + const extra_thread_count = thread_count - 1; + const resume_node_count = std.math.max(extra_thread_count, 1); + self.eventfd_resume_nodes = try self.allocator.alloc( + std.atomic.Stack(ResumeNode.EventFd).Node, + resume_node_count, + ); + errdefer self.allocator.free(self.eventfd_resume_nodes); + + self.extra_threads = try self.allocator.alloc(*Thread, extra_thread_count); + errdefer self.allocator.free(self.extra_threads); + + try self.initOsData(extra_thread_count); + errdefer self.deinitOsData(); + } + + pub fn deinit(self: *Loop) void { + self.deinitOsData(); + self.allocator.free(self.extra_threads); + } + + const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError || + Thread.SpawnError || os.EpollCtlError || os.KEventError || + windows.CreateIoCompletionPortError; + + const wakeup_bytes = [_]u8{0x1} ** 8; + + fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { + switch (builtin.os) { + .linux => { + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue_item = 0; + // we need another thread for the file system because Linux does not have an async + // file system I/O API. + self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + + errdefer { + while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + } + for (self.eventfd_resume_nodes) |*eventfd_node| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = .EventFd, + .handle = undefined, + .overlapped = ResumeNode.overlapped_init, + }, + .eventfd = try os.eventfd(1, os.EFD_CLOEXEC | os.EFD_NONBLOCK), + .epoll_op = os.EPOLL_CTL_ADD, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC); + errdefer os.close(self.os_data.epollfd); + + self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK); + errdefer os.close(self.os_data.final_eventfd); + + self.os_data.final_eventfd_event = os.epoll_event{ + .events = os.EPOLLIN, + .data = os.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + }; + try os.epoll_ctl( + self.os_data.epollfd, + os.EPOLL_CTL_ADD, + self.os_data.final_eventfd, + &self.os_data.final_eventfd_event, + ); + + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + + if (builtin.single_threaded) { + assert(extra_thread_count == 0); + return; + } + + var extra_thread_index: usize = 0; + errdefer { + // writing 8 bytes to an eventfd cannot fail + os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); + } + }, + .macosx, .freebsd, .netbsd => { + self.os_data.kqfd = try os.kqueue(); + errdefer os.close(self.os_data.kqfd); + + self.os_data.fs_kqfd = try os.kqueue(); + errdefer os.close(self.os_data.fs_kqfd); + + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + // we need another thread for the file system because Darwin does not have an async + // file system I/O API. + self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + .overlapped = ResumeNode.overlapped_init, + }, + // this one is for sending events + .kevent = os.Kevent{ + .ident = i, + .filter = os.EVFILT_USER, + .flags = os.EV_CLEAR | os.EV_ADD | os.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + const kevent_array = (*const [1]os.Kevent)(&eventfd_node.data.kevent); + _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null); + eventfd_node.data.kevent.flags = os.EV_CLEAR | os.EV_ENABLE; + eventfd_node.data.kevent.fflags = os.NOTE_TRIGGER; + } + + // Pre-add so that we cannot get error.SystemResources + // later when we try to activate it. + self.os_data.final_kevent = os.Kevent{ + .ident = extra_thread_count, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&self.final_resume_node), + }; + const final_kev_arr = (*const [1]os.Kevent)(&self.os_data.final_kevent); + _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); + self.os_data.final_kevent.flags = os.EV_ENABLE; + self.os_data.final_kevent.fflags = os.NOTE_TRIGGER; + + self.os_data.fs_kevent_wake = os.Kevent{ + .ident = 0, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_ENABLE, + .fflags = os.NOTE_TRIGGER, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_kevent_wait = os.Kevent{ + .ident = 0, + .filter = os.EVFILT_USER, + .flags = os.EV_ADD | os.EV_CLEAR, + .fflags = 0, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_thread = try Thread.spawn(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + + if (builtin.single_threaded) { + assert(extra_thread_count == 0); + return; + } + + var extra_thread_index: usize = 0; + errdefer { + _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); + } + }, + .windows => { + self.os_data.io_port = try windows.CreateIoCompletionPort( + windows.INVALID_HANDLE_VALUE, + null, + undefined, + maxInt(windows.DWORD), + ); + errdefer windows.CloseHandle(self.os_data.io_port); + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + .overlapped = ResumeNode.overlapped_init, + }, + // this one is for sending events + .completion_key = @ptrToInt(&eventfd_node.data.base), + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + if (builtin.single_threaded) { + assert(extra_thread_count == 0); + return; + } + + var extra_thread_index: usize = 0; + errdefer { + var i: usize = 0; + while (i < extra_thread_index) : (i += 1) { + while (true) { + const overlapped = &self.final_resume_node.overlapped; + windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; + break; + } + } + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun); + } + }, + else => {}, + } + } + + fn deinitOsData(self: *Loop) void { + switch (builtin.os) { + .linux => { + os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + os.close(self.os_data.epollfd); + self.allocator.free(self.eventfd_resume_nodes); + }, + .macosx, .freebsd, .netbsd => { + os.close(self.os_data.kqfd); + os.close(self.os_data.fs_kqfd); + }, + .windows => { + windows.CloseHandle(self.os_data.io_port); + }, + else => {}, + } + } + + /// resume_node must live longer than the anyframe that it holds a reference to. + /// flags must contain EPOLLET + pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void { + assert(flags & os.EPOLLET == os.EPOLLET); + self.beginOneEvent(); + errdefer self.finishOneEvent(); + try self.linuxModFd( + fd, + os.EPOLL_CTL_ADD, + flags, + resume_node, + ); + } + + pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void { + assert(flags & os.EPOLLET == os.EPOLLET); + var ev = os.linux.epoll_event{ + .events = flags, + .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, + }; + try os.epoll_ctl(self.os_data.epollfd, op, fd, &ev); + } + + pub fn linuxRemoveFd(self: *Loop, fd: i32) void { + os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, null) catch {}; + self.finishOneEvent(); + } + + pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void { + defer self.linuxRemoveFd(fd); + suspend { + var resume_node = ResumeNode.Basic{ + .base = ResumeNode{ + .id = .Basic, + .handle = @frame(), + .overlapped = ResumeNode.overlapped_init, + }, + }; + try self.linuxAddFd(fd, &resume_node.base, flags); + } + } + + pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) !void { + return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); + } + + pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent { + var resume_node = ResumeNode.Basic{ + .base = ResumeNode{ + .id = ResumeNode.Id.Basic, + .handle = @frame(), + .overlapped = ResumeNode.overlapped_init, + }, + .kev = undefined, + }; + defer self.bsdRemoveKev(ident, filter); + suspend { + try self.bsdAddKev(&resume_node, ident, filter, fflags); + } + return resume_node.kev; + } + + /// resume_node must live longer than the anyframe that it holds a reference to. + pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, fflags: u32) !void { + self.beginOneEvent(); + errdefer self.finishOneEvent(); + var kev = os.Kevent{ + .ident = ident, + .filter = filter, + .flags = os.EV_ADD | os.EV_ENABLE | os.EV_CLEAR, + .fflags = fflags, + .data = 0, + .udata = @ptrToInt(&resume_node.base), + }; + const kevent_array = (*const [1]os.Kevent)(&kev); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null); + } + + pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void { + var kev = os.Kevent{ + .ident = ident, + .filter = filter, + .flags = os.EV_DELETE, + .fflags = 0, + .data = 0, + .udata = 0, + }; + const kevent_array = (*const [1]os.Kevent)(&kev); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined; + self.finishOneEvent(); + } + + fn dispatch(self: *Loop) void { + while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { + const next_tick_node = self.next_tick_queue.get() orelse { + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + const eventfd_node = &resume_stack_node.data; + eventfd_node.base.handle = next_tick_node.data; + switch (builtin.os) { + .macosx, .freebsd, .netbsd => { + const kevent_array = (*const [1]os.Kevent)(&eventfd_node.kevent); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + .linux => { + // the pending count is already accounted for + const epoll_events = os.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | + os.linux.EPOLLET; + self.linuxModFd( + eventfd_node.eventfd, + eventfd_node.epoll_op, + epoll_events, + &eventfd_node.base, + ) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + .windows => { + windows.PostQueuedCompletionStatus( + self.os_data.io_port, + undefined, + undefined, + &eventfd_node.base.overlapped, + ) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + else => @compileError("unsupported OS"), + } + } + } + + /// Bring your own linked list node. This means it can't fail. + pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + self.beginOneEvent(); // finished in dispatch() + self.next_tick_queue.put(node); + self.dispatch(); + } + + pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void { + if (self.next_tick_queue.remove(node)) { + self.finishOneEvent(); + } + } + + pub fn run(self: *Loop) void { + self.finishOneEvent(); // the reference we start with + + self.workerRun(); + + switch (builtin.os) { + .linux, + .macosx, + .freebsd, + .netbsd, + => self.os_data.fs_thread.wait(), + else => {}, + } + + for (self.extra_threads) |extra_thread| { + extra_thread.wait(); + } + } + + /// This is equivalent to function call, except it calls `startCpuBoundOperation` first. + pub fn call(comptime func: var, args: ...) @typeOf(func).ReturnType { + startCpuBoundOperation(); + return func(args); + } + + /// Yielding lets the event loop run, starting any unstarted async operations. + /// Note that async operations automatically start when a function yields for any other reason, + /// for example, when async I/O is performed. This function is intended to be used only when + /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O + /// is performed. + pub fn yield(self: *Loop) void { + suspend { + var my_tick_node = NextTickNode{ + .prev = undefined, + .next = undefined, + .data = @frame(), + }; + self.onNextTick(&my_tick_node); + } + } + + /// If the build is multi-threaded and there is an event loop, then it calls `yield`. Otherwise, + /// does nothing. + pub fn startCpuBoundOperation() void { + if (builtin.single_threaded) { + return; + } else if (instance) |event_loop| { + event_loop.yield(); + } + } + + /// call finishOneEvent when done + pub fn beginOneEvent(self: *Loop) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + } + + pub fn finishOneEvent(self: *Loop) void { + const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + if (prev == 1) { + // cause all the threads to stop + switch (builtin.os) { + .linux => { + self.posixFsRequest(&self.os_data.fs_end_request); + // writing 8 bytes to an eventfd cannot fail + os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + return; + }, + .macosx, .freebsd, .netbsd => { + self.posixFsRequest(&self.os_data.fs_end_request); + const final_kevent = (*const [1]os.Kevent)(&self.os_data.final_kevent); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + // cannot fail because we already added it and this just enables it + _ = os.kevent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; + return; + }, + .windows => { + var i: usize = 0; + while (i < self.extra_threads.len + 1) : (i += 1) { + while (true) { + const overlapped = &self.final_resume_node.overlapped; + windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue; + break; + } + } + return; + }, + else => @compileError("unsupported OS"), + } + } + } + + fn workerRun(self: *Loop) void { + while (true) { + while (true) { + const next_tick_node = self.next_tick_queue.get() orelse break; + self.dispatch(); + resume next_tick_node.data; + self.finishOneEvent(); + } + + switch (builtin.os) { + .linux => { + // only process 1 event so we don't steal from other threads + var events: [1]os.linux.epoll_event = undefined; + const count = os.epoll_wait(self.os_data.epollfd, events[0..], -1); + for (events[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + .Basic => {}, + .Stop => return, + .EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + event_fd_node.epoll_op = os.EPOLL_CTL_MOD; + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + self.finishOneEvent(); + } + } + }, + .macosx, .freebsd, .netbsd => { + var eventlist: [1]os.Kevent = undefined; + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable; + for (eventlist[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.udata); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + .Basic => { + const basic_node = @fieldParentPtr(ResumeNode.Basic, "base", resume_node); + basic_node.kev = ev; + }, + .Stop => return, + .EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + self.finishOneEvent(); + } + } + }, + .windows => { + var completion_key: usize = undefined; + const overlapped = while (true) { + var nbytes: windows.DWORD = undefined; + var overlapped: ?*windows.OVERLAPPED = undefined; + switch (windows.GetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { + .Aborted => return, + .Normal => {}, + .EOF => {}, + .Cancelled => continue, + } + if (overlapped) |o| break o; + } else unreachable; // TODO else unreachable should not be necessary + const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + .Basic => {}, + .Stop => return, + .EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + self.finishOneEvent(); + }, + else => @compileError("unsupported OS"), + } + } + } + + fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + self.beginOneEvent(); // finished in posixFsRun after processing the msg + self.os_data.fs_queue.put(request_node); + switch (builtin.os) { + .macosx, .freebsd, .netbsd => { + const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wake); + const empty_kevs = ([*]os.Kevent)(undefined)[0..0]; + _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; + }, + .linux => { + _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1); + switch (os.linux.getErrno(rc)) { + 0 => {}, + os.EINVAL => unreachable, + else => unreachable, + } + }, + else => @compileError("Unsupported OS"), + } + } + + fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void { + if (self.os_data.fs_queue.remove(request_node)) { + self.finishOneEvent(); + } + } + + fn posixFsRun(self: *Loop) void { + while (true) { + if (builtin.os == .linux) { + _ = @atomicRmw(i32, &self.os_data.fs_queue_item, .Xchg, 0, .SeqCst); + } + while (self.os_data.fs_queue.get()) |node| { + switch (node.data.msg) { + .End => return, + .WriteV => |*msg| { + msg.result = os.writev(msg.fd, msg.iov); + }, + .PWriteV => |*msg| { + msg.result = os.pwritev(msg.fd, msg.iov, msg.offset); + }, + .PReadV => |*msg| { + msg.result = os.preadv(msg.fd, msg.iov, msg.offset); + }, + .Open => |*msg| { + msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode); + }, + .Close => |*msg| os.close(msg.fd), + .WriteFile => |*msg| blk: { + const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | + os.O_CLOEXEC | os.O_TRUNC; + const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| { + msg.result = err; + break :blk; + }; + defer os.close(fd); + msg.result = os.write(fd, msg.contents); + }, + } + switch (node.data.finish) { + .TickNode => |*tick_node| self.onNextTick(tick_node), + .DeallocCloseOperation => |close_op| { + self.allocator.destroy(close_op); + }, + .NoAction => {}, + } + self.finishOneEvent(); + } + switch (builtin.os) { + .linux => { + const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null); + switch (os.linux.getErrno(rc)) { + 0, os.EINTR, os.EAGAIN => continue, + else => unreachable, + } + }, + .macosx, .freebsd, .netbsd => { + const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wait); + var out_kevs: [1]os.Kevent = undefined; + _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; + }, + else => @compileError("Unsupported OS"), + } + } + } + + const OsData = switch (builtin.os) { + .linux => LinuxOsData, + .macosx, .freebsd, .netbsd => KEventData, + .windows => struct { + io_port: windows.HANDLE, + extra_thread_count: usize, + }, + else => struct {}, + }; + + const KEventData = struct { + kqfd: i32, + final_kevent: os.Kevent, + fs_kevent_wake: os.Kevent, + fs_kevent_wait: os.Kevent, + fs_thread: *Thread, + fs_kqfd: i32, + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, + }; + + const LinuxOsData = struct { + epollfd: i32, + final_eventfd: i32, + final_eventfd_event: os.linux.epoll_event, + fs_thread: *Thread, + fs_queue_item: i32, + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, + }; +}; + +test "std.event.Loop - basic" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + + const allocator = std.heap.direct_allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + loop.run(); +} + +test "std.event.Loop - call" { + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + + const allocator = std.heap.direct_allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var did_it = false; + var handle = async Loop.call(testEventLoop); + var handle2 = async Loop.call(testEventLoop2, &handle, &did_it); + + loop.run(); + + testing.expect(did_it); +} + +async fn testEventLoop() i32 { + return 1234; +} + +async fn testEventLoop2(h: anyframe->i32, did_it: *bool) void { + const value = await h; + testing.expect(value == 1234); + did_it.* = true; +} diff --git a/lib/std/event/net.zig b/lib/std/event/net.zig new file mode 100644 index 0000000000..bed665dcdc --- /dev/null +++ b/lib/std/event/net.zig @@ -0,0 +1,358 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const testing = std.testing; +const event = std.event; +const mem = std.mem; +const os = std.os; +const Loop = std.event.Loop; +const File = std.fs.File; +const fd_t = os.fd_t; + +pub const Server = struct { + handleRequestFn: async fn (*Server, *const std.net.Address, File) void, + + loop: *Loop, + sockfd: ?i32, + accept_frame: ?anyframe, + listen_address: std.net.Address, + + waiting_for_emfile_node: PromiseNode, + listen_resume_node: event.Loop.ResumeNode, + + const PromiseNode = std.TailQueue(anyframe).Node; + + pub fn init(loop: *Loop) Server { + // TODO can't initialize handler here because we need well defined copy elision + return Server{ + .loop = loop, + .sockfd = null, + .accept_frame = null, + .handleRequestFn = undefined, + .waiting_for_emfile_node = undefined, + .listen_address = undefined, + .listen_resume_node = event.Loop.ResumeNode{ + .id = event.Loop.ResumeNode.Id.Basic, + .handle = undefined, + .overlapped = event.Loop.ResumeNode.overlapped_init, + }, + }; + } + + pub fn listen( + self: *Server, + address: *const std.net.Address, + handleRequestFn: async fn (*Server, *const std.net.Address, File) void, + ) !void { + self.handleRequestFn = handleRequestFn; + + const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); + errdefer os.close(sockfd); + self.sockfd = sockfd; + + try os.bind(sockfd, &address.os_addr); + try os.listen(sockfd, os.SOMAXCONN); + self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd)); + + self.accept_frame = async Server.handler(self); + errdefer await self.accept_frame.?; + + self.listen_resume_node.handle = self.accept_frame.?; + try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); + errdefer self.loop.removeFd(sockfd); + } + + /// Stop listening + pub fn close(self: *Server) void { + self.loop.linuxRemoveFd(self.sockfd.?); + if (self.sockfd) |fd| { + os.close(fd); + self.sockfd = null; + } + } + + pub fn deinit(self: *Server) void { + if (self.accept_frame) |accept_frame| await accept_frame; + if (self.sockfd) |sockfd| os.close(sockfd); + } + + pub async fn handler(self: *Server) void { + while (true) { + var accepted_addr: std.net.Address = undefined; + // TODO just inline the following function here and don't expose it as posixAsyncAccept + if (os.accept4_async(self.sockfd.?, &accepted_addr.os_addr, os.SOCK_NONBLOCK | os.SOCK_CLOEXEC)) |accepted_fd| { + if (accepted_fd == -1) { + // would block + suspend; // we will get resumed by epoll_wait in the event loop + continue; + } + var socket = File.openHandle(accepted_fd); + self.handleRequestFn(self, &accepted_addr, socket); + } else |err| switch (err) { + error.ProcessFdQuotaExceeded => @panic("TODO handle this error"), + error.ConnectionAborted => continue, + + error.FileDescriptorNotASocket => unreachable, + error.OperationNotSupported => unreachable, + + error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => { + @panic("TODO handle this error"); + }, + } + } + } +}; + +pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 { + const sockfd = try os.socket( + os.AF_UNIX, + os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, + 0, + ); + errdefer os.close(sockfd); + + var sock_addr = os.sockaddr_un{ + .family = os.AF_UNIX, + .path = undefined, + }; + + if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong; + mem.copy(u8, sock_addr.path[0..], path); + const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len); + try os.connect_async(sockfd, &sock_addr, size); + try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); + try os.getsockoptError(sockfd); + + return sockfd; +} + +pub const ReadError = error{ + SystemResources, + Unexpected, + UserResourceLimitReached, + InputOutput, + + FileDescriptorNotRegistered, // TODO remove this possibility + OperationCausesCircularLoop, // TODO remove this possibility + FileDescriptorAlreadyPresentInSet, // TODO remove this possibility + FileDescriptorIncompatibleWithEpoll, // TODO remove this possibility +}; + +/// returns number of bytes read. 0 means EOF. +pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize { + const iov = os.iovec{ + .iov_base = buffer.ptr, + .iov_len = buffer.len, + }; + const iovs: *const [1]os.iovec = &iov; + return readvPosix(loop, fd, iovs, 1); +} + +pub const WriteError = error{}; + +pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteError!void { + const iov = os.iovec_const{ + .iov_base = buffer.ptr, + .iov_len = buffer.len, + }; + const iovs: *const [1]os.iovec_const = &iov; + return writevPosix(loop, fd, iovs, 1); +} + +pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void { + while (true) { + switch (builtin.os) { + .macosx, .linux => { + switch (os.errno(os.system.writev(fd, iov, count))) { + 0 => return, + os.EINTR => continue, + os.ESPIPE => unreachable, + os.EINVAL => unreachable, + os.EFAULT => unreachable, + os.EAGAIN => { + try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT); + continue; + }, + os.EBADF => unreachable, // always a race condition + os.EDESTADDRREQ => unreachable, // connect was never called + os.EDQUOT => unreachable, + os.EFBIG => unreachable, + os.EIO => return error.InputOutput, + os.ENOSPC => unreachable, + os.EPERM => return error.AccessDenied, + os.EPIPE => unreachable, + else => |err| return os.unexpectedErrno(err), + } + }, + else => @compileError("Unsupported OS"), + } + } +} + +/// returns number of bytes read. 0 means EOF. +pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: usize) !usize { + while (true) { + switch (builtin.os) { + builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => { + const rc = os.system.readv(fd, iov, count); + switch (os.errno(rc)) { + 0 => return rc, + os.EINTR => continue, + os.EINVAL => unreachable, + os.EFAULT => unreachable, + os.EAGAIN => { + try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN); + continue; + }, + os.EBADF => unreachable, // always a race condition + os.EIO => return error.InputOutput, + os.EISDIR => unreachable, + os.ENOBUFS => return error.SystemResources, + os.ENOMEM => return error.SystemResources, + else => |err| return os.unexpectedErrno(err), + } + }, + else => @compileError("Unsupported OS"), + } + } +} + +pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void { + const iovecs = try loop.allocator.alloc(os.iovec_const, data.len); + defer loop.allocator.free(iovecs); + + for (data) |buf, i| { + iovecs[i] = os.iovec_const{ + .iov_base = buf.ptr, + .iov_len = buf.len, + }; + } + + return writevPosix(loop, fd, iovecs.ptr, data.len); +} + +pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize { + const iovecs = try loop.allocator.alloc(os.iovec, data.len); + defer loop.allocator.free(iovecs); + + for (data) |buf, i| { + iovecs[i] = os.iovec{ + .iov_base = buf.ptr, + .iov_len = buf.len, + }; + } + + return readvPosix(loop, fd, iovecs.ptr, data.len); +} + +pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File { + var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592 + + const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp); + errdefer os.close(sockfd); + + try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in)); + try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET); + try os.getsockoptError(sockfd); + + return File.openHandle(sockfd); +} + +test "listen on a port, send bytes, receive bytes" { + // https://github.com/ziglang/zig/issues/2377 + if (true) return error.SkipZigTest; + + if (builtin.os != builtin.Os.linux) { + // TODO build abstractions for other operating systems + return error.SkipZigTest; + } + + const MyServer = struct { + tcp_server: Server, + + const Self = @This(); + async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void { + const self = @fieldParentPtr(Self, "tcp_server", tcp_server); + var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 + defer socket.close(); + const next_handler = errorableHandler(self, _addr, socket) catch |err| { + std.debug.panic("unable to handle connection: {}\n", err); + }; + } + async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void { + const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592 + var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 + + const stream = &socket.outStream().stream; + try stream.print("hello from server\n"); + } + }; + + const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable; + const addr = std.net.Address.initIp4(ip4addr, 0); + + var loop: Loop = undefined; + try loop.initSingleThreaded(std.debug.global_allocator); + var server = MyServer{ .tcp_server = Server.init(&loop) }; + defer server.tcp_server.deinit(); + try server.tcp_server.listen(&addr, MyServer.handler); + + _ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server); + loop.run(); +} + +async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void { + errdefer @panic("test failure"); + + var socket_file = try connect(loop, address); + defer socket_file.close(); + + var buf: [512]u8 = undefined; + const amt_read = try socket_file.read(buf[0..]); + const msg = buf[0..amt_read]; + testing.expect(mem.eql(u8, msg, "hello from server\n")); + server.close(); +} + +pub const OutStream = struct { + fd: fd_t, + stream: Stream, + loop: *Loop, + + pub const Error = WriteError; + pub const Stream = event.io.OutStream(Error); + + pub fn init(loop: *Loop, fd: fd_t) OutStream { + return OutStream{ + .fd = fd, + .loop = loop, + .stream = Stream{ .writeFn = writeFn }, + }; + } + + async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { + const self = @fieldParentPtr(OutStream, "stream", out_stream); + return write(self.loop, self.fd, bytes); + } +}; + +pub const InStream = struct { + fd: fd_t, + stream: Stream, + loop: *Loop, + + pub const Error = ReadError; + pub const Stream = event.io.InStream(Error); + + pub fn init(loop: *Loop, fd: fd_t) InStream { + return InStream{ + .fd = fd, + .loop = loop, + .stream = Stream{ .readFn = readFn }, + }; + } + + async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { + const self = @fieldParentPtr(InStream, "stream", in_stream); + return read(self.loop, self.fd, bytes); + } +}; diff --git a/lib/std/event/rwlock.zig b/lib/std/event/rwlock.zig new file mode 100644 index 0000000000..bf7ea0fa9f --- /dev/null +++ b/lib/std/event/rwlock.zig @@ -0,0 +1,296 @@ +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const testing = std.testing; +const mem = std.mem; +const Loop = std.event.Loop; + +/// Thread-safe async/await lock. +/// Functions which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +/// Many readers can hold the lock at the same time; however locking for writing is exclusive. +/// When a read lock is held, it will not be released until the reader queue is empty. +/// When a write lock is held, it will not be released until the writer queue is empty. +pub const RwLock = struct { + loop: *Loop, + shared_state: u8, // TODO make this an enum + writer_queue: Queue, + reader_queue: Queue, + writer_queue_empty_bit: u8, // TODO make this a bool + reader_queue_empty_bit: u8, // TODO make this a bool + reader_lock_count: usize, + + const State = struct { + const Unlocked = 0; + const WriteLock = 1; + const ReadLock = 2; + }; + + const Queue = std.atomic.Queue(anyframe); + + pub const HeldRead = struct { + lock: *RwLock, + + pub fn release(self: HeldRead) void { + // If other readers still hold the lock, we're done. + if (@atomicRmw(usize, &self.lock.reader_lock_count, .Sub, 1, .SeqCst) != 1) { + return; + } + + _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, .Xchg, 1, .SeqCst); + if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) { + // Didn't unlock. Someone else's problem. + return; + } + + self.lock.commonPostUnlock(); + } + }; + + pub const HeldWrite = struct { + lock: *RwLock, + + pub fn release(self: HeldWrite) void { + // See if we can leave it locked for writing, and pass the lock to the next writer + // in the queue to grab the lock. + if (self.lock.writer_queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // We need to release the write lock. Check if any readers are waiting to grab the lock. + if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) { + // Switch to a read lock. + _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst); + while (self.lock.reader_queue.get()) |node| { + self.lock.loop.onNextTick(node); + } + return; + } + + _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst); + + self.lock.commonPostUnlock(); + } + }; + + pub fn init(loop: *Loop) RwLock { + return RwLock{ + .loop = loop, + .shared_state = State.Unlocked, + .writer_queue = Queue.init(), + .writer_queue_empty_bit = 1, + .reader_queue = Queue.init(), + .reader_queue_empty_bit = 1, + .reader_lock_count = 0, + }; + } + + /// Must be called when not locked. Not thread safe. + /// All calls to acquire() and release() must complete before calling deinit(). + pub fn deinit(self: *RwLock) void { + assert(self.shared_state == State.Unlocked); + while (self.writer_queue.get()) |node| resume node.data; + while (self.reader_queue.get()) |node| resume node.data; + } + + pub async fn acquireRead(self: *RwLock) HeldRead { + _ = @atomicRmw(usize, &self.reader_lock_count, .Add, 1, .SeqCst); + + suspend { + var my_tick_node = Loop.NextTickNode{ + .data = @frame(), + .prev = undefined, + .next = undefined, + }; + + self.reader_queue.put(&my_tick_node); + + // At this point, we are in the reader_queue, so we might have already been resumed. + + // We set this bit so that later we can rely on the fact, that if reader_queue_empty_bit is 1, + // some actor will attempt to grab the lock. + _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 0, .SeqCst); + + // Here we don't care if we are the one to do the locking or if it was already locked for reading. + const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == State.ReadLock else true; + if (have_read_lock) { + // Give out all the read locks. + if (self.reader_queue.get()) |first_node| { + while (self.reader_queue.get()) |node| { + self.loop.onNextTick(node); + } + resume first_node.data; + } + } + } + return HeldRead{ .lock = self }; + } + + pub async fn acquireWrite(self: *RwLock) HeldWrite { + suspend { + var my_tick_node = Loop.NextTickNode{ + .data = @frame(), + .prev = undefined, + .next = undefined, + }; + + self.writer_queue.put(&my_tick_node); + + // At this point, we are in the writer_queue, so we might have already been resumed. + + // We set this bit so that later we can rely on the fact, that if writer_queue_empty_bit is 1, + // some actor will attempt to grab the lock. + _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 0, .SeqCst); + + // Here we must be the one to acquire the write lock. It cannot already be locked. + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) == null) { + // We now have a write lock. + if (self.writer_queue.get()) |node| { + // Whether this node is us or someone else, we tail resume it. + resume node.data; + } + } + } + return HeldWrite{ .lock = self }; + } + + fn commonPostUnlock(self: *RwLock) void { + while (true) { + // There might be a writer_queue item or a reader_queue item + // If we check and both are empty, we can be done, because the other actors will try to + // obtain the lock. + // But if there's a writer_queue item or a reader_queue item, + // we are the actor which must loop and attempt to grab the lock again. + if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) { + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) != null) { + // We did not obtain the lock. Great, the queues are someone else's problem. + return; + } + // If there's an item in the writer queue, give them the lock, and we're done. + if (self.writer_queue.get()) |node| { + self.loop.onNextTick(node); + return; + } + // Release the lock again. + _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 1, .SeqCst); + _ = @atomicRmw(u8, &self.shared_state, .Xchg, State.Unlocked, .SeqCst); + continue; + } + + if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) { + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst) != null) { + // We did not obtain the lock. Great, the queues are someone else's problem. + return; + } + // If there are any items in the reader queue, give out all the reader locks, and we're done. + if (self.reader_queue.get()) |first_node| { + self.loop.onNextTick(first_node); + while (self.reader_queue.get()) |node| { + self.loop.onNextTick(node); + } + return; + } + // Release the lock again. + _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 1, .SeqCst); + if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) { + // Didn't unlock. Someone else's problem. + return; + } + continue; + } + return; + } + } +}; + +test "std.event.RwLock" { + // https://github.com/ziglang/zig/issues/2377 + if (true) return error.SkipZigTest; + + // https://github.com/ziglang/zig/issues/1908 + if (builtin.single_threaded) return error.SkipZigTest; + + const allocator = std.heap.direct_allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var lock = RwLock.init(&loop); + defer lock.deinit(); + + const handle = testLock(&loop, &lock); + loop.run(); + + const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len; + testing.expectEqualSlices(i32, expected_result, shared_test_data); +} + +async fn testLock(loop: *Loop, lock: *RwLock) void { + var read_nodes: [100]Loop.NextTickNode = undefined; + for (read_nodes) |*read_node| { + const frame = loop.allocator.create(@Frame(readRunner)) catch @panic("memory"); + read_node.data = frame; + frame.* = async readRunner(lock); + loop.onNextTick(read_node); + } + + var write_nodes: [shared_it_count]Loop.NextTickNode = undefined; + for (write_nodes) |*write_node| { + const frame = loop.allocator.create(@Frame(writeRunner)) catch @panic("memory"); + write_node.data = frame; + frame.* = async writeRunner(lock); + loop.onNextTick(write_node); + } + + for (write_nodes) |*write_node| { + const casted = @ptrCast(*const @Frame(writeRunner), write_node.data); + await casted; + loop.allocator.destroy(casted); + } + for (read_nodes) |*read_node| { + const casted = @ptrCast(*const @Frame(readRunner), read_node.data); + await casted; + loop.allocator.destroy(casted); + } +} + +const shared_it_count = 10; +var shared_test_data = [1]i32{0} ** 10; +var shared_test_index: usize = 0; +var shared_count: usize = 0; + +async fn writeRunner(lock: *RwLock) void { + suspend; // resumed by onNextTick + + var i: usize = 0; + while (i < shared_test_data.len) : (i += 1) { + std.time.sleep(100 * std.time.microsecond); + const lock_promise = async lock.acquireWrite(); + const handle = await lock_promise; + defer handle.release(); + + shared_count += 1; + while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) { + shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1; + } + shared_test_index = 0; + } +} + +async fn readRunner(lock: *RwLock) void { + suspend; // resumed by onNextTick + std.time.sleep(1); + + var i: usize = 0; + while (i < shared_test_data.len) : (i += 1) { + const lock_promise = async lock.acquireRead(); + const handle = await lock_promise; + defer handle.release(); + + testing.expect(shared_test_index == 0); + testing.expect(shared_test_data[i] == @intCast(i32, shared_count)); + } +} diff --git a/lib/std/event/rwlocked.zig b/lib/std/event/rwlocked.zig new file mode 100644 index 0000000000..386aa08407 --- /dev/null +++ b/lib/std/event/rwlocked.zig @@ -0,0 +1,58 @@ +const std = @import("../std.zig"); +const RwLock = std.event.RwLock; +const Loop = std.event.Loop; + +/// Thread-safe async/await RW lock that protects one piece of data. +/// Functions which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +pub fn RwLocked(comptime T: type) type { + return struct { + lock: RwLock, + locked_data: T, + + const Self = @This(); + + pub const HeldReadLock = struct { + value: *const T, + held: RwLock.HeldRead, + + pub fn release(self: HeldReadLock) void { + self.held.release(); + } + }; + + pub const HeldWriteLock = struct { + value: *T, + held: RwLock.HeldWrite, + + pub fn release(self: HeldWriteLock) void { + self.held.release(); + } + }; + + pub fn init(loop: *Loop, data: T) Self { + return Self{ + .lock = RwLock.init(loop), + .locked_data = data, + }; + } + + pub fn deinit(self: *Self) void { + self.lock.deinit(); + } + + pub async fn acquireRead(self: *Self) HeldReadLock { + return HeldReadLock{ + .held = await (async self.lock.acquireRead() catch unreachable), + .value = &self.locked_data, + }; + } + + pub async fn acquireWrite(self: *Self) HeldWriteLock { + return HeldWriteLock{ + .held = await (async self.lock.acquireWrite() catch unreachable), + .value = &self.locked_data, + }; + } + }; +} |
