author     Andrew Kelley <andrew@ziglang.org>    2019-09-26 01:54:45 -0400
committer  GitHub <noreply@github.com>           2019-09-26 01:54:45 -0400
commit     68bb3945708c43109c48bda3664176307d45b62c (patch)
tree       afb9731e10cef9d192560b52cd9ae2cf179775c4 /lib/std/event
parent     6128bc728d1e1024a178c16c2149f5b1a167a013 (diff)
parent     4637e8f9699af9c3c6cf4df50ef5bb67c7a318a4 (diff)
Merge pull request #3315 from ziglang/mv-std-lib
Move std/ to lib/std/
Diffstat (limited to 'lib/std/event')
-rw-r--r--  lib/std/event/channel.zig     349
-rw-r--r--  lib/std/event/fs.zig         1431
-rw-r--r--  lib/std/event/future.zig      121
-rw-r--r--  lib/std/event/group.zig       131
-rw-r--r--  lib/std/event/lock.zig        186
-rw-r--r--  lib/std/event/locked.zig       43
-rw-r--r--  lib/std/event/loop.zig        923
-rw-r--r--  lib/std/event/net.zig         358
-rw-r--r--  lib/std/event/rwlock.zig      296
-rw-r--r--  lib/std/event/rwlocked.zig     58
10 files changed, 3896 insertions, 0 deletions
diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig
new file mode 100644
index 0000000000..2f211d21e2
--- /dev/null
+++ b/lib/std/event/channel.zig
@@ -0,0 +1,349 @@
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const testing = std.testing;
+const Loop = std.event.Loop;
+
+/// many producer, many consumer, thread-safe, runtime configurable buffer size
+/// when buffer is empty, consumers suspend and are resumed by producers
+/// when buffer is full, producers suspend and are resumed by consumers
+pub fn Channel(comptime T: type) type {
+ return struct {
+ loop: *Loop,
+
+ getters: std.atomic.Queue(GetNode),
+ or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node),
+ putters: std.atomic.Queue(PutNode),
+ get_count: usize,
+ put_count: usize,
+ dispatch_lock: u8, // TODO make this a bool
+ need_dispatch: u8, // TODO make this a bool
+
+ // simple fixed size ring buffer
+ buffer_nodes: []T,
+ buffer_index: usize,
+ buffer_len: usize,
+
+ const SelfChannel = @This();
+ const GetNode = struct {
+ tick_node: *Loop.NextTickNode,
+ data: Data,
+
+ const Data = union(enum) {
+ Normal: Normal,
+ OrNull: OrNull,
+ };
+
+ const Normal = struct {
+ ptr: *T,
+ };
+
+ const OrNull = struct {
+ ptr: *?T,
+ or_null: *std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node,
+ };
+ };
+ const PutNode = struct {
+ data: T,
+ tick_node: *Loop.NextTickNode,
+ };
+
+ /// call destroy when done
+ pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
+ const buffer_nodes = try loop.allocator.alloc(T, capacity);
+ errdefer loop.allocator.free(buffer_nodes);
+
+ const self = try loop.allocator.create(SelfChannel);
+ self.* = SelfChannel{
+ .loop = loop,
+ .buffer_len = 0,
+ .buffer_nodes = buffer_nodes,
+ .buffer_index = 0,
+ .dispatch_lock = 0,
+ .need_dispatch = 0,
+ .getters = std.atomic.Queue(GetNode).init(),
+ .putters = std.atomic.Queue(PutNode).init(),
+ .or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(),
+ .get_count = 0,
+ .put_count = 0,
+ };
+ errdefer loop.allocator.destroy(self);
+
+ return self;
+ }
+
+ /// must be called when all calls to put and get have suspended and no more calls occur
+ pub fn destroy(self: *SelfChannel) void {
+ while (self.getters.get()) |get_node| {
+ resume get_node.data.tick_node.data;
+ }
+ while (self.putters.get()) |put_node| {
+ resume put_node.data.tick_node.data;
+ }
+ self.loop.allocator.free(self.buffer_nodes);
+ self.loop.allocator.destroy(self);
+ }
+
+ /// Puts a data item in the channel. The function returns when the value has been added to the
+ /// buffer, or, in the case of a zero-size buffer, when the item has been retrieved by a getter,
+ /// or when the channel is destroyed.
+ pub fn put(self: *SelfChannel, data: T) void {
+ var my_tick_node = Loop.NextTickNode.init(@frame());
+ var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{
+ .tick_node = &my_tick_node,
+ .data = data,
+ });
+
+ // TODO test canceling a put()
+ errdefer {
+ _ = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
+ const need_dispatch = !self.putters.remove(&queue_node);
+ self.loop.cancelOnNextTick(&my_tick_node);
+ if (need_dispatch) {
+ // oops we made the put_count incorrect for a period of time. fix by dispatching.
+ _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
+ self.dispatch();
+ }
+ }
+ suspend {
+ self.putters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
+
+ self.dispatch();
+ }
+ }
+
+ /// await this function to get an item from the channel. If the buffer is empty, the frame will
+ /// complete when the next item is put in the channel.
+ pub async fn get(self: *SelfChannel) T {
+ // TODO https://github.com/ziglang/zig/issues/2765
+ var result: T = undefined;
+ var my_tick_node = Loop.NextTickNode.init(@frame());
+ var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
+ .tick_node = &my_tick_node,
+ .data = GetNode.Data{
+ .Normal = GetNode.Normal{ .ptr = &result },
+ },
+ });
+
+ // TODO test canceling a get()
+ errdefer {
+ _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
+ const need_dispatch = !self.getters.remove(&queue_node);
+ self.loop.cancelOnNextTick(&my_tick_node);
+ if (need_dispatch) {
+ // oops we made the get_count incorrect for a period of time. fix by dispatching.
+ _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
+ self.dispatch();
+ }
+ }
+
+ suspend {
+ self.getters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
+
+ self.dispatch();
+ }
+ return result;
+ }
+
+ //pub async fn select(comptime EnumUnion: type, channels: ...) EnumUnion {
+ // assert(@memberCount(EnumUnion) == channels.len); // enum union and channels mismatch
+ // assert(channels.len != 0); // enum unions cannot have 0 fields
+ // if (channels.len == 1) {
+ // const result = await (async channels[0].get() catch unreachable);
+ // return @unionInit(EnumUnion, @memberName(EnumUnion, 0), result);
+ // }
+ //}
+
+ /// Await this function to get an item from the channel. If the buffer is empty and there are no
+ /// puts waiting, this returns null.
+ /// Await is necessary for locking purposes. The function will be resumed after checking the channel
+ /// for data and will not wait for data to be available.
+ pub async fn getOrNull(self: *SelfChannel) ?T {
+ // TODO integrate this function with named return values
+ // so we can get rid of this extra result copy
+ var result: ?T = null;
+ var my_tick_node = Loop.NextTickNode.init(@frame());
+ var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node.init(undefined);
+ var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
+ .tick_node = &my_tick_node,
+ .data = GetNode.Data{
+ .OrNull = GetNode.OrNull{
+ .ptr = &result,
+ .or_null = &or_null_node,
+ },
+ },
+ });
+ or_null_node.data = &queue_node;
+
+ // TODO test canceling getOrNull
+ errdefer {
+ _ = self.or_null_queue.remove(&or_null_node);
+ _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
+ const need_dispatch = !self.getters.remove(&queue_node);
+ self.loop.cancelOnNextTick(&my_tick_node);
+ if (need_dispatch) {
+ // oops we made the get_count incorrect for a period of time. fix by dispatching.
+ _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
+ self.dispatch();
+ }
+ }
+
+ suspend {
+ self.getters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
+ self.or_null_queue.put(&or_null_node);
+
+ self.dispatch();
+ }
+ return result;
+ }
+
+ fn dispatch(self: *SelfChannel) void {
+ // set the "need dispatch" flag
+ _ = @atomicRmw(u8, &self.need_dispatch, .Xchg, 1, .SeqCst);
+
+ lock: while (true) {
+ // set the lock flag
+ const prev_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 1, .SeqCst);
+ if (prev_lock != 0) return;
+
+ // clear the need_dispatch flag since we're about to do it
+ _ = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst);
+
+ while (true) {
+ one_dispatch: {
+ // later we correct these extra subtractions
+ var get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
+ var put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
+
+ // transfer self.buffer to self.getters
+ while (self.buffer_len != 0) {
+ if (get_count == 0) break :one_dispatch;
+
+ const get_node = &self.getters.get().?.data;
+ switch (get_node.data) {
+ GetNode.Data.Normal => |info| {
+ info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
+ },
+ GetNode.Data.OrNull => |info| {
+ _ = self.or_null_queue.remove(info.or_null);
+ info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
+ },
+ }
+ self.loop.onNextTick(get_node.tick_node);
+ self.buffer_len -= 1;
+
+ get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
+ }
+
+ // direct transfer self.putters to self.getters
+ while (get_count != 0 and put_count != 0) {
+ const get_node = &self.getters.get().?.data;
+ const put_node = &self.putters.get().?.data;
+
+ switch (get_node.data) {
+ GetNode.Data.Normal => |info| {
+ info.ptr.* = put_node.data;
+ },
+ GetNode.Data.OrNull => |info| {
+ _ = self.or_null_queue.remove(info.or_null);
+ info.ptr.* = put_node.data;
+ },
+ }
+ self.loop.onNextTick(get_node.tick_node);
+ self.loop.onNextTick(put_node.tick_node);
+
+ get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
+ put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
+ }
+
+ // transfer self.putters to self.buffer
+ while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
+ const put_node = &self.putters.get().?.data;
+
+ self.buffer_nodes[self.buffer_index] = put_node.data;
+ self.loop.onNextTick(put_node.tick_node);
+ self.buffer_index +%= 1;
+ self.buffer_len += 1;
+
+ put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
+ }
+ }
+
+ // undo the extra subtractions
+ _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
+ _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
+
+ // All the "get or null" functions should resume now.
+ var remove_count: usize = 0;
+ while (self.or_null_queue.get()) |or_null_node| {
+ remove_count += @boolToInt(self.getters.remove(or_null_node.data));
+ self.loop.onNextTick(or_null_node.data.data.tick_node);
+ }
+ if (remove_count != 0) {
+ _ = @atomicRmw(usize, &self.get_count, .Sub, remove_count, .SeqCst);
+ }
+
+ // clear need-dispatch flag
+ const need_dispatch = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst);
+ if (need_dispatch != 0) continue;
+
+ const my_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 0, .SeqCst);
+ assert(my_lock != 0);
+
+ // we have to check again now that we unlocked
+ if (@atomicLoad(u8, &self.need_dispatch, .SeqCst) != 0) continue :lock;
+
+ return;
+ }
+ }
+ }
+ };
+}
+
+test "std.event.Channel" {
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+ // https://github.com/ziglang/zig/issues/3251
+ if (std.os.freebsd.is_the_target) return error.SkipZigTest;
+
+ var loop: Loop = undefined;
+ // TODO make a multi threaded test
+ try loop.initSingleThreaded(std.heap.direct_allocator);
+ defer loop.deinit();
+
+ const channel = try Channel(i32).create(&loop, 0);
+ defer channel.destroy();
+
+ const handle = async testChannelGetter(&loop, channel);
+ const putter = async testChannelPutter(channel);
+
+ loop.run();
+}
+
+async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
+ const value1 = channel.get();
+ testing.expect(value1 == 1234);
+
+ const value2 = channel.get();
+ testing.expect(value2 == 4567);
+
+ const value3 = channel.getOrNull();
+ testing.expect(value3 == null);
+
+ var last_put = async testPut(channel, 4444);
+ const value4 = channel.getOrNull();
+ testing.expect(value4.? == 4444);
+ await last_put;
+}
+
+async fn testChannelPutter(channel: *Channel(i32)) void {
+ channel.put(1234);
+ channel.put(4567);
+}
+
+async fn testPut(channel: *Channel(i32), value: i32) void {
+ channel.put(value);
+}
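The core of Channel.dispatch above is a flag-plus-lock pattern: every caller publishes need_dispatch, only the caller that wins dispatch_lock drains the queues, and the re-check of the flag after unlocking ensures no notification is lost. Below is a minimal, hypothetical sketch of that pattern in isolation; the names and the drainPendingWork stub are illustrative and not part of this commit.

const std = @import("std");

var need_dispatch: u8 = 0; // TODO make this a bool (same caveat as the channel fields)
var dispatch_lock: u8 = 0;
var drained: usize = 0;

fn drainPendingWork() void {
    // Stand-in for moving items between the putter/getter queues and the ring buffer.
    drained += 1;
}

fn notify() void {
    // Publish that there is work to do, then try to become the dispatcher.
    _ = @atomicRmw(u8, &need_dispatch, .Xchg, 1, .SeqCst);
    while (true) {
        // Whoever already holds the lock is obligated to notice the flag.
        if (@atomicRmw(u8, &dispatch_lock, .Xchg, 1, .SeqCst) != 0) return;

        _ = @atomicRmw(u8, &need_dispatch, .Xchg, 0, .SeqCst);
        drainPendingWork();
        _ = @atomicRmw(u8, &dispatch_lock, .Xchg, 0, .SeqCst);

        // Re-check after unlocking: a concurrent notify() may have set the flag
        // after we cleared it but before we released the lock.
        if (@atomicLoad(u8, &need_dispatch, .SeqCst) == 0) return;
    }
}

test "dispatch pattern sketch" {
    notify();
    std.testing.expect(drained == 1);
}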
diff --git a/lib/std/event/fs.zig b/lib/std/event/fs.zig
new file mode 100644
index 0000000000..4490e1deae
--- /dev/null
+++ b/lib/std/event/fs.zig
@@ -0,0 +1,1431 @@
+const builtin = @import("builtin");
+const std = @import("../std.zig");
+const event = std.event;
+const assert = std.debug.assert;
+const testing = std.testing;
+const os = std.os;
+const mem = std.mem;
+const windows = os.windows;
+const Loop = event.Loop;
+const fd_t = os.fd_t;
+const File = std.fs.File;
+
+pub const RequestNode = std.atomic.Queue(Request).Node;
+
+pub const Request = struct {
+ msg: Msg,
+ finish: Finish,
+
+ pub const Finish = union(enum) {
+ TickNode: Loop.NextTickNode,
+ DeallocCloseOperation: *CloseOperation,
+ NoAction,
+ };
+
+ pub const Msg = union(enum) {
+ WriteV: WriteV,
+ PWriteV: PWriteV,
+ PReadV: PReadV,
+ Open: Open,
+ Close: Close,
+ WriteFile: WriteFile,
+ End, // special - means the fs thread should exit
+
+ pub const WriteV = struct {
+ fd: fd_t,
+ iov: []const os.iovec_const,
+ result: Error!void,
+
+ pub const Error = os.WriteError;
+ };
+
+ pub const PWriteV = struct {
+ fd: fd_t,
+ iov: []const os.iovec_const,
+ offset: usize,
+ result: Error!void,
+
+ pub const Error = os.WriteError;
+ };
+
+ pub const PReadV = struct {
+ fd: fd_t,
+ iov: []const os.iovec,
+ offset: usize,
+ result: Error!usize,
+
+ pub const Error = os.ReadError;
+ };
+
+ pub const Open = struct {
+ /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
+ path: []const u8,
+ flags: u32,
+ mode: File.Mode,
+ result: Error!fd_t,
+
+ pub const Error = File.OpenError;
+ };
+
+ pub const WriteFile = struct {
+ /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
+ path: []const u8,
+ contents: []const u8,
+ mode: File.Mode,
+ result: Error!void,
+
+ pub const Error = File.OpenError || File.WriteError;
+ };
+
+ pub const Close = struct {
+ fd: fd_t,
+ };
+ };
+};
+
+pub const PWriteVError = error{OutOfMemory} || File.WriteError;
+
+/// data - just the inner references - must live until pwritev frame completes.
+pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void {
+ switch (builtin.os) {
+ .macosx,
+ .linux,
+ .freebsd,
+ .netbsd,
+ => {
+ const iovecs = try loop.allocator.alloc(os.iovec_const, data.len);
+ defer loop.allocator.free(iovecs);
+
+ for (data) |buf, i| {
+ iovecs[i] = os.iovec_const{
+ .iov_base = buf.ptr,
+ .iov_len = buf.len,
+ };
+ }
+
+ return pwritevPosix(loop, fd, iovecs, offset);
+ },
+ .windows => {
+ const data_copy = try std.mem.dupe(loop.allocator, []const u8, data);
+ defer loop.allocator.free(data_copy);
+ return pwritevWindows(loop, fd, data_copy, offset);
+ },
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// data must outlive the returned frame
+pub fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
+ if (data.len == 0) return;
+ if (data.len == 1) return pwriteWindows(loop, fd, data[0], offset);
+
+ // TODO do these in parallel
+ var off = offset;
+ for (data) |buf| {
+ try pwriteWindows(loop, fd, buf, off);
+ off += buf.len;
+ }
+}
+
+pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void {
+ var resume_node = Loop.ResumeNode.Basic{
+ .base = Loop.ResumeNode{
+ .id = Loop.ResumeNode.Id.Basic,
+ .handle = @frame(),
+ .overlapped = windows.OVERLAPPED{
+ .Internal = 0,
+ .InternalHigh = 0,
+ .Offset = @truncate(u32, offset),
+ .OffsetHigh = @truncate(u32, offset >> 32),
+ .hEvent = null,
+ },
+ },
+ };
+ // TODO only call create io completion port once per fd
+ _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined);
+ loop.beginOneEvent();
+ errdefer loop.finishOneEvent();
+
+ errdefer {
+ _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped);
+ }
+ suspend {
+ _ = windows.kernel32.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped);
+ }
+ var bytes_transferred: windows.DWORD = undefined;
+ if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
+ switch (windows.kernel32.GetLastError()) {
+ windows.ERROR.IO_PENDING => unreachable,
+ windows.ERROR.INVALID_USER_BUFFER => return error.SystemResources,
+ windows.ERROR.NOT_ENOUGH_MEMORY => return error.SystemResources,
+ windows.ERROR.OPERATION_ABORTED => return error.OperationAborted,
+ windows.ERROR.NOT_ENOUGH_QUOTA => return error.SystemResources,
+ windows.ERROR.BROKEN_PIPE => return error.BrokenPipe,
+ else => |err| return windows.unexpectedError(err),
+ }
+ }
+}
+
+/// iovecs must live until pwritev frame completes.
+pub fn pwritevPosix(
+ loop: *Loop,
+ fd: fd_t,
+ iovecs: []const os.iovec_const,
+ offset: usize,
+) os.WriteError!void {
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .PWriteV = Request.Msg.PWriteV{
+ .fd = fd,
+ .iov = iovecs,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.PWriteV.result;
+}
+
+/// iovecs must live until pwritev frame completes.
+pub fn writevPosix(
+ loop: *Loop,
+ fd: fd_t,
+ iovecs: []const os.iovec_const,
+) os.WriteError!void {
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .WriteV = Request.Msg.WriteV{
+ .fd = fd,
+ .iov = iovecs,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.WriteV.result;
+}
+
+pub const PReadVError = error{OutOfMemory} || File.ReadError;
+
+/// data - just the inner references - must live until preadv frame completes.
+pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize {
+ assert(data.len != 0);
+ switch (builtin.os) {
+ .macosx,
+ .linux,
+ .freebsd,
+ .netbsd,
+ => {
+ const iovecs = try loop.allocator.alloc(os.iovec, data.len);
+ defer loop.allocator.free(iovecs);
+
+ for (data) |buf, i| {
+ iovecs[i] = os.iovec{
+ .iov_base = buf.ptr,
+ .iov_len = buf.len,
+ };
+ }
+
+ return preadvPosix(loop, fd, iovecs, offset);
+ },
+ .windows => {
+ const data_copy = try std.mem.dupe(loop.allocator, []u8, data);
+ defer loop.allocator.free(data_copy);
+ return preadvWindows(loop, fd, data_copy, offset);
+ },
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// data must outlive the returned frame
+pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize {
+ assert(data.len != 0);
+ if (data.len == 1) return preadWindows(loop, fd, data[0], offset);
+
+ // TODO do these in parallel?
+ var off: usize = 0;
+ var iov_i: usize = 0;
+ var inner_off: usize = 0;
+ while (true) {
+ const v = data[iov_i];
+ const amt_read = try preadWindows(loop, fd, v[inner_off..], offset + off);
+ off += amt_read;
+ inner_off += amt_read;
+ if (inner_off == v.len) {
+ iov_i += 1;
+ inner_off = 0;
+ if (iov_i == data.len) {
+ return off;
+ }
+ }
+ if (amt_read == 0) return off; // EOF
+ }
+}
+
+pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
+ var resume_node = Loop.ResumeNode.Basic{
+ .base = Loop.ResumeNode{
+ .id = Loop.ResumeNode.Id.Basic,
+ .handle = @frame(),
+ .overlapped = windows.OVERLAPPED{
+ .Internal = 0,
+ .InternalHigh = 0,
+ .Offset = @truncate(u32, offset),
+ .OffsetHigh = @truncate(u32, offset >> 32),
+ .hEvent = null,
+ },
+ },
+ };
+ // TODO only call create io completion port once per fd
+ _ = windows.CreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined) catch undefined;
+ loop.beginOneEvent();
+ errdefer loop.finishOneEvent();
+
+ errdefer {
+ _ = windows.kernel32.CancelIoEx(fd, &resume_node.base.overlapped);
+ }
+ suspend {
+ _ = windows.kernel32.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped);
+ }
+ var bytes_transferred: windows.DWORD = undefined;
+ if (windows.kernel32.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
+ switch (windows.kernel32.GetLastError()) {
+ windows.ERROR.IO_PENDING => unreachable,
+ windows.ERROR.OPERATION_ABORTED => return error.OperationAborted,
+ windows.ERROR.BROKEN_PIPE => return error.BrokenPipe,
+ windows.ERROR.HANDLE_EOF => return usize(bytes_transferred),
+ else => |err| return windows.unexpectedError(err),
+ }
+ }
+ return usize(bytes_transferred);
+}
+
+/// iovecs must live until preadv frame completes
+pub fn preadvPosix(
+ loop: *Loop,
+ fd: fd_t,
+ iovecs: []const os.iovec,
+ offset: usize,
+) os.ReadError!usize {
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .PReadV = Request.Msg.PReadV{
+ .fd = fd,
+ .iov = iovecs,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.PReadV.result;
+}
+
+pub fn openPosix(
+ loop: *Loop,
+ path: []const u8,
+ flags: u32,
+ mode: File.Mode,
+) File.OpenError!fd_t {
+ const path_c = try std.os.toPosixPath(path);
+
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .Open = Request.Msg.Open{
+ .path = path_c[0..path.len],
+ .flags = flags,
+ .mode = mode,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.Open.result;
+}
+
+pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t {
+ switch (builtin.os) {
+ .macosx, .linux, .freebsd, .netbsd => {
+ const flags = os.O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC;
+ return openPosix(loop, path, flags, File.default_mode);
+ },
+
+ .windows => return windows.CreateFile(
+ path,
+ windows.GENERIC_READ,
+ windows.FILE_SHARE_READ,
+ null,
+ windows.OPEN_EXISTING,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ ),
+
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// Creates if does not exist. Truncates the file if it exists.
+/// Uses the default mode.
+pub fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t {
+ return openWriteMode(loop, path, File.default_mode);
+}
+
+/// Creates if does not exist. Truncates the file if it exists.
+pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t {
+ switch (builtin.os) {
+ .macosx,
+ .linux,
+ .freebsd,
+ .netbsd,
+ => {
+ const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC;
+ return openPosix(loop, path, flags, mode);
+ },
+ .windows => return windows.CreateFile(
+ path,
+ windows.GENERIC_WRITE,
+ windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
+ null,
+ windows.CREATE_ALWAYS,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ ),
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// Creates if does not exist. Does not truncate.
+pub fn openReadWrite(
+ loop: *Loop,
+ path: []const u8,
+ mode: File.Mode,
+) File.OpenError!fd_t {
+ switch (builtin.os) {
+ .macosx, .linux, .freebsd, .netbsd => {
+ const flags = os.O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC;
+ return openPosix(loop, path, flags, mode);
+ },
+
+ .windows => return windows.CreateFile(
+ path,
+ windows.GENERIC_WRITE | windows.GENERIC_READ,
+ windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
+ null,
+ windows.OPEN_ALWAYS,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ ),
+
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+/// This abstraction helps to close file handles in defer expressions
+/// without the possibility of failure and without the use of suspend points.
+/// Start a `CloseOperation` before opening a file, so that you can defer
+/// `CloseOperation.finish`.
+/// If you call `setHandle` then finishing will close the fd; otherwise finishing
+/// will deallocate the `CloseOperation`.
+pub const CloseOperation = struct {
+ loop: *Loop,
+ os_data: OsData,
+
+ const OsData = switch (builtin.os) {
+ .linux, .macosx, .freebsd, .netbsd => OsDataPosix,
+
+ .windows => struct {
+ handle: ?fd_t,
+ },
+
+ else => @compileError("Unsupported OS"),
+ };
+
+ const OsDataPosix = struct {
+ have_fd: bool,
+ close_req_node: RequestNode,
+ };
+
+ pub fn start(loop: *Loop) (error{OutOfMemory}!*CloseOperation) {
+ const self = try loop.allocator.create(CloseOperation);
+ self.* = CloseOperation{
+ .loop = loop,
+ .os_data = switch (builtin.os) {
+ .linux, .macosx, .freebsd, .netbsd => initOsDataPosix(self),
+ .windows => OsData{ .handle = null },
+ else => @compileError("Unsupported OS"),
+ },
+ };
+ return self;
+ }
+
+ fn initOsDataPosix(self: *CloseOperation) OsData {
+ return OsData{
+ .have_fd = false,
+ .close_req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .Close = Request.Msg.Close{ .fd = undefined },
+ },
+ .finish = Request.Finish{ .DeallocCloseOperation = self },
+ },
+ },
+ };
+ }
+
+ /// Defer this after creating.
+ pub fn finish(self: *CloseOperation) void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ if (self.os_data.have_fd) {
+ self.loop.posixFsRequest(&self.os_data.close_req_node);
+ } else {
+ self.loop.allocator.destroy(self);
+ }
+ },
+ .windows => {
+ if (self.os_data.handle) |handle| {
+ os.close(handle);
+ }
+ self.loop.allocator.destroy(self);
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ pub fn setHandle(self: *CloseOperation, handle: fd_t) void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ self.os_data.close_req_node.data.msg.Close.fd = handle;
+ self.os_data.have_fd = true;
+ },
+ .windows => {
+ self.os_data.handle = handle;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ /// Undo a `setHandle`.
+ pub fn clearHandle(self: *CloseOperation) void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ self.os_data.have_fd = false;
+ },
+ .windows => {
+ self.os_data.handle = null;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ pub fn getHandle(self: *CloseOperation) fd_t {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => {
+ assert(self.os_data.have_fd);
+ return self.os_data.close_req_node.data.msg.Close.fd;
+ },
+ .windows => {
+ return self.os_data.handle.?;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+};
+
+/// contents must remain alive until writeFile completes.
+/// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate
+pub fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void {
+ return writeFileMode(loop, path, contents, File.default_mode);
+}
+
+/// contents must remain alive until writeFile completes.
+pub fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => return writeFileModeThread(loop, path, contents, mode),
+ .windows => return writeFileWindows(loop, path, contents),
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void {
+ const handle = try windows.CreateFile(
+ path,
+ windows.GENERIC_WRITE,
+ windows.FILE_SHARE_WRITE | windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE,
+ null,
+ windows.CREATE_ALWAYS,
+ windows.FILE_ATTRIBUTE_NORMAL | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ );
+ defer os.close(handle);
+
+ try pwriteWindows(loop, handle, contents, 0);
+}
+
+fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
+ const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
+ defer loop.allocator.free(path_with_null);
+
+ var req_node = RequestNode{
+ .prev = null,
+ .next = null,
+ .data = Request{
+ .msg = Request.Msg{
+ .WriteFile = Request.Msg.WriteFile{
+ .path = path_with_null[0..path.len],
+ .contents = contents,
+ .mode = mode,
+ .result = undefined,
+ },
+ },
+ .finish = Request.Finish{
+ .TickNode = Loop.NextTickNode{
+ .prev = null,
+ .next = null,
+ .data = @frame(),
+ },
+ },
+ },
+ };
+
+ errdefer loop.posixFsCancel(&req_node);
+
+ suspend {
+ loop.posixFsRequest(&req_node);
+ }
+
+ return req_node.data.msg.WriteFile.result;
+}
+
+/// The frame resumes when the last of the data has been read, but before the file handle
+/// is closed.
+/// Caller owns returned memory.
+pub fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 {
+ var close_op = try CloseOperation.start(loop);
+ defer close_op.finish();
+
+ const fd = try openRead(loop, file_path);
+ close_op.setHandle(fd);
+
+ var list = std.ArrayList(u8).init(loop.allocator);
+ defer list.deinit();
+
+ while (true) {
+ try list.ensureCapacity(list.len + mem.page_size);
+ const buf = list.items[list.len..];
+ const buf_array = [_][]u8{buf};
+ const amt = try preadv(loop, fd, buf_array, list.len);
+ list.len += amt;
+ if (list.len > max_size) {
+ return error.FileTooBig;
+ }
+ if (amt < buf.len) {
+ return list.toOwnedSlice();
+ }
+ }
+}
+
+pub const WatchEventId = enum {
+ CloseWrite,
+ Delete,
+};
+
+fn eqlString(a: []const u16, b: []const u16) bool {
+ if (a.len != b.len) return false;
+ if (a.ptr == b.ptr) return true;
+ return mem.compare(u16, a, b) == .Equal;
+}
+
+fn hashString(s: []const u16) u32 {
+ return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s)));
+}
+
+//pub const WatchEventError = error{
+// UserResourceLimitReached,
+// SystemResources,
+// AccessDenied,
+// Unexpected, // TODO remove this possibility
+//};
+//
+//pub fn Watch(comptime V: type) type {
+// return struct {
+// channel: *event.Channel(Event.Error!Event),
+// os_data: OsData,
+//
+// const OsData = switch (builtin.os) {
+// .macosx, .freebsd, .netbsd => struct {
+// file_table: FileTable,
+// table_lock: event.Lock,
+//
+// const FileTable = std.StringHashmap(*Put);
+// const Put = struct {
+// putter: anyframe,
+// value_ptr: *V,
+// };
+// },
+//
+// .linux => LinuxOsData,
+// .windows => WindowsOsData,
+//
+// else => @compileError("Unsupported OS"),
+// };
+//
+// const WindowsOsData = struct {
+// table_lock: event.Lock,
+// dir_table: DirTable,
+// all_putters: std.atomic.Queue(anyframe),
+// ref_count: std.atomic.Int(usize),
+//
+// const DirTable = std.StringHashMap(*Dir);
+// const FileTable = std.HashMap([]const u16, V, hashString, eqlString);
+//
+// const Dir = struct {
+// putter: anyframe,
+// file_table: FileTable,
+// table_lock: event.Lock,
+// };
+// };
+//
+// const LinuxOsData = struct {
+// putter: anyframe,
+// inotify_fd: i32,
+// wd_table: WdTable,
+// table_lock: event.Lock,
+//
+// const WdTable = std.AutoHashMap(i32, Dir);
+// const FileTable = std.StringHashMap(V);
+//
+// const Dir = struct {
+// dirname: []const u8,
+// file_table: FileTable,
+// };
+// };
+//
+// const FileToHandle = std.StringHashMap(anyframe);
+//
+// const Self = @This();
+//
+// pub const Event = struct {
+// id: Id,
+// data: V,
+//
+// pub const Id = WatchEventId;
+// pub const Error = WatchEventError;
+// };
+//
+// pub fn create(loop: *Loop, event_buf_count: usize) !*Self {
+// const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count);
+// errdefer channel.destroy();
+//
+// switch (builtin.os) {
+// .linux => {
+// const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
+// errdefer os.close(inotify_fd);
+//
+// var result: *Self = undefined;
+// _ = try async<loop.allocator> linuxEventPutter(inotify_fd, channel, &result);
+// return result;
+// },
+//
+// .windows => {
+// const self = try loop.allocator.create(Self);
+// errdefer loop.allocator.destroy(self);
+// self.* = Self{
+// .channel = channel,
+// .os_data = OsData{
+// .table_lock = event.Lock.init(loop),
+// .dir_table = OsData.DirTable.init(loop.allocator),
+// .ref_count = std.atomic.Int(usize).init(1),
+// .all_putters = std.atomic.Queue(anyframe).init(),
+// },
+// };
+// return self;
+// },
+//
+// .macosx, .freebsd, .netbsd => {
+// const self = try loop.allocator.create(Self);
+// errdefer loop.allocator.destroy(self);
+//
+// self.* = Self{
+// .channel = channel,
+// .os_data = OsData{
+// .table_lock = event.Lock.init(loop),
+// .file_table = OsData.FileTable.init(loop.allocator),
+// },
+// };
+// return self;
+// },
+// else => @compileError("Unsupported OS"),
+// }
+// }
+//
+// /// All addFile calls and removeFile calls must have completed.
+// pub fn destroy(self: *Self) void {
+// switch (builtin.os) {
+// .macosx, .freebsd, .netbsd => {
+// // TODO we need to cancel the frames before destroying the lock
+// self.os_data.table_lock.deinit();
+// var it = self.os_data.file_table.iterator();
+// while (it.next()) |entry| {
+// cancel entry.value.putter;
+// self.channel.loop.allocator.free(entry.key);
+// }
+// self.channel.destroy();
+// },
+// .linux => cancel self.os_data.putter,
+// .windows => {
+// while (self.os_data.all_putters.get()) |putter_node| {
+// cancel putter_node.data;
+// }
+// self.deref();
+// },
+// else => @compileError("Unsupported OS"),
+// }
+// }
+//
+// fn ref(self: *Self) void {
+// _ = self.os_data.ref_count.incr();
+// }
+//
+// fn deref(self: *Self) void {
+// if (self.os_data.ref_count.decr() == 1) {
+// const allocator = self.channel.loop.allocator;
+// self.os_data.table_lock.deinit();
+// var it = self.os_data.dir_table.iterator();
+// while (it.next()) |entry| {
+// allocator.free(entry.key);
+// allocator.destroy(entry.value);
+// }
+// self.os_data.dir_table.deinit();
+// self.channel.destroy();
+// allocator.destroy(self);
+// }
+// }
+//
+// pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
+// switch (builtin.os) {
+// .macosx, .freebsd, .netbsd => return await (async addFileKEvent(self, file_path, value) catch unreachable),
+// .linux => return await (async addFileLinux(self, file_path, value) catch unreachable),
+// .windows => return await (async addFileWindows(self, file_path, value) catch unreachable),
+// else => @compileError("Unsupported OS"),
+// }
+// }
+//
+// async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
+// const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path});
+// var resolved_path_consumed = false;
+// defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path);
+//
+// var close_op = try CloseOperation.start(self.channel.loop);
+// var close_op_consumed = false;
+// defer if (!close_op_consumed) close_op.finish();
+//
+// const flags = if (os.darwin.is_the_target) os.O_SYMLINK | os.O_EVTONLY else 0;
+// const mode = 0;
+// const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable);
+// close_op.setHandle(fd);
+//
+// var put_data: *OsData.Put = undefined;
+// const putter = try async self.kqPutEvents(close_op, value, &put_data);
+// close_op_consumed = true;
+// errdefer cancel putter;
+//
+// const result = blk: {
+// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const gop = try self.os_data.file_table.getOrPut(resolved_path);
+// if (gop.found_existing) {
+// const prev_value = gop.kv.value.value_ptr.*;
+// cancel gop.kv.value.putter;
+// gop.kv.value = put_data;
+// break :blk prev_value;
+// } else {
+// resolved_path_consumed = true;
+// gop.kv.value = put_data;
+// break :blk null;
+// }
+// };
+//
+// return result;
+// }
+//
+// async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void {
+// var value_copy = value;
+// var put = OsData.Put{
+// .putter = @frame(),
+// .value_ptr = &value_copy,
+// };
+// out_put.* = &put;
+// self.channel.loop.beginOneEvent();
+//
+// defer {
+// close_op.finish();
+// self.channel.loop.finishOneEvent();
+// }
+//
+// while (true) {
+// if (await (async self.channel.loop.bsdWaitKev(
+// @intCast(usize, close_op.getHandle()),
+// os.EVFILT_VNODE,
+// os.NOTE_WRITE | os.NOTE_DELETE,
+// ) catch unreachable)) |kev| {
+// // TODO handle EV_ERROR
+// if (kev.fflags & os.NOTE_DELETE != 0) {
+// await (async self.channel.put(Self.Event{
+// .id = Event.Id.Delete,
+// .data = value_copy,
+// }) catch unreachable);
+// } else if (kev.fflags & os.NOTE_WRITE != 0) {
+// await (async self.channel.put(Self.Event{
+// .id = Event.Id.CloseWrite,
+// .data = value_copy,
+// }) catch unreachable);
+// }
+// } else |err| switch (err) {
+// error.EventNotFound => unreachable,
+// error.ProcessNotFound => unreachable,
+// error.Overflow => unreachable,
+// error.AccessDenied, error.SystemResources => |casted_err| {
+// await (async self.channel.put(casted_err) catch unreachable);
+// },
+// }
+// }
+// }
+//
+// async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
+// const value_copy = value;
+//
+// const dirname = std.fs.path.dirname(file_path) orelse ".";
+// const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname);
+// var dirname_with_null_consumed = false;
+// defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null);
+//
+// const basename = std.fs.path.basename(file_path);
+// const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename);
+// var basename_with_null_consumed = false;
+// defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null);
+//
+// const wd = try os.inotify_add_watchC(
+// self.os_data.inotify_fd,
+// dirname_with_null.ptr,
+// os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK,
+// );
+// // wd is either a newly created watch or an existing one.
+//
+// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const gop = try self.os_data.wd_table.getOrPut(wd);
+// if (!gop.found_existing) {
+// gop.kv.value = OsData.Dir{
+// .dirname = dirname_with_null,
+// .file_table = OsData.FileTable.init(self.channel.loop.allocator),
+// };
+// dirname_with_null_consumed = true;
+// }
+// const dir = &gop.kv.value;
+//
+// const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
+// if (file_table_gop.found_existing) {
+// const prev_value = file_table_gop.kv.value;
+// file_table_gop.kv.value = value_copy;
+// return prev_value;
+// } else {
+// file_table_gop.kv.value = value_copy;
+// basename_with_null_consumed = true;
+// return null;
+// }
+// }
+//
+// async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
+// const value_copy = value;
+// // TODO we might need to convert dirname and basename to canonical file paths ("short"?)
+//
+// const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse ".");
+// var dirname_consumed = false;
+// defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname);
+//
+// const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname);
+// defer self.channel.loop.allocator.free(dirname_utf16le);
+//
+// // TODO https://github.com/ziglang/zig/issues/265
+// const basename = std.fs.path.basename(file_path);
+// const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename);
+// var basename_utf16le_null_consumed = false;
+// defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null);
+// const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1];
+//
+// const dir_handle = try windows.CreateFileW(
+// dirname_utf16le.ptr,
+// windows.FILE_LIST_DIRECTORY,
+// windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE,
+// null,
+// windows.OPEN_EXISTING,
+// windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED,
+// null,
+// );
+// var dir_handle_consumed = false;
+// defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle);
+//
+// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const gop = try self.os_data.dir_table.getOrPut(dirname);
+// if (gop.found_existing) {
+// const dir = gop.kv.value;
+// const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable);
+// defer held_dir_lock.release();
+//
+// const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null);
+// if (file_gop.found_existing) {
+// const prev_value = file_gop.kv.value;
+// file_gop.kv.value = value_copy;
+// return prev_value;
+// } else {
+// file_gop.kv.value = value_copy;
+// basename_utf16le_null_consumed = true;
+// return null;
+// }
+// } else {
+// errdefer _ = self.os_data.dir_table.remove(dirname);
+// const dir = try self.channel.loop.allocator.create(OsData.Dir);
+// errdefer self.channel.loop.allocator.destroy(dir);
+//
+// dir.* = OsData.Dir{
+// .file_table = OsData.FileTable.init(self.channel.loop.allocator),
+// .table_lock = event.Lock.init(self.channel.loop),
+// .putter = undefined,
+// };
+// gop.kv.value = dir;
+// assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null);
+// basename_utf16le_null_consumed = true;
+//
+// dir.putter = try async self.windowsDirReader(dir_handle, dir);
+// dir_handle_consumed = true;
+//
+// dirname_consumed = true;
+//
+// return null;
+// }
+// }
+//
+// async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
+// self.ref();
+// defer self.deref();
+//
+// defer os.close(dir_handle);
+//
+// var putter_node = std.atomic.Queue(anyframe).Node{
+// .data = @frame(),
+// .prev = null,
+// .next = null,
+// };
+// self.os_data.all_putters.put(&putter_node);
+// defer _ = self.os_data.all_putters.remove(&putter_node);
+//
+// var resume_node = Loop.ResumeNode.Basic{
+// .base = Loop.ResumeNode{
+// .id = Loop.ResumeNode.Id.Basic,
+// .handle = @frame(),
+// .overlapped = windows.OVERLAPPED{
+// .Internal = 0,
+// .InternalHigh = 0,
+// .Offset = 0,
+// .OffsetHigh = 0,
+// .hEvent = null,
+// },
+// },
+// };
+// var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;
+//
+// // TODO handle this error not in the channel but in the setup
+// _ = windows.CreateIoCompletionPort(
+// dir_handle,
+// self.channel.loop.os_data.io_port,
+// undefined,
+// undefined,
+// ) catch |err| {
+// await (async self.channel.put(err) catch unreachable);
+// return;
+// };
+//
+// while (true) {
+// {
+// // TODO only 1 beginOneEvent for the whole function
+// self.channel.loop.beginOneEvent();
+// errdefer self.channel.loop.finishOneEvent();
+// errdefer {
+// _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped);
+// }
+// suspend {
+// _ = windows.kernel32.ReadDirectoryChangesW(
+// dir_handle,
+// &event_buf,
+// @intCast(windows.DWORD, event_buf.len),
+// windows.FALSE, // watch subtree
+// windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME |
+// windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE |
+// windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
+// windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
+// null, // number of bytes transferred (unused for async)
+// &resume_node.base.overlapped,
+// null, // completion routine - unused because we use IOCP
+// );
+// }
+// }
+// var bytes_transferred: windows.DWORD = undefined;
+// if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
+// const err = switch (windows.kernel32.GetLastError()) {
+// else => |err| windows.unexpectedError(err),
+// };
+// await (async self.channel.put(err) catch unreachable);
+// } else {
+// // can't use @bytesToSlice because of the special variable length name field
+// var ptr = event_buf[0..].ptr;
+// const end_ptr = ptr + bytes_transferred;
+// var ev: *windows.FILE_NOTIFY_INFORMATION = undefined;
+// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) {
+// ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr);
+// const emit = switch (ev.Action) {
+// windows.FILE_ACTION_REMOVED => WatchEventId.Delete,
+// windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite,
+// else => null,
+// };
+// if (emit) |id| {
+// const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2];
+// const user_value = blk: {
+// const held = await (async dir.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// if (dir.file_table.get(basename_utf16le)) |entry| {
+// break :blk entry.value;
+// } else {
+// break :blk null;
+// }
+// };
+// if (user_value) |v| {
+// await (async self.channel.put(Event{
+// .id = id,
+// .data = v,
+// }) catch unreachable);
+// }
+// }
+// if (ev.NextEntryOffset == 0) break;
+// }
+// }
+// }
+// }
+//
+// pub async fn removeFile(self: *Self, file_path: []const u8) ?V {
+// @panic("TODO");
+// }
+//
+// async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void {
+// const loop = channel.loop;
+//
+// var watch = Self{
+// .channel = channel,
+// .os_data = OsData{
+// .putter = @frame(),
+// .inotify_fd = inotify_fd,
+// .wd_table = OsData.WdTable.init(loop.allocator),
+// .table_lock = event.Lock.init(loop),
+// },
+// };
+// out_watch.* = &watch;
+//
+// loop.beginOneEvent();
+//
+// defer {
+// watch.os_data.table_lock.deinit();
+// var wd_it = watch.os_data.wd_table.iterator();
+// while (wd_it.next()) |wd_entry| {
+// var file_it = wd_entry.value.file_table.iterator();
+// while (file_it.next()) |file_entry| {
+// loop.allocator.free(file_entry.key);
+// }
+// loop.allocator.free(wd_entry.value.dirname);
+// }
+// loop.finishOneEvent();
+// os.close(inotify_fd);
+// channel.destroy();
+// }
+//
+// var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
+//
+// while (true) {
+// const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len);
+// const errno = os.linux.getErrno(rc);
+// switch (errno) {
+// 0 => {
+// // can't use @bytesToSlice because of the special variable length name field
+// var ptr = event_buf[0..].ptr;
+// const end_ptr = ptr + event_buf.len;
+// var ev: *os.linux.inotify_event = undefined;
+// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) {
+// ev = @ptrCast(*os.linux.inotify_event, ptr);
+// if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
+// const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
+// const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1];
+// const user_value = blk: {
+// const held = await (async watch.os_data.table_lock.acquire() catch unreachable);
+// defer held.release();
+//
+// const dir = &watch.os_data.wd_table.get(ev.wd).?.value;
+// if (dir.file_table.get(basename_with_null)) |entry| {
+// break :blk entry.value;
+// } else {
+// break :blk null;
+// }
+// };
+// if (user_value) |v| {
+// await (async channel.put(Event{
+// .id = WatchEventId.CloseWrite,
+// .data = v,
+// }) catch unreachable);
+// }
+// }
+// }
+// },
+// os.linux.EINTR => continue,
+// os.linux.EINVAL => unreachable,
+// os.linux.EFAULT => unreachable,
+// os.linux.EAGAIN => {
+// (await (async loop.linuxWaitFd(
+// inotify_fd,
+// os.linux.EPOLLET | os.linux.EPOLLIN,
+// ) catch unreachable)) catch |err| {
+// const transformed_err = switch (err) {
+// error.FileDescriptorAlreadyPresentInSet => unreachable,
+// error.OperationCausesCircularLoop => unreachable,
+// error.FileDescriptorNotRegistered => unreachable,
+// error.FileDescriptorIncompatibleWithEpoll => unreachable,
+// error.Unexpected => unreachable,
+// else => |e| e,
+// };
+// await (async channel.put(transformed_err) catch unreachable);
+// };
+// },
+// else => unreachable,
+// }
+// }
+// }
+// };
+//}
+
+const test_tmp_dir = "std_event_fs_test";
+
+// TODO this test is disabled until the async function rewrite is finished.
+//test "write a file, watch it, write it again" {
+// return error.SkipZigTest;
+// const allocator = std.heap.direct_allocator;
+//
+// // TODO move this into event loop too
+// try os.makePath(allocator, test_tmp_dir);
+// defer os.deleteTree(allocator, test_tmp_dir) catch {};
+//
+// var loop: Loop = undefined;
+// try loop.initMultiThreaded(allocator);
+// defer loop.deinit();
+//
+// var result: anyerror!void = error.ResultNeverWritten;
+// const handle = try async<allocator> testFsWatchCantFail(&loop, &result);
+// defer cancel handle;
+//
+// loop.run();
+// return result;
+//}
+
+fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void {
+ result.* = testFsWatch(loop);
+}
+
+fn testFsWatch(loop: *Loop) !void {
+ const file_path = try std.fs.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" });
+ defer loop.allocator.free(file_path);
+
+ const contents =
+ \\line 1
+ \\line 2
+ ;
+ const line2_offset = 7;
+
+ // first just write then read the file
+ try writeFile(loop, file_path, contents);
+
+ const read_contents = try readFile(loop, file_path, 1024 * 1024);
+ testing.expectEqualSlices(u8, contents, read_contents);
+
+ // now watch the file
+ var watch = try Watch(void).create(loop, 0);
+ defer watch.destroy();
+
+ testing.expect((try watch.addFile(file_path, {})) == null);
+
+ const ev = async watch.channel.get();
+ var ev_consumed = false;
+ defer if (!ev_consumed) await ev;
+
+ // overwrite line 2
+ const fd = try await openReadWrite(loop, file_path, File.default_mode);
+ {
+ defer os.close(fd);
+
+ try pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset);
+ }
+
+ ev_consumed = true;
+ switch ((try await ev).id) {
+ WatchEventId.CloseWrite => {},
+ WatchEventId.Delete => @panic("wrong event"),
+ }
+ const contents_updated = try readFile(loop, file_path, 1024 * 1024);
+ testing.expectEqualSlices(u8,
+ \\line 1
+ \\lorem ipsum
+ , contents_updated);
+
+ // TODO test deleting the file and then re-adding it. we should get events for both
+}
+
+pub const OutStream = struct {
+ fd: fd_t,
+ stream: Stream,
+ loop: *Loop,
+ offset: usize,
+
+ pub const Error = File.WriteError;
+ pub const Stream = event.io.OutStream(Error);
+
+ pub fn init(loop: *Loop, fd: fd_t, offset: usize) OutStream {
+ return OutStream{
+ .fd = fd,
+ .loop = loop,
+ .offset = offset,
+ .stream = Stream{ .writeFn = writeFn },
+ };
+ }
+
+ fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
+ const self = @fieldParentPtr(OutStream, "stream", out_stream);
+ const offset = self.offset;
+ self.offset += bytes.len;
+ return pwritev(self.loop, self.fd, [][]const u8{bytes}, offset);
+ }
+};
+
+pub const InStream = struct {
+ fd: fd_t,
+ stream: Stream,
+ loop: *Loop,
+ offset: usize,
+
+ pub const Error = PReadVError; // TODO make this not have OutOfMemory
+ pub const Stream = event.io.InStream(Error);
+
+ pub fn init(loop: *Loop, fd: fd_t, offset: usize) InStream {
+ return InStream{
+ .fd = fd,
+ .loop = loop,
+ .offset = offset,
+ .stream = Stream{ .readFn = readFn },
+ };
+ }
+
+ fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
+ const self = @fieldParentPtr(InStream, "stream", in_stream);
+ const amt = try preadv(self.loop, self.fd, [][]u8{bytes}, self.offset);
+ self.offset += amt;
+ return amt;
+ }
+};
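The CloseOperation doc comment above describes the intended pattern: start the operation before opening the file so the close can be deferred without a suspend point, and hand it the fd only once the open succeeds. A minimal hypothetical sketch, mirroring readFile above (readHead and the 256-byte limit are illustrative, not from this commit):

const std = @import("std");
const fs = std.event.fs;
const Loop = std.event.Loop;

/// Read up to 256 bytes from the start of the file; caller owns the returned slice.
fn readHead(loop: *Loop, path: []const u8) ![]u8 {
    var close_op = try fs.CloseOperation.start(loop);
    defer close_op.finish(); // closes the fd if setHandle was called, otherwise just deallocates

    const fd = try fs.openRead(loop, path);
    close_op.setHandle(fd);

    var buf: [256]u8 = undefined;
    const buf_array = [_][]u8{buf[0..]};
    const amt = try fs.preadv(loop, fd, buf_array, 0);
    return std.mem.dupe(loop.allocator, u8, buf[0..amt]);
}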
diff --git a/lib/std/event/future.zig b/lib/std/event/future.zig
new file mode 100644
index 0000000000..1e3508de41
--- /dev/null
+++ b/lib/std/event/future.zig
@@ -0,0 +1,121 @@
+const std = @import("../std.zig");
+const assert = std.debug.assert;
+const testing = std.testing;
+const builtin = @import("builtin");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+
+/// This is a value that starts out unavailable, until resolve() is called
+/// While it is unavailable, functions suspend when they try to get() it,
+/// and then are resumed when resolve() is called.
+/// At this point the value remains forever available, and another resolve() is not allowed.
+pub fn Future(comptime T: type) type {
+ return struct {
+ lock: Lock,
+ data: T,
+
+ /// TODO make this an enum
+ /// 0 - not started
+ /// 1 - started
+ /// 2 - finished
+ available: u8,
+
+ const Self = @This();
+ const Queue = std.atomic.Queue(anyframe);
+
+ pub fn init(loop: *Loop) Self {
+ return Self{
+ .lock = Lock.initLocked(loop),
+ .available = 0,
+ .data = undefined,
+ };
+ }
+
+ /// Obtain the value. If it's not available, wait until it becomes
+ /// available.
+ /// Thread-safe.
+ pub async fn get(self: *Self) *T {
+ if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
+ return &self.data;
+ }
+ const held = self.lock.acquire();
+ held.release();
+
+ return &self.data;
+ }
+
+ /// Gets the data without waiting for it. If it's available, a pointer is
+ /// returned. Otherwise, null is returned.
+ pub fn getOrNull(self: *Self) ?*T {
+ if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
+ return &self.data;
+ } else {
+ return null;
+ }
+ }
+
+ /// If someone else has started working on the data, wait for them to complete
+ /// and return a pointer to the data. Otherwise, return null, and the caller
+ /// should start working on the data.
+ /// It's not required to call start() before resolve() but it can be useful since
+ /// this method is thread-safe.
+ pub async fn start(self: *Self) ?*T {
+ const state = @cmpxchgStrong(u8, &self.available, 0, 1, .SeqCst, .SeqCst) orelse return null;
+ switch (state) {
+ 1 => {
+ const held = self.lock.acquire();
+ held.release();
+ return &self.data;
+ },
+ 2 => return &self.data,
+ else => unreachable,
+ }
+ }
+
+ /// Make the data become available. May be called only once.
+ /// Before calling this, modify the `data` property.
+ pub fn resolve(self: *Self) void {
+ const prev = @atomicRmw(u8, &self.available, .Xchg, 2, .SeqCst);
+ assert(prev == 0 or prev == 1); // resolve() called twice
+ Lock.Held.release(Lock.Held{ .lock = &self.lock });
+ }
+ };
+}
+
+test "std.event.Future" {
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+ // https://github.com/ziglang/zig/issues/3251
+ if (std.os.freebsd.is_the_target) return error.SkipZigTest;
+
+ const allocator = std.heap.direct_allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ const handle = async testFuture(&loop);
+
+ loop.run();
+}
+
+fn testFuture(loop: *Loop) void {
+ var future = Future(i32).init(loop);
+
+ var a = async waitOnFuture(&future);
+ var b = async waitOnFuture(&future);
+ resolveFuture(&future);
+
+ const result = (await a) + (await b);
+
+ testing.expect(result == 12);
+}
+
+fn waitOnFuture(future: *Future(i32)) i32 {
+ return future.get().*;
+}
+
+fn resolveFuture(future: *Future(i32)) void {
+ future.data = 6;
+ future.resolve();
+}
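The start()/resolve() protocol above supports a "compute once, share the result" pattern: the first caller of start() gets null and is expected to fill in data and call resolve(); every other caller waits on the internal lock and receives a pointer to the finished value. A hypothetical sketch (cachedAnswer and expensiveComputation are illustrative names):

const std = @import("std");
const Future = std.event.Future;

fn expensiveComputation() i32 {
    return 42; // stand-in for real work
}

/// Whoever wins start() computes the value; everyone else waits for resolve().
async fn cachedAnswer(future: *Future(i32)) i32 {
    if (future.start()) |ptr| return ptr.*;
    future.data = expensiveComputation();
    future.resolve();
    return future.data;
}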
diff --git a/lib/std/event/group.zig b/lib/std/event/group.zig
new file mode 100644
index 0000000000..f96b938f80
--- /dev/null
+++ b/lib/std/event/group.zig
@@ -0,0 +1,131 @@
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+const testing = std.testing;
+
+/// ReturnType must be `void` or `E!void`
+pub fn Group(comptime ReturnType: type) type {
+ return struct {
+ frame_stack: Stack,
+ alloc_stack: Stack,
+ lock: Lock,
+
+ const Self = @This();
+
+ const Error = switch (@typeInfo(ReturnType)) {
+ .ErrorUnion => |payload| payload.error_set,
+ else => void,
+ };
+ const Stack = std.atomic.Stack(anyframe->ReturnType);
+
+ pub fn init(loop: *Loop) Self {
+ return Self{
+ .frame_stack = Stack.init(),
+ .alloc_stack = Stack.init(),
+ .lock = Lock.init(loop),
+ };
+ }
+
+ /// Add a frame to the group. Thread-safe.
+ pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) {
+ const node = try self.lock.loop.allocator.create(Stack.Node);
+ node.* = Stack.Node{
+ .next = undefined,
+ .data = handle,
+ };
+ self.alloc_stack.push(node);
+ }
+
+ /// Add a node to the group. Thread-safe. Cannot fail.
+ /// `node.data` should be the frame handle to add to the group.
+ /// The node's memory should be in the function frame of
+ /// the handle that is in the node, or somewhere guaranteed to live
+ /// at least as long.
+ pub fn addNode(self: *Self, node: *Stack.Node) void {
+ self.frame_stack.push(node);
+ }
+
+ /// Wait for all the calls and promises of the group to complete.
+ /// Thread-safe.
+ /// Safe to call any number of times.
+ pub async fn wait(self: *Self) ReturnType {
+ const held = self.lock.acquire();
+ defer held.release();
+
+ var result: ReturnType = {};
+
+ while (self.frame_stack.pop()) |node| {
+ if (Error == void) {
+ await node.data;
+ } else {
+ (await node.data) catch |err| {
+ result = err;
+ };
+ }
+ }
+ while (self.alloc_stack.pop()) |node| {
+ const handle = node.data;
+ self.lock.loop.allocator.destroy(node);
+ if (Error == void) {
+ await handle;
+ } else {
+ (await handle) catch |err| {
+ result = err;
+ };
+ }
+ }
+ return result;
+ }
+ };
+}
+
+test "std.event.Group" {
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+
+ const allocator = std.heap.direct_allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ const handle = async testGroup(&loop);
+
+ loop.run();
+}
+
+async fn testGroup(loop: *Loop) void {
+ var count: usize = 0;
+ var group = Group(void).init(loop);
+ var sleep_a_little_frame = async sleepALittle(&count);
+ group.add(&sleep_a_little_frame) catch @panic("memory");
+ var increase_by_ten_frame = async increaseByTen(&count);
+ group.add(&increase_by_ten_frame) catch @panic("memory");
+ group.wait();
+ testing.expect(count == 11);
+
+ var another = Group(anyerror!void).init(loop);
+ var something_else_frame = async somethingElse();
+ another.add(&something_else_frame) catch @panic("memory");
+ var something_that_fails_frame = async doSomethingThatFails();
+ another.add(&something_that_fails_frame) catch @panic("memory");
+ testing.expectError(error.ItBroke, another.wait());
+}
+
+async fn sleepALittle(count: *usize) void {
+ std.time.sleep(1 * std.time.millisecond);
+ _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
+}
+
+async fn increaseByTen(count: *usize) void {
+ var i: usize = 0;
+ while (i < 10) : (i += 1) {
+ _ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
+ }
+}
+
+async fn doSomethingThatFails() anyerror!void {}
+async fn somethingElse() anyerror!void {
+ return error.ItBroke;
+}
diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig
new file mode 100644
index 0000000000..a0b1fd3e50
--- /dev/null
+++ b/lib/std/event/lock.zig
@@ -0,0 +1,186 @@
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const testing = std.testing;
+const mem = std.mem;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock.
+/// Functions which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+/// Allows only one actor to hold the lock.
+pub const Lock = struct {
+ loop: *Loop,
+ shared_bit: u8, // TODO make this a bool
+ queue: Queue,
+ queue_empty_bit: u8, // TODO make this a bool
+
+ const Queue = std.atomic.Queue(anyframe);
+
+ pub const Held = struct {
+ lock: *Lock,
+
+ pub fn release(self: Held) void {
+ // Resume the next item from the queue.
+ if (self.lock.queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ return;
+ }
+
+ // We need to release the lock.
+ _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst);
+ _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst);
+
+ // There might be a queue item. If we know the queue is empty, we can be done,
+ // because the other actor will try to obtain the lock.
+ // But if there's a queue item, we are the actor which must loop and attempt
+ // to grab the lock again.
+ if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+ return;
+ }
+
+ while (true) {
+ const old_bit = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 1, .SeqCst);
+ if (old_bit != 0) {
+ // We did not obtain the lock. Great, the queue is someone else's problem.
+ return;
+ }
+
+ // Resume the next item from the queue.
+ if (self.lock.queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ return;
+ }
+
+ // Release the lock again.
+ _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst);
+ _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst);
+
+ // Find out if we can be done.
+ if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+ return;
+ }
+ }
+ }
+ };
+
+ pub fn init(loop: *Loop) Lock {
+ return Lock{
+ .loop = loop,
+ .shared_bit = 0,
+ .queue = Queue.init(),
+ .queue_empty_bit = 1,
+ };
+ }
+
+ pub fn initLocked(loop: *Loop) Lock {
+ return Lock{
+ .loop = loop,
+ .shared_bit = 1,
+ .queue = Queue.init(),
+ .queue_empty_bit = 1,
+ };
+ }
+
+ /// Must be called when not locked. Not thread safe.
+ /// All calls to acquire() and release() must complete before calling deinit().
+ pub fn deinit(self: *Lock) void {
+ assert(self.shared_bit == 0);
+ while (self.queue.get()) |node| resume node.data;
+ }
+
+ pub async fn acquire(self: *Lock) Held {
+ var my_tick_node = Loop.NextTickNode.init(@frame());
+
+ errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
+ suspend {
+ self.queue.put(&my_tick_node);
+
+ // At this point, we are in the queue, so we might have already been resumed.
+
+ // We set this bit so that later we can rely on the fact that if queue_empty_bit is 1, some actor
+ // will attempt to grab the lock.
+ _ = @atomicRmw(u8, &self.queue_empty_bit, .Xchg, 0, .SeqCst);
+
+ const old_bit = @atomicRmw(u8, &self.shared_bit, .Xchg, 1, .SeqCst);
+ if (old_bit == 0) {
+ if (self.queue.get()) |node| {
+ // Whether this node is us or someone else, we tail resume it.
+ resume node.data;
+ }
+ }
+ }
+
+ return Held{ .lock = self };
+ }
+};
+
+test "std.event.Lock" {
+ // TODO https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+ // TODO https://github.com/ziglang/zig/issues/3251
+ if (std.os.freebsd.is_the_target) return error.SkipZigTest;
+
+ const allocator = std.heap.direct_allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var lock = Lock.init(&loop);
+ defer lock.deinit();
+
+ _ = async testLock(&loop, &lock);
+ loop.run();
+
+ testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data);
+}
+
+async fn testLock(loop: *Loop, lock: *Lock) void {
+ var handle1 = async lockRunner(lock);
+ var tick_node1 = Loop.NextTickNode{
+ .prev = undefined,
+ .next = undefined,
+ .data = &handle1,
+ };
+ loop.onNextTick(&tick_node1);
+
+ var handle2 = async lockRunner(lock);
+ var tick_node2 = Loop.NextTickNode{
+ .prev = undefined,
+ .next = undefined,
+ .data = &handle2,
+ };
+ loop.onNextTick(&tick_node2);
+
+ var handle3 = async lockRunner(lock);
+ var tick_node3 = Loop.NextTickNode{
+ .prev = undefined,
+ .next = undefined,
+ .data = &handle3,
+ };
+ loop.onNextTick(&tick_node3);
+
+ await handle1;
+ await handle2;
+ await handle3;
+}
+
+var shared_test_data = [1]i32{0} ** 10;
+var shared_test_index: usize = 0;
+
+async fn lockRunner(lock: *Lock) void {
+ suspend; // resumed by onNextTick
+
+ var i: usize = 0;
+ while (i < shared_test_data.len) : (i += 1) {
+ var lock_frame = async lock.acquire();
+ const handle = await lock_frame;
+ defer handle.release();
+
+ shared_test_index = 0;
+ while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
+ shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
+ }
+ }
+}
diff --git a/lib/std/event/locked.zig b/lib/std/event/locked.zig
new file mode 100644
index 0000000000..aeedf3558a
--- /dev/null
+++ b/lib/std/event/locked.zig
@@ -0,0 +1,43 @@
+const std = @import("../std.zig");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock that protects one piece of data.
+/// Functions which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+pub fn Locked(comptime T: type) type {
+ return struct {
+ lock: Lock,
+ private_data: T,
+
+ const Self = @This();
+
+ pub const HeldLock = struct {
+ value: *T,
+ held: Lock.Held,
+
+ pub fn release(self: HeldLock) void {
+ self.held.release();
+ }
+ };
+
+ pub fn init(loop: *Loop, data: T) Self {
+ return Self{
+ .lock = Lock.init(loop),
+ .private_data = data,
+ };
+ }
+
+ pub fn deinit(self: *Self) void {
+ self.lock.deinit();
+ }
+
+ pub async fn acquire(self: *Self) HeldLock {
+ return HeldLock{
+ // TODO guaranteed allocation elision
+ .held = await (async self.lock.acquire() catch unreachable),
+ .value = &self.private_data,
+ };
+ }
+ };
+}
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
new file mode 100644
index 0000000000..d0d36abc0c
--- /dev/null
+++ b/lib/std/event/loop.zig
@@ -0,0 +1,923 @@
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const root = @import("root");
+const assert = std.debug.assert;
+const testing = std.testing;
+const mem = std.mem;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const fs = std.event.fs;
+const os = std.os;
+const windows = os.windows;
+const maxInt = std.math.maxInt;
+const Thread = std.Thread;
+
+pub const Loop = struct {
+ allocator: *mem.Allocator,
+ next_tick_queue: std.atomic.Queue(anyframe),
+ os_data: OsData,
+ final_resume_node: ResumeNode,
+ pending_event_count: usize,
+ extra_threads: []*Thread,
+
+ // pre-allocated eventfds. all permanently active.
+ // this is how we send promises to be resumed on other threads.
+ available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
+ eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
+
+ pub const NextTickNode = std.atomic.Queue(anyframe).Node;
+
+ pub const ResumeNode = struct {
+ id: Id,
+ handle: anyframe,
+ overlapped: Overlapped,
+
+ pub const overlapped_init = switch (builtin.os) {
+ .windows => windows.OVERLAPPED{
+ .Internal = 0,
+ .InternalHigh = 0,
+ .Offset = 0,
+ .OffsetHigh = 0,
+ .hEvent = null,
+ },
+ else => {},
+ };
+ pub const Overlapped = @typeOf(overlapped_init);
+
+ pub const Id = enum {
+ Basic,
+ Stop,
+ EventFd,
+ };
+
+ pub const EventFd = switch (builtin.os) {
+ .macosx, .freebsd, .netbsd => KEventFd,
+ .linux => struct {
+ base: ResumeNode,
+ epoll_op: u32,
+ eventfd: i32,
+ },
+ .windows => struct {
+ base: ResumeNode,
+ completion_key: usize,
+ },
+ else => @compileError("unsupported OS"),
+ };
+
+ const KEventFd = struct {
+ base: ResumeNode,
+ kevent: os.Kevent,
+ };
+
+ pub const Basic = switch (builtin.os) {
+ .macosx, .freebsd, .netbsd => KEventBasic,
+ .linux => struct {
+ base: ResumeNode,
+ },
+ .windows => struct {
+ base: ResumeNode,
+ },
+ else => @compileError("unsupported OS"),
+ };
+
+ const KEventBasic = struct {
+ base: ResumeNode,
+ kev: os.Kevent,
+ };
+ };
+
+ var global_instance_state: Loop = undefined;
+ const default_instance: ?*Loop = switch (std.io.mode) {
+ .blocking => null,
+ .evented => &global_instance_state,
+ };
+ pub const instance: ?*Loop = if (@hasDecl(root, "event_loop")) root.event_loop else default_instance;
+
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
+ pub fn init(self: *Loop, allocator: *mem.Allocator) !void {
+ if (builtin.single_threaded) {
+ return self.initSingleThreaded(allocator);
+ } else {
+ return self.initMultiThreaded(allocator);
+ }
+ }
+
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
+ pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ return self.initInternal(allocator, 1);
+ }
+
+ /// The allocator must be thread-safe because we use it for multiplexing
+ /// async functions onto kernel threads.
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
+ pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode");
+ const core_count = try Thread.cpuCount();
+ return self.initInternal(allocator, core_count);
+ }
+
+ /// Thread count is the total thread count. The thread pool size will be
+ /// max(thread_count - 1, 0)
+ fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
+ self.* = Loop{
+ .pending_event_count = 1,
+ .allocator = allocator,
+ .os_data = undefined,
+ .next_tick_queue = std.atomic.Queue(anyframe).init(),
+ .extra_threads = undefined,
+ .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
+ .eventfd_resume_nodes = undefined,
+ .final_resume_node = ResumeNode{
+ .id = ResumeNode.Id.Stop,
+ .handle = undefined,
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ };
+ // We need at least one of these in case the fs thread wants to use onNextTick
+ const extra_thread_count = thread_count - 1;
+ const resume_node_count = std.math.max(extra_thread_count, 1);
+ self.eventfd_resume_nodes = try self.allocator.alloc(
+ std.atomic.Stack(ResumeNode.EventFd).Node,
+ resume_node_count,
+ );
+ errdefer self.allocator.free(self.eventfd_resume_nodes);
+
+ self.extra_threads = try self.allocator.alloc(*Thread, extra_thread_count);
+ errdefer self.allocator.free(self.extra_threads);
+
+ try self.initOsData(extra_thread_count);
+ errdefer self.deinitOsData();
+ }
+
+ pub fn deinit(self: *Loop) void {
+ self.deinitOsData();
+ self.allocator.free(self.extra_threads);
+ }
+
+ const InitOsDataError = os.EpollCreateError || mem.Allocator.Error || os.EventFdError ||
+ Thread.SpawnError || os.EpollCtlError || os.KEventError ||
+ windows.CreateIoCompletionPortError;
+
+ const wakeup_bytes = [_]u8{0x1} ** 8;
+
+ fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
+ switch (builtin.os) {
+ .linux => {
+ self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
+ self.os_data.fs_queue_item = 0;
+ // we need another thread for the file system because Linux does not have an async
+ // file system I/O API.
+ self.os_data.fs_end_request = fs.RequestNode{
+ .prev = undefined,
+ .next = undefined,
+ .data = fs.Request{
+ .msg = fs.Request.Msg.End,
+ .finish = fs.Request.Finish.NoAction,
+ },
+ };
+
+ errdefer {
+ while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
+ }
+ for (self.eventfd_resume_nodes) |*eventfd_node| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = .EventFd,
+ .handle = undefined,
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ .eventfd = try os.eventfd(1, os.EFD_CLOEXEC | os.EFD_NONBLOCK),
+ .epoll_op = os.EPOLL_CTL_ADD,
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ self.os_data.epollfd = try os.epoll_create1(os.EPOLL_CLOEXEC);
+ errdefer os.close(self.os_data.epollfd);
+
+ self.os_data.final_eventfd = try os.eventfd(0, os.EFD_CLOEXEC | os.EFD_NONBLOCK);
+ errdefer os.close(self.os_data.final_eventfd);
+
+ self.os_data.final_eventfd_event = os.epoll_event{
+ .events = os.EPOLLIN,
+ .data = os.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
+ };
+ try os.epoll_ctl(
+ self.os_data.epollfd,
+ os.EPOLL_CTL_ADD,
+ self.os_data.final_eventfd,
+ &self.os_data.final_eventfd_event,
+ );
+
+ self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
+ errdefer {
+ self.posixFsRequest(&self.os_data.fs_end_request);
+ self.os_data.fs_thread.wait();
+ }
+
+ if (builtin.single_threaded) {
+ assert(extra_thread_count == 0);
+ return;
+ }
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ // writing 8 bytes to an eventfd cannot fail
+ os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun);
+ }
+ },
+ .macosx, .freebsd, .netbsd => {
+ self.os_data.kqfd = try os.kqueue();
+ errdefer os.close(self.os_data.kqfd);
+
+ self.os_data.fs_kqfd = try os.kqueue();
+ errdefer os.close(self.os_data.fs_kqfd);
+
+ self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
+ // we need another thread for the file system because Darwin does not have an async
+ // file system I/O API.
+ self.os_data.fs_end_request = fs.RequestNode{
+ .prev = undefined,
+ .next = undefined,
+ .data = fs.Request{
+ .msg = fs.Request.Msg.End,
+ .finish = fs.Request.Finish.NoAction,
+ },
+ };
+
+ const empty_kevs = ([*]os.Kevent)(undefined)[0..0];
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ // this one is for sending events
+ .kevent = os.Kevent{
+ .ident = i,
+ .filter = os.EVFILT_USER,
+ .flags = os.EV_CLEAR | os.EV_ADD | os.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ },
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ const kevent_array = (*const [1]os.Kevent)(&eventfd_node.data.kevent);
+ _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
+ eventfd_node.data.kevent.flags = os.EV_CLEAR | os.EV_ENABLE;
+ eventfd_node.data.kevent.fflags = os.NOTE_TRIGGER;
+ }
+
+ // Pre-add so that we cannot get error.SystemResources
+ // later when we try to activate it.
+ self.os_data.final_kevent = os.Kevent{
+ .ident = extra_thread_count,
+ .filter = os.EVFILT_USER,
+ .flags = os.EV_ADD | os.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&self.final_resume_node),
+ };
+ const final_kev_arr = (*const [1]os.Kevent)(&self.os_data.final_kevent);
+ _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
+ self.os_data.final_kevent.flags = os.EV_ENABLE;
+ self.os_data.final_kevent.fflags = os.NOTE_TRIGGER;
+
+ self.os_data.fs_kevent_wake = os.Kevent{
+ .ident = 0,
+ .filter = os.EVFILT_USER,
+ .flags = os.EV_ADD | os.EV_ENABLE,
+ .fflags = os.NOTE_TRIGGER,
+ .data = 0,
+ .udata = undefined,
+ };
+
+ self.os_data.fs_kevent_wait = os.Kevent{
+ .ident = 0,
+ .filter = os.EVFILT_USER,
+ .flags = os.EV_ADD | os.EV_CLEAR,
+ .fflags = 0,
+ .data = 0,
+ .udata = undefined,
+ };
+
+ self.os_data.fs_thread = try Thread.spawn(self, posixFsRun);
+ errdefer {
+ self.posixFsRequest(&self.os_data.fs_end_request);
+ self.os_data.fs_thread.wait();
+ }
+
+ if (builtin.single_threaded) {
+ assert(extra_thread_count == 0);
+ return;
+ }
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun);
+ }
+ },
+ .windows => {
+ self.os_data.io_port = try windows.CreateIoCompletionPort(
+ windows.INVALID_HANDLE_VALUE,
+ null,
+ undefined,
+ maxInt(windows.DWORD),
+ );
+ errdefer windows.CloseHandle(self.os_data.io_port);
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ // this one is for sending events
+ .completion_key = @ptrToInt(&eventfd_node.data.base),
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ if (builtin.single_threaded) {
+ assert(extra_thread_count == 0);
+ return;
+ }
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ var i: usize = 0;
+ while (i < extra_thread_index) : (i += 1) {
+ while (true) {
+ const overlapped = &self.final_resume_node.overlapped;
+ windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
+ break;
+ }
+ }
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try Thread.spawn(self, workerRun);
+ }
+ },
+ else => {},
+ }
+ }
+
+ fn deinitOsData(self: *Loop) void {
+ switch (builtin.os) {
+ .linux => {
+ os.close(self.os_data.final_eventfd);
+ while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
+ os.close(self.os_data.epollfd);
+ self.allocator.free(self.eventfd_resume_nodes);
+ },
+ .macosx, .freebsd, .netbsd => {
+ os.close(self.os_data.kqfd);
+ os.close(self.os_data.fs_kqfd);
+ },
+ .windows => {
+ windows.CloseHandle(self.os_data.io_port);
+ },
+ else => {},
+ }
+ }
+
+ /// resume_node must live longer than the anyframe that it holds a reference to.
+ /// flags must contain EPOLLET
+ pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void {
+ assert(flags & os.EPOLLET == os.EPOLLET);
+ self.beginOneEvent();
+ errdefer self.finishOneEvent();
+ try self.linuxModFd(
+ fd,
+ os.EPOLL_CTL_ADD,
+ flags,
+ resume_node,
+ );
+ }
+
+ pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void {
+ assert(flags & os.EPOLLET == os.EPOLLET);
+ var ev = os.linux.epoll_event{
+ .events = flags,
+ .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
+ };
+ try os.epoll_ctl(self.os_data.epollfd, op, fd, &ev);
+ }
+
+ pub fn linuxRemoveFd(self: *Loop, fd: i32) void {
+ os.epoll_ctl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, null) catch {};
+ self.finishOneEvent();
+ }
+
+ pub fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void {
+ defer self.linuxRemoveFd(fd);
+ suspend {
+ var resume_node = ResumeNode.Basic{
+ .base = ResumeNode{
+ .id = .Basic,
+ .handle = @frame(),
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ };
+ try self.linuxAddFd(fd, &resume_node.base, flags);
+ }
+ }
+
+ pub fn waitUntilFdReadable(self: *Loop, fd: os.fd_t) !void {
+ return self.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN);
+ }
+
+ pub async fn bsdWaitKev(self: *Loop, ident: usize, filter: i16, fflags: u32) !os.Kevent {
+ var resume_node = ResumeNode.Basic{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.Basic,
+ .handle = @frame(),
+ .overlapped = ResumeNode.overlapped_init,
+ },
+ .kev = undefined,
+ };
+ defer self.bsdRemoveKev(ident, filter);
+ suspend {
+ try self.bsdAddKev(&resume_node, ident, filter, fflags);
+ }
+ return resume_node.kev;
+ }
+
+ /// resume_node must live longer than the anyframe that it holds a reference to.
+ pub fn bsdAddKev(self: *Loop, resume_node: *ResumeNode.Basic, ident: usize, filter: i16, fflags: u32) !void {
+ self.beginOneEvent();
+ errdefer self.finishOneEvent();
+ var kev = os.Kevent{
+ .ident = ident,
+ .filter = filter,
+ .flags = os.EV_ADD | os.EV_ENABLE | os.EV_CLEAR,
+ .fflags = fflags,
+ .data = 0,
+ .udata = @ptrToInt(&resume_node.base),
+ };
+ const kevent_array = (*const [1]os.Kevent)(&kev);
+ const empty_kevs = ([*]os.Kevent)(undefined)[0..0];
+ _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
+ }
+
+ pub fn bsdRemoveKev(self: *Loop, ident: usize, filter: i16) void {
+ var kev = os.Kevent{
+ .ident = ident,
+ .filter = filter,
+ .flags = os.EV_DELETE,
+ .fflags = 0,
+ .data = 0,
+ .udata = 0,
+ };
+ const kevent_array = (*const [1]os.Kevent)(&kev);
+ const empty_kevs = ([*]os.Kevent)(undefined)[0..0];
+ _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch undefined;
+ self.finishOneEvent();
+ }
+
+ fn dispatch(self: *Loop) void {
+ while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+ const next_tick_node = self.next_tick_queue.get() orelse {
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ const eventfd_node = &resume_stack_node.data;
+ eventfd_node.base.handle = next_tick_node.data;
+ switch (builtin.os) {
+ .macosx, .freebsd, .netbsd => {
+ const kevent_array = (*const [1]os.Kevent)(&eventfd_node.kevent);
+ const empty_kevs = ([*]os.Kevent)(undefined)[0..0];
+ _ = os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch {
+ self.next_tick_queue.unget(next_tick_node);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ },
+ .linux => {
+ // the pending count is already accounted for
+ const epoll_events = os.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT |
+ os.linux.EPOLLET;
+ self.linuxModFd(
+ eventfd_node.eventfd,
+ eventfd_node.epoll_op,
+ epoll_events,
+ &eventfd_node.base,
+ ) catch {
+ self.next_tick_queue.unget(next_tick_node);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ },
+ .windows => {
+ windows.PostQueuedCompletionStatus(
+ self.os_data.io_port,
+ undefined,
+ undefined,
+ &eventfd_node.base.overlapped,
+ ) catch {
+ self.next_tick_queue.unget(next_tick_node);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
+
+ /// Bring your own linked list node. This means it can't fail.
+ pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
+ self.beginOneEvent(); // finished in dispatch()
+ self.next_tick_queue.put(node);
+ self.dispatch();
+ }
+
+ pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void {
+ if (self.next_tick_queue.remove(node)) {
+ self.finishOneEvent();
+ }
+ }
+
+ pub fn run(self: *Loop) void {
+ self.finishOneEvent(); // the reference we start with
+
+ self.workerRun();
+
+ switch (builtin.os) {
+ .linux,
+ .macosx,
+ .freebsd,
+ .netbsd,
+ => self.os_data.fs_thread.wait(),
+ else => {},
+ }
+
+ for (self.extra_threads) |extra_thread| {
+ extra_thread.wait();
+ }
+ }
+
+ /// This is equivalent to a function call, except it calls `startCpuBoundOperation` first.
+ pub fn call(comptime func: var, args: ...) @typeOf(func).ReturnType {
+ startCpuBoundOperation();
+ return func(args);
+ }
+
+ /// Yielding lets the event loop run, starting any unstarted async operations.
+ /// Note that async operations automatically start when a function yields for any other reason,
+ /// for example, when async I/O is performed. This function is intended to be used only when
+ /// CPU-bound tasks would be waiting in the event loop but never get started because no async I/O
+ /// is performed.
+ pub fn yield(self: *Loop) void {
+ suspend {
+ var my_tick_node = NextTickNode{
+ .prev = undefined,
+ .next = undefined,
+ .data = @frame(),
+ };
+ self.onNextTick(&my_tick_node);
+ }
+ }
+
+ /// If the build is multi-threaded and there is an event loop, then it calls `yield`.
+ /// Otherwise, it does nothing.
+ pub fn startCpuBoundOperation() void {
+ if (builtin.single_threaded) {
+ return;
+ } else if (instance) |event_loop| {
+ event_loop.yield();
+ }
+ }
+
+ /// call finishOneEvent when done
+ pub fn beginOneEvent(self: *Loop) void {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ }
+
+ pub fn finishOneEvent(self: *Loop) void {
+ const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ if (prev == 1) {
+ // cause all the threads to stop
+ switch (builtin.os) {
+ .linux => {
+ self.posixFsRequest(&self.os_data.fs_end_request);
+ // writing 8 bytes to an eventfd cannot fail
+ os.write(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ return;
+ },
+ .macosx, .freebsd, .netbsd => {
+ self.posixFsRequest(&self.os_data.fs_end_request);
+ const final_kevent = (*const [1]os.Kevent)(&self.os_data.final_kevent);
+ const empty_kevs = ([*]os.Kevent)(undefined)[0..0];
+ // cannot fail because we already added it and this just enables it
+ _ = os.kevent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable;
+ return;
+ },
+ .windows => {
+ var i: usize = 0;
+ while (i < self.extra_threads.len + 1) : (i += 1) {
+ while (true) {
+ const overlapped = &self.final_resume_node.overlapped;
+ windows.PostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
+ break;
+ }
+ }
+ return;
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
+
+ fn workerRun(self: *Loop) void {
+ while (true) {
+ while (true) {
+ const next_tick_node = self.next_tick_queue.get() orelse break;
+ self.dispatch();
+ resume next_tick_node.data;
+ self.finishOneEvent();
+ }
+
+ switch (builtin.os) {
+ .linux => {
+ // only process 1 event so we don't steal from other threads
+ var events: [1]os.linux.epoll_event = undefined;
+ const count = os.epoll_wait(self.os_data.epollfd, events[0..], -1);
+ for (events[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ .Basic => {},
+ .Stop => return,
+ .EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ event_fd_node.epoll_op = os.EPOLL_CTL_MOD;
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ self.finishOneEvent();
+ }
+ }
+ },
+ .macosx, .freebsd, .netbsd => {
+ var eventlist: [1]os.Kevent = undefined;
+ const empty_kevs = ([*]os.Kevent)(undefined)[0..0];
+ const count = os.kevent(self.os_data.kqfd, empty_kevs, eventlist[0..], null) catch unreachable;
+ for (eventlist[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.udata);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ .Basic => {
+ const basic_node = @fieldParentPtr(ResumeNode.Basic, "base", resume_node);
+ basic_node.kev = ev;
+ },
+ .Stop => return,
+ .EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ self.finishOneEvent();
+ }
+ }
+ },
+ .windows => {
+ var completion_key: usize = undefined;
+ const overlapped = while (true) {
+ var nbytes: windows.DWORD = undefined;
+ var overlapped: ?*windows.OVERLAPPED = undefined;
+ switch (windows.GetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
+ .Aborted => return,
+ .Normal => {},
+ .EOF => {},
+ .Cancelled => continue,
+ }
+ if (overlapped) |o| break o;
+ } else unreachable; // TODO else unreachable should not be necessary
+ const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ .Basic => {},
+ .Stop => return,
+ .EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ self.finishOneEvent();
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
+
+ fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void {
+ self.beginOneEvent(); // finished in posixFsRun after processing the msg
+ self.os_data.fs_queue.put(request_node);
+ switch (builtin.os) {
+ .macosx, .freebsd, .netbsd => {
+ const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wake);
+ const empty_kevs = ([*]os.Kevent)(undefined)[0..0];
+ _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable;
+ },
+ .linux => {
+ _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1);
+ switch (os.linux.getErrno(rc)) {
+ 0 => {},
+ os.EINVAL => unreachable,
+ else => unreachable,
+ }
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ fn posixFsCancel(self: *Loop, request_node: *fs.RequestNode) void {
+ if (self.os_data.fs_queue.remove(request_node)) {
+ self.finishOneEvent();
+ }
+ }
+
+ fn posixFsRun(self: *Loop) void {
+ while (true) {
+ if (builtin.os == .linux) {
+ _ = @atomicRmw(i32, &self.os_data.fs_queue_item, .Xchg, 0, .SeqCst);
+ }
+ while (self.os_data.fs_queue.get()) |node| {
+ switch (node.data.msg) {
+ .End => return,
+ .WriteV => |*msg| {
+ msg.result = os.writev(msg.fd, msg.iov);
+ },
+ .PWriteV => |*msg| {
+ msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
+ },
+ .PReadV => |*msg| {
+ msg.result = os.preadv(msg.fd, msg.iov, msg.offset);
+ },
+ .Open => |*msg| {
+ msg.result = os.openC(msg.path.ptr, msg.flags, msg.mode);
+ },
+ .Close => |*msg| os.close(msg.fd),
+ .WriteFile => |*msg| blk: {
+ const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT |
+ os.O_CLOEXEC | os.O_TRUNC;
+ const fd = os.openC(msg.path.ptr, flags, msg.mode) catch |err| {
+ msg.result = err;
+ break :blk;
+ };
+ defer os.close(fd);
+ msg.result = os.write(fd, msg.contents);
+ },
+ }
+ switch (node.data.finish) {
+ .TickNode => |*tick_node| self.onNextTick(tick_node),
+ .DeallocCloseOperation => |close_op| {
+ self.allocator.destroy(close_op);
+ },
+ .NoAction => {},
+ }
+ self.finishOneEvent();
+ }
+ switch (builtin.os) {
+ .linux => {
+ const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null);
+ switch (os.linux.getErrno(rc)) {
+ 0, os.EINTR, os.EAGAIN => continue,
+ else => unreachable,
+ }
+ },
+ .macosx, .freebsd, .netbsd => {
+ const fs_kevs = (*const [1]os.Kevent)(&self.os_data.fs_kevent_wait);
+ var out_kevs: [1]os.Kevent = undefined;
+ _ = os.kevent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+ }
+
+ const OsData = switch (builtin.os) {
+ .linux => LinuxOsData,
+ .macosx, .freebsd, .netbsd => KEventData,
+ .windows => struct {
+ io_port: windows.HANDLE,
+ extra_thread_count: usize,
+ },
+ else => struct {},
+ };
+
+ const KEventData = struct {
+ kqfd: i32,
+ final_kevent: os.Kevent,
+ fs_kevent_wake: os.Kevent,
+ fs_kevent_wait: os.Kevent,
+ fs_thread: *Thread,
+ fs_kqfd: i32,
+ fs_queue: std.atomic.Queue(fs.Request),
+ fs_end_request: fs.RequestNode,
+ };
+
+ const LinuxOsData = struct {
+ epollfd: i32,
+ final_eventfd: i32,
+ final_eventfd_event: os.linux.epoll_event,
+ fs_thread: *Thread,
+ fs_queue_item: i32,
+ fs_queue: std.atomic.Queue(fs.Request),
+ fs_end_request: fs.RequestNode,
+ };
+};
+
+test "std.event.Loop - basic" {
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+
+ const allocator = std.heap.direct_allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ loop.run();
+}
+
+test "std.event.Loop - call" {
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+
+ const allocator = std.heap.direct_allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var did_it = false;
+ var handle = async Loop.call(testEventLoop);
+ var handle2 = async Loop.call(testEventLoop2, &handle, &did_it);
+
+ loop.run();
+
+ testing.expect(did_it);
+}
+
+async fn testEventLoop() i32 {
+ return 1234;
+}
+
+async fn testEventLoop2(h: anyframe->i32, did_it: *bool) void {
+ const value = await h;
+ testing.expect(value == 1234);
+ did_it.* = true;
+}
diff --git a/lib/std/event/net.zig b/lib/std/event/net.zig
new file mode 100644
index 0000000000..bed665dcdc
--- /dev/null
+++ b/lib/std/event/net.zig
@@ -0,0 +1,358 @@
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const testing = std.testing;
+const event = std.event;
+const mem = std.mem;
+const os = std.os;
+const Loop = std.event.Loop;
+const File = std.fs.File;
+const fd_t = os.fd_t;
+
+pub const Server = struct {
+ handleRequestFn: async fn (*Server, *const std.net.Address, File) void,
+
+ loop: *Loop,
+ sockfd: ?i32,
+ accept_frame: ?anyframe,
+ listen_address: std.net.Address,
+
+ waiting_for_emfile_node: PromiseNode,
+ listen_resume_node: event.Loop.ResumeNode,
+
+ const PromiseNode = std.TailQueue(anyframe).Node;
+
+ pub fn init(loop: *Loop) Server {
+ // TODO can't initialize handler here because we need well-defined copy elision
+ return Server{
+ .loop = loop,
+ .sockfd = null,
+ .accept_frame = null,
+ .handleRequestFn = undefined,
+ .waiting_for_emfile_node = undefined,
+ .listen_address = undefined,
+ .listen_resume_node = event.Loop.ResumeNode{
+ .id = event.Loop.ResumeNode.Id.Basic,
+ .handle = undefined,
+ .overlapped = event.Loop.ResumeNode.overlapped_init,
+ },
+ };
+ }
+
+ pub fn listen(
+ self: *Server,
+ address: *const std.net.Address,
+ handleRequestFn: async fn (*Server, *const std.net.Address, File) void,
+ ) !void {
+ self.handleRequestFn = handleRequestFn;
+
+ const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp);
+ errdefer os.close(sockfd);
+ self.sockfd = sockfd;
+
+ try os.bind(sockfd, &address.os_addr);
+ try os.listen(sockfd, os.SOMAXCONN);
+ self.listen_address = std.net.Address.initPosix(try os.getsockname(sockfd));
+
+ self.accept_frame = async Server.handler(self);
+ errdefer await self.accept_frame.?;
+
+ self.listen_resume_node.handle = self.accept_frame.?;
+ try self.loop.linuxAddFd(sockfd, &self.listen_resume_node, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
+ errdefer self.loop.linuxRemoveFd(sockfd);
+ }
+
+ /// Stop listening
+ pub fn close(self: *Server) void {
+ self.loop.linuxRemoveFd(self.sockfd.?);
+ if (self.sockfd) |fd| {
+ os.close(fd);
+ self.sockfd = null;
+ }
+ }
+
+ pub fn deinit(self: *Server) void {
+ if (self.accept_frame) |accept_frame| await accept_frame;
+ if (self.sockfd) |sockfd| os.close(sockfd);
+ }
+
+ pub async fn handler(self: *Server) void {
+ while (true) {
+ var accepted_addr: std.net.Address = undefined;
+ // TODO just inline the following function here and don't expose it as posixAsyncAccept
+ if (os.accept4_async(self.sockfd.?, &accepted_addr.os_addr, os.SOCK_NONBLOCK | os.SOCK_CLOEXEC)) |accepted_fd| {
+ if (accepted_fd == -1) {
+ // would block
+ suspend; // we will get resumed by epoll_wait in the event loop
+ continue;
+ }
+ var socket = File.openHandle(accepted_fd);
+ self.handleRequestFn(self, &accepted_addr, socket);
+ } else |err| switch (err) {
+ error.ProcessFdQuotaExceeded => @panic("TODO handle this error"),
+ error.ConnectionAborted => continue,
+
+ error.FileDescriptorNotASocket => unreachable,
+ error.OperationNotSupported => unreachable,
+
+ error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => {
+ @panic("TODO handle this error");
+ },
+ }
+ }
+ }
+};
+
+pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 {
+ const sockfd = try os.socket(
+ os.AF_UNIX,
+ os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK,
+ 0,
+ );
+ errdefer os.close(sockfd);
+
+ var sock_addr = os.sockaddr_un{
+ .family = os.AF_UNIX,
+ .path = undefined,
+ };
+
+ if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong;
+ mem.copy(u8, sock_addr.path[0..], path);
+ const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len);
+ try os.connect_async(sockfd, &sock_addr, size);
+ try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
+ try os.getsockoptError(sockfd);
+
+ return sockfd;
+}
+
+pub const ReadError = error{
+ SystemResources,
+ Unexpected,
+ UserResourceLimitReached,
+ InputOutput,
+
+ FileDescriptorNotRegistered, // TODO remove this possibility
+ OperationCausesCircularLoop, // TODO remove this possibility
+ FileDescriptorAlreadyPresentInSet, // TODO remove this possibility
+ FileDescriptorIncompatibleWithEpoll, // TODO remove this possibility
+};
+
+/// Returns the number of bytes read. 0 means EOF.
+pub async fn read(loop: *std.event.Loop, fd: fd_t, buffer: []u8) ReadError!usize {
+ const iov = os.iovec{
+ .iov_base = buffer.ptr,
+ .iov_len = buffer.len,
+ };
+ const iovs: *const [1]os.iovec = &iov;
+ return readvPosix(loop, fd, iovs, 1);
+}
+
+pub const WriteError = error{};
+
+pub async fn write(loop: *std.event.Loop, fd: fd_t, buffer: []const u8) WriteError!void {
+ const iov = os.iovec_const{
+ .iov_base = buffer.ptr,
+ .iov_len = buffer.len,
+ };
+ const iovs: *const [1]os.iovec_const = &iov;
+ return writevPosix(loop, fd, iovs, 1);
+}
+
+pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const os.iovec_const, count: usize) !void {
+ while (true) {
+ switch (builtin.os) {
+ .macosx, .linux => {
+ switch (os.errno(os.system.writev(fd, iov, count))) {
+ 0 => return,
+ os.EINTR => continue,
+ os.ESPIPE => unreachable,
+ os.EINVAL => unreachable,
+ os.EFAULT => unreachable,
+ os.EAGAIN => {
+ try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLOUT);
+ continue;
+ },
+ os.EBADF => unreachable, // always a race condition
+ os.EDESTADDRREQ => unreachable, // connect was never called
+ os.EDQUOT => unreachable,
+ os.EFBIG => unreachable,
+ os.EIO => return error.InputOutput,
+ os.ENOSPC => unreachable,
+ os.EPERM => return error.AccessDenied,
+ os.EPIPE => unreachable,
+ else => |err| return os.unexpectedErrno(err),
+ }
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+}
+
+/// Returns the number of bytes read. 0 means EOF.
+pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]os.iovec, count: usize) !usize {
+ while (true) {
+ switch (builtin.os) {
+ builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => {
+ const rc = os.system.readv(fd, iov, count);
+ switch (os.errno(rc)) {
+ 0 => return rc,
+ os.EINTR => continue,
+ os.EINVAL => unreachable,
+ os.EFAULT => unreachable,
+ os.EAGAIN => {
+ try loop.linuxWaitFd(fd, os.EPOLLET | os.EPOLLIN);
+ continue;
+ },
+ os.EBADF => unreachable, // always a race condition
+ os.EIO => return error.InputOutput,
+ os.EISDIR => unreachable,
+ os.ENOBUFS => return error.SystemResources,
+ os.ENOMEM => return error.SystemResources,
+ else => |err| return os.unexpectedErrno(err),
+ }
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+}
+
+pub async fn writev(loop: *Loop, fd: fd_t, data: []const []const u8) !void {
+ const iovecs = try loop.allocator.alloc(os.iovec_const, data.len);
+ defer loop.allocator.free(iovecs);
+
+ for (data) |buf, i| {
+ iovecs[i] = os.iovec_const{
+ .iov_base = buf.ptr,
+ .iov_len = buf.len,
+ };
+ }
+
+ return writevPosix(loop, fd, iovecs.ptr, data.len);
+}
+
+pub async fn readv(loop: *Loop, fd: fd_t, data: []const []u8) !usize {
+ const iovecs = try loop.allocator.alloc(os.iovec, data.len);
+ defer loop.allocator.free(iovecs);
+
+ for (data) |buf, i| {
+ iovecs[i] = os.iovec{
+ .iov_base = buf.ptr,
+ .iov_len = buf.len,
+ };
+ }
+
+ return readvPosix(loop, fd, iovecs.ptr, data.len);
+}
+
+pub async fn connect(loop: *Loop, _address: *const std.net.Address) !File {
+ var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592
+
+ const sockfd = try os.socket(os.AF_INET, os.SOCK_STREAM | os.SOCK_CLOEXEC | os.SOCK_NONBLOCK, os.PROTO_tcp);
+ errdefer os.close(sockfd);
+
+ try os.connect_async(sockfd, &address.os_addr, @sizeOf(os.sockaddr_in));
+ try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
+ try os.getsockoptError(sockfd);
+
+ return File.openHandle(sockfd);
+}
+
+test "listen on a port, send bytes, receive bytes" {
+ // https://github.com/ziglang/zig/issues/2377
+ if (true) return error.SkipZigTest;
+
+ if (builtin.os != builtin.Os.linux) {
+ // TODO build abstractions for other operating systems
+ return error.SkipZigTest;
+ }
+
+ const MyServer = struct {
+ tcp_server: Server,
+
+ const Self = @This();
+ async fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: File) void {
+ const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
+ var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592
+ defer socket.close();
+ const next_handler = errorableHandler(self, _addr, socket) catch |err| {
+ std.debug.panic("unable to handle connection: {}\n", err);
+ };
+ }
+ async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: File) !void {
+ const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592
+ var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592
+
+ const stream = &socket.outStream().stream;
+ try stream.print("hello from server\n");
+ }
+ };
+
+ const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
+ const addr = std.net.Address.initIp4(ip4addr, 0);
+
+ var loop: Loop = undefined;
+ try loop.initSingleThreaded(std.debug.global_allocator);
+ var server = MyServer{ .tcp_server = Server.init(&loop) };
+ defer server.tcp_server.deinit();
+ try server.tcp_server.listen(&addr, MyServer.handler);
+
+ _ = async doAsyncTest(&loop, &server.tcp_server.listen_address, &server.tcp_server);
+ loop.run();
+}
+
+async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void {
+ errdefer @panic("test failure");
+
+ var socket_file = try connect(loop, address);
+ defer socket_file.close();
+
+ var buf: [512]u8 = undefined;
+ const amt_read = try socket_file.read(buf[0..]);
+ const msg = buf[0..amt_read];
+ testing.expect(mem.eql(u8, msg, "hello from server\n"));
+ server.close();
+}
+
+pub const OutStream = struct {
+ fd: fd_t,
+ stream: Stream,
+ loop: *Loop,
+
+ pub const Error = WriteError;
+ pub const Stream = event.io.OutStream(Error);
+
+ pub fn init(loop: *Loop, fd: fd_t) OutStream {
+ return OutStream{
+ .fd = fd,
+ .loop = loop,
+ .stream = Stream{ .writeFn = writeFn },
+ };
+ }
+
+ async fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
+ const self = @fieldParentPtr(OutStream, "stream", out_stream);
+ return write(self.loop, self.fd, bytes);
+ }
+};
+
+pub const InStream = struct {
+ fd: fd_t,
+ stream: Stream,
+ loop: *Loop,
+
+ pub const Error = ReadError;
+ pub const Stream = event.io.InStream(Error);
+
+ pub fn init(loop: *Loop, fd: fd_t) InStream {
+ return InStream{
+ .fd = fd,
+ .loop = loop,
+ .stream = Stream{ .readFn = readFn },
+ };
+ }
+
+ async fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
+ const self = @fieldParentPtr(InStream, "stream", in_stream);
+ return read(self.loop, self.fd, bytes);
+ }
+};
diff --git a/lib/std/event/rwlock.zig b/lib/std/event/rwlock.zig
new file mode 100644
index 0000000000..bf7ea0fa9f
--- /dev/null
+++ b/lib/std/event/rwlock.zig
@@ -0,0 +1,296 @@
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const testing = std.testing;
+const mem = std.mem;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock.
+/// Functions which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+/// Many readers can hold the lock at the same time; however, locking for writing is exclusive.
+/// When a read lock is held, it will not be released until the reader queue is empty.
+/// When a write lock is held, it will not be released until the writer queue is empty.
+pub const RwLock = struct {
+ loop: *Loop,
+ shared_state: u8, // TODO make this an enum
+ writer_queue: Queue,
+ reader_queue: Queue,
+ writer_queue_empty_bit: u8, // TODO make this a bool
+ reader_queue_empty_bit: u8, // TODO make this a bool
+ reader_lock_count: usize,
+
+ const State = struct {
+ const Unlocked = 0;
+ const WriteLock = 1;
+ const ReadLock = 2;
+ };
+
+ const Queue = std.atomic.Queue(anyframe);
+
+ pub const HeldRead = struct {
+ lock: *RwLock,
+
+ pub fn release(self: HeldRead) void {
+ // If other readers still hold the lock, we're done.
+ if (@atomicRmw(usize, &self.lock.reader_lock_count, .Sub, 1, .SeqCst) != 1) {
+ return;
+ }
+
+ _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
+ if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
+ // Didn't unlock. Someone else's problem.
+ return;
+ }
+
+ self.lock.commonPostUnlock();
+ }
+ };
+
+ pub const HeldWrite = struct {
+ lock: *RwLock,
+
+ pub fn release(self: HeldWrite) void {
+ // See if we can leave it locked for writing by handing the lock directly to the
+ // next writer in the queue.
+ if (self.lock.writer_queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ return;
+ }
+
+ // We need to release the write lock. Check if any readers are waiting to grab the lock.
+ if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) {
+ // Switch to a read lock.
+ _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst);
+ while (self.lock.reader_queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ }
+ return;
+ }
+
+ _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
+ _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst);
+
+ self.lock.commonPostUnlock();
+ }
+ };
+
+ pub fn init(loop: *Loop) RwLock {
+ return RwLock{
+ .loop = loop,
+ .shared_state = State.Unlocked,
+ .writer_queue = Queue.init(),
+ .writer_queue_empty_bit = 1,
+ .reader_queue = Queue.init(),
+ .reader_queue_empty_bit = 1,
+ .reader_lock_count = 0,
+ };
+ }
+
+ /// Must be called when not locked. Not thread safe.
+ /// All calls to acquire() and release() must complete before calling deinit().
+ pub fn deinit(self: *RwLock) void {
+ assert(self.shared_state == State.Unlocked);
+ while (self.writer_queue.get()) |node| resume node.data;
+ while (self.reader_queue.get()) |node| resume node.data;
+ }
+
+ pub async fn acquireRead(self: *RwLock) HeldRead {
+ _ = @atomicRmw(usize, &self.reader_lock_count, .Add, 1, .SeqCst);
+
+ suspend {
+ var my_tick_node = Loop.NextTickNode{
+ .data = @frame(),
+ .prev = undefined,
+ .next = undefined,
+ };
+
+ self.reader_queue.put(&my_tick_node);
+
+ // At this point, we are in the reader_queue, so we might have already been resumed.
+
+ // We set this bit so that later we can rely on the fact that if reader_queue_empty_bit is 1,
+ // some actor will attempt to grab the lock.
+ _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 0, .SeqCst);
+
+ // Here we don't care if we are the one to do the locking or if it was already locked for reading.
+ const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == State.ReadLock else true;
+ if (have_read_lock) {
+ // Give out all the read locks.
+ if (self.reader_queue.get()) |first_node| {
+ while (self.reader_queue.get()) |node| {
+ self.loop.onNextTick(node);
+ }
+ resume first_node.data;
+ }
+ }
+ }
+ return HeldRead{ .lock = self };
+ }
+
+ pub async fn acquireWrite(self: *RwLock) HeldWrite {
+ suspend {
+ var my_tick_node = Loop.NextTickNode{
+ .data = @frame(),
+ .prev = undefined,
+ .next = undefined,
+ };
+
+ self.writer_queue.put(&my_tick_node);
+
+ // At this point, we are in the writer_queue, so we might have already been resumed.
+
+ // We set this bit so that later we can rely on the fact that if writer_queue_empty_bit is 1,
+ // some actor will attempt to grab the lock.
+ _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 0, .SeqCst);
+
+ // Here we must be the one to acquire the write lock. It cannot already be locked.
+ if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) == null) {
+ // We now have a write lock.
+ if (self.writer_queue.get()) |node| {
+ // Whether this node is us or someone else, we tail resume it.
+ resume node.data;
+ }
+ }
+ }
+ return HeldWrite{ .lock = self };
+ }
+
+ fn commonPostUnlock(self: *RwLock) void {
+ while (true) {
+ // There might be a writer_queue item or a reader_queue item.
+ // If we check and both are empty, we can be done, because the other actors will try to
+ // obtain the lock.
+ // But if there's a writer_queue item or a reader_queue item,
+ // we are the actor which must loop and attempt to grab the lock again.
+ if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) {
+ if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) != null) {
+ // We did not obtain the lock. Great, the queues are someone else's problem.
+ return;
+ }
+ // If there's an item in the writer queue, give them the lock, and we're done.
+ if (self.writer_queue.get()) |node| {
+ self.loop.onNextTick(node);
+ return;
+ }
+ // Release the lock again.
+ _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
+ _ = @atomicRmw(u8, &self.shared_state, .Xchg, State.Unlocked, .SeqCst);
+ continue;
+ }
+
+ if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) {
+ if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst) != null) {
+ // We did not obtain the lock. Great, the queues are someone else's problem.
+ return;
+ }
+ // If there are any items in the reader queue, give out all the reader locks, and we're done.
+ if (self.reader_queue.get()) |first_node| {
+ self.loop.onNextTick(first_node);
+ while (self.reader_queue.get()) |node| {
+ self.loop.onNextTick(node);
+ }
+ return;
+ }
+ // Release the lock again.
+ _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
+ if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
+ // Didn't unlock. Someone else's problem.
+ return;
+ }
+ continue;
+ }
+ return;
+ }
+ }
+};
+
+test "std.event.RwLock" {
+ // https://github.com/ziglang/zig/issues/2377
+ if (true) return error.SkipZigTest;
+
+ // https://github.com/ziglang/zig/issues/1908
+ if (builtin.single_threaded) return error.SkipZigTest;
+
+ const allocator = std.heap.direct_allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var lock = RwLock.init(&loop);
+ defer lock.deinit();
+
+ const handle = testLock(&loop, &lock);
+ loop.run();
+
+ const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len;
+ testing.expectEqualSlices(i32, expected_result, shared_test_data);
+}
+
+async fn testLock(loop: *Loop, lock: *RwLock) void {
+ var read_nodes: [100]Loop.NextTickNode = undefined;
+ for (read_nodes) |*read_node| {
+ const frame = loop.allocator.create(@Frame(readRunner)) catch @panic("memory");
+ read_node.data = frame;
+ frame.* = async readRunner(lock);
+ loop.onNextTick(read_node);
+ }
+
+ var write_nodes: [shared_it_count]Loop.NextTickNode = undefined;
+ for (write_nodes) |*write_node| {
+ const frame = loop.allocator.create(@Frame(writeRunner)) catch @panic("memory");
+ write_node.data = frame;
+ frame.* = async writeRunner(lock);
+ loop.onNextTick(write_node);
+ }
+
+ for (write_nodes) |*write_node| {
+ const casted = @ptrCast(*const @Frame(writeRunner), write_node.data);
+ await casted;
+ loop.allocator.destroy(casted);
+ }
+ for (read_nodes) |*read_node| {
+ const casted = @ptrCast(*const @Frame(readRunner), read_node.data);
+ await casted;
+ loop.allocator.destroy(casted);
+ }
+}
+
+const shared_it_count = 10;
+var shared_test_data = [1]i32{0} ** 10;
+var shared_test_index: usize = 0;
+var shared_count: usize = 0;
+
+async fn writeRunner(lock: *RwLock) void {
+ suspend; // resumed by onNextTick
+
+ var i: usize = 0;
+ while (i < shared_test_data.len) : (i += 1) {
+ std.time.sleep(100 * std.time.microsecond);
+ const lock_promise = async lock.acquireWrite();
+ const handle = await lock_promise;
+ defer handle.release();
+
+ shared_count += 1;
+ while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
+ shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
+ }
+ shared_test_index = 0;
+ }
+}
+
+async fn readRunner(lock: *RwLock) void {
+ suspend; // resumed by onNextTick
+ std.time.sleep(1);
+
+ var i: usize = 0;
+ while (i < shared_test_data.len) : (i += 1) {
+ const lock_promise = async lock.acquireRead();
+ const handle = await lock_promise;
+ defer handle.release();
+
+ testing.expect(shared_test_index == 0);
+ testing.expect(shared_test_data[i] == @intCast(i32, shared_count));
+ }
+}
diff --git a/lib/std/event/rwlocked.zig b/lib/std/event/rwlocked.zig
new file mode 100644
index 0000000000..386aa08407
--- /dev/null
+++ b/lib/std/event/rwlocked.zig
@@ -0,0 +1,58 @@
+const std = @import("../std.zig");
+const RwLock = std.event.RwLock;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await RW lock that protects one piece of data.
+/// Functions which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+pub fn RwLocked(comptime T: type) type {
+ return struct {
+ lock: RwLock,
+ locked_data: T,
+
+ const Self = @This();
+
+ pub const HeldReadLock = struct {
+ value: *const T,
+ held: RwLock.HeldRead,
+
+ pub fn release(self: HeldReadLock) void {
+ self.held.release();
+ }
+ };
+
+ pub const HeldWriteLock = struct {
+ value: *T,
+ held: RwLock.HeldWrite,
+
+ pub fn release(self: HeldWriteLock) void {
+ self.held.release();
+ }
+ };
+
+ pub fn init(loop: *Loop, data: T) Self {
+ return Self{
+ .lock = RwLock.init(loop),
+ .locked_data = data,
+ };
+ }
+
+ pub fn deinit(self: *Self) void {
+ self.lock.deinit();
+ }
+
+ pub async fn acquireRead(self: *Self) HeldReadLock {
+ return HeldReadLock{
+ .held = await (async self.lock.acquireRead() catch unreachable),
+ .value = &self.locked_data,
+ };
+ }
+
+ pub async fn acquireWrite(self: *Self) HeldWriteLock {
+ return HeldWriteLock{
+ .held = await (async self.lock.acquireWrite() catch unreachable),
+ .value = &self.locked_data,
+ };
+ }
+ };
+}
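
Not part of the change above: RwLocked(T) has no test in this change, so here is a minimal sketch of the read/write accessors, assuming only the API added in this file.

async fn readSlot(shared: *RwLocked([10]u32), index: usize) u32 {
    const held = shared.acquireRead();
    defer held.release();

    // held.value is a *const T while the read lock is held.
    return held.value[index];
}

async fn setSlot(shared: *RwLocked([10]u32), index: usize, value: u32) void {
    const held = shared.acquireWrite();
    defer held.release();

    // held.value is a mutable *T while the write lock is held.
    held.value[index] = value;
}
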