aboutsummaryrefslogtreecommitdiff
path: root/std
diff options
context:
space:
mode:
authorAndrew Kelley <superjoe30@gmail.com>2018-07-10 14:03:03 -0400
committerAndrew Kelley <superjoe30@gmail.com>2018-07-10 14:03:03 -0400
commitcfaebb20d8906d31cedc59e39e6a9286967a931a (patch)
tree33d0c3994bfb658937e2e692a946bc8a2eca4fe5 /std
parentb5d07297dec61a3993dfe91ceee2c87672db1e8e (diff)
parent0ce6934e2631eb3beca817d3bce12ecb13aafa13 (diff)
downloadzig-cfaebb20d8906d31cedc59e39e6a9286967a931a.tar.gz
zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.zip
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std')
-rw-r--r--std/atomic/queue_mpsc.zig42
-rw-r--r--std/build.zig18
-rw-r--r--std/c/darwin.zig72
-rw-r--r--std/crypto/throughput_test.zig8
-rw-r--r--std/debug/index.zig80
-rw-r--r--std/event.zig536
-rw-r--r--std/event/channel.zig254
-rw-r--r--std/event/lock.zig204
-rw-r--r--std/event/locked.zig43
-rw-r--r--std/event/loop.zig629
-rw-r--r--std/event/tcp.zig183
-rw-r--r--std/hash_map.zig20
-rw-r--r--std/heap.zig99
-rw-r--r--std/mem.zig2
-rw-r--r--std/os/darwin.zig259
-rw-r--r--std/os/index.zig203
-rw-r--r--std/os/linux/index.zig12
-rw-r--r--std/os/test.zig5
-rw-r--r--std/os/windows/index.zig28
-rw-r--r--std/os/windows/util.zig47
-rw-r--r--std/special/build_runner.zig9
-rw-r--r--std/special/compiler_rt/extendXfYf2_test.zig40
-rw-r--r--std/zig/bench.zig14
23 files changed, 2185 insertions, 622 deletions
diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig
index 8030565d7a..978e189453 100644
--- a/std/atomic/queue_mpsc.zig
+++ b/std/atomic/queue_mpsc.zig
@@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type {
pub const Node = std.atomic.Stack(T).Node;
+ /// Not thread-safe. The call to init() must complete before any other functions are called.
+ /// No deinitialization required.
pub fn init() Self {
return Self{
.inboxes = []std.atomic.Stack(T){
@@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type {
};
}
+ /// Fully thread-safe. put() may be called from any thread at any time.
pub fn put(self: *Self, node: *Node) void {
const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst);
const inbox = &self.inboxes[inbox_index];
inbox.push(node);
}
+ /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
+ /// the next call to get().
pub fn get(self: *Self) ?*Node {
if (self.outbox.pop()) |node| {
return node;
@@ -43,6 +48,43 @@ pub fn QueueMpsc(comptime T: type) type {
}
return self.outbox.pop();
}
+
+ /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before
+ /// the next call to isEmpty().
+ pub fn isEmpty(self: *Self) bool {
+ if (!self.outbox.isEmpty()) return false;
+ const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst);
+ const prev_inbox = &self.inboxes[prev_inbox_index];
+ while (prev_inbox.pop()) |node| {
+ self.outbox.push(node);
+ }
+ return self.outbox.isEmpty();
+ }
+
+ /// For debugging only. No API guarantees about what this does.
+ pub fn dump(self: *Self) void {
+ {
+ var it = self.outbox.root;
+ while (it) |node| {
+ std.debug.warn("0x{x} -> ", @ptrToInt(node));
+ it = node.next;
+ }
+ }
+ const inbox_index = self.inbox_index;
+ const inboxes = []*std.atomic.Stack(T){
+ &self.inboxes[self.inbox_index],
+ &self.inboxes[1 - self.inbox_index],
+ };
+ for (inboxes) |inbox| {
+ var it = inbox.root;
+ while (it) |node| {
+ std.debug.warn("0x{x} -> ", @ptrToInt(node));
+ it = node.next;
+ }
+ }
+
+ std.debug.warn("null\n");
+ }
};
}
diff --git a/std/build.zig b/std/build.zig
index 99de9b5197..24fa85383a 100644
--- a/std/build.zig
+++ b/std/build.zig
@@ -814,6 +814,7 @@ pub const LibExeObjStep = struct {
out_h_filename: []const u8,
assembly_files: ArrayList([]const u8),
packages: ArrayList(Pkg),
+ build_options_contents: std.Buffer,
// C only stuff
source_files: ArrayList([]const u8),
@@ -905,6 +906,7 @@ pub const LibExeObjStep = struct {
.lib_paths = ArrayList([]const u8).init(builder.allocator),
.object_src = undefined,
.disable_libc = true,
+ .build_options_contents = std.Buffer.initSize(builder.allocator, 0) catch unreachable,
};
self.computeOutFileNames();
return self;
@@ -945,6 +947,7 @@ pub const LibExeObjStep = struct {
.out_h_filename = undefined,
.assembly_files = undefined,
.packages = undefined,
+ .build_options_contents = undefined,
};
self.computeOutFileNames();
return self;
@@ -1096,6 +1099,12 @@ pub const LibExeObjStep = struct {
self.include_dirs.append(self.builder.cache_root) catch unreachable;
}
+ pub fn addBuildOption(self: *LibExeObjStep, comptime T: type, name: []const u8, value: T) void {
+ assert(self.is_zig);
+ const out = &std.io.BufferOutStream.init(&self.build_options_contents).stream;
+ out.print("pub const {} = {};\n", name, value) catch unreachable;
+ }
+
pub fn addIncludeDir(self: *LibExeObjStep, path: []const u8) void {
self.include_dirs.append(path) catch unreachable;
}
@@ -1155,6 +1164,15 @@ pub const LibExeObjStep = struct {
zig_args.append(builder.pathFromRoot(root_src)) catch unreachable;
}
+ if (self.build_options_contents.len() > 0) {
+ const build_options_file = try os.path.join(builder.allocator, builder.cache_root, builder.fmt("{}_build_options.zig", self.name));
+ try std.io.writeFile(builder.allocator, build_options_file, self.build_options_contents.toSliceConst());
+ try zig_args.append("--pkg-begin");
+ try zig_args.append("build_options");
+ try zig_args.append(builder.pathFromRoot(build_options_file));
+ try zig_args.append("--pkg-end");
+ }
+
for (self.object_files.toSliceConst()) |object_file| {
zig_args.append("--object") catch unreachable;
zig_args.append(builder.pathFromRoot(object_file)) catch unreachable;
diff --git a/std/c/darwin.zig b/std/c/darwin.zig
index e3b53d9bea..133ef62f05 100644
--- a/std/c/darwin.zig
+++ b/std/c/darwin.zig
@@ -6,6 +6,30 @@ pub extern "c" fn __getdirentries64(fd: c_int, buf_ptr: [*]u8, buf_len: usize, b
pub extern "c" fn mach_absolute_time() u64;
pub extern "c" fn mach_timebase_info(tinfo: ?*mach_timebase_info_data) void;
+pub extern "c" fn kqueue() c_int;
+pub extern "c" fn kevent(
+ kq: c_int,
+ changelist: [*]const Kevent,
+ nchanges: c_int,
+ eventlist: [*]Kevent,
+ nevents: c_int,
+ timeout: ?*const timespec,
+) c_int;
+
+pub extern "c" fn kevent64(
+ kq: c_int,
+ changelist: [*]const kevent64_s,
+ nchanges: c_int,
+ eventlist: [*]kevent64_s,
+ nevents: c_int,
+ flags: c_uint,
+ timeout: ?*const timespec,
+) c_int;
+
+pub extern "c" fn sysctl(name: [*]c_int, namelen: c_uint, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) c_int;
+pub extern "c" fn sysctlbyname(name: [*]const u8, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) c_int;
+pub extern "c" fn sysctlnametomib(name: [*]const u8, mibp: ?*c_int, sizep: ?*usize) c_int;
+
pub use @import("../os/darwin_errno.zig");
pub const _errno = __error;
@@ -86,3 +110,51 @@ pub const pthread_attr_t = extern struct {
__sig: c_long,
__opaque: [56]u8,
};
+
+/// Renamed from `kevent` to `Kevent` to avoid conflict with function name.
+pub const Kevent = extern struct {
+ ident: usize,
+ filter: i16,
+ flags: u16,
+ fflags: u32,
+ data: isize,
+ udata: usize,
+};
+
+// sys/types.h on macos uses #pragma pack(4) so these checks are
+// to make sure the struct is laid out the same. These values were
+// produced from C code using the offsetof macro.
+const std = @import("../index.zig");
+const assert = std.debug.assert;
+
+comptime {
+ assert(@offsetOf(Kevent, "ident") == 0);
+ assert(@offsetOf(Kevent, "filter") == 8);
+ assert(@offsetOf(Kevent, "flags") == 10);
+ assert(@offsetOf(Kevent, "fflags") == 12);
+ assert(@offsetOf(Kevent, "data") == 16);
+ assert(@offsetOf(Kevent, "udata") == 24);
+}
+
+pub const kevent64_s = extern struct {
+ ident: u64,
+ filter: i16,
+ flags: u16,
+ fflags: u32,
+ data: i64,
+ udata: u64,
+ ext: [2]u64,
+};
+
+// sys/types.h on macos uses #pragma pack() so these checks are
+// to make sure the struct is laid out the same. These values were
+// produced from C code using the offsetof macro.
+comptime {
+ assert(@offsetOf(kevent64_s, "ident") == 0);
+ assert(@offsetOf(kevent64_s, "filter") == 8);
+ assert(@offsetOf(kevent64_s, "flags") == 10);
+ assert(@offsetOf(kevent64_s, "fflags") == 12);
+ assert(@offsetOf(kevent64_s, "data") == 16);
+ assert(@offsetOf(kevent64_s, "udata") == 24);
+ assert(@offsetOf(kevent64_s, "ext") == 32);
+}
diff --git a/std/crypto/throughput_test.zig b/std/crypto/throughput_test.zig
index 0ad6845d1a..c21838e607 100644
--- a/std/crypto/throughput_test.zig
+++ b/std/crypto/throughput_test.zig
@@ -15,8 +15,8 @@ const BytesToHash = 1024 * MiB;
pub fn main() !void {
var stdout_file = try std.io.getStdOut();
- var stdout_out_stream = std.io.FileOutStream.init(*stdout_file);
- const stdout = *stdout_out_stream.stream;
+ var stdout_out_stream = std.io.FileOutStream.init(&stdout_file);
+ const stdout = &stdout_out_stream.stream;
var block: [HashFunction.block_size]u8 = undefined;
std.mem.set(u8, block[0..], 0);
@@ -31,8 +31,8 @@ pub fn main() !void {
}
const end = timer.read();
- const elapsed_s = f64(end - start) / time.ns_per_s;
- const throughput = u64(BytesToHash / elapsed_s);
+ const elapsed_s = @intToFloat(f64, end - start) / time.ns_per_s;
+ const throughput = @floatToInt(u64, BytesToHash / elapsed_s);
try stdout.print("{}: {} MiB/s\n", @typeName(HashFunction), throughput / (1 * MiB));
}
diff --git a/std/debug/index.zig b/std/debug/index.zig
index 57b2dfc300..3070e0b40b 100644
--- a/std/debug/index.zig
+++ b/std/debug/index.zig
@@ -10,6 +10,12 @@ const ArrayList = std.ArrayList;
const builtin = @import("builtin");
pub const FailingAllocator = @import("failing_allocator.zig").FailingAllocator;
+pub const failing_allocator = FailingAllocator.init(global_allocator, 0);
+
+pub const runtime_safety = switch (builtin.mode) {
+ builtin.Mode.Debug, builtin.Mode.ReleaseSafe => true,
+ builtin.Mode.ReleaseFast, builtin.Mode.ReleaseSmall => false,
+};
/// Tries to write to stderr, unbuffered, and ignores any error returned.
/// Does not append a newline.
@@ -44,6 +50,12 @@ pub fn getSelfDebugInfo() !*ElfStackTrace {
}
}
+fn wantTtyColor() bool {
+ var bytes: [128]u8 = undefined;
+ const allocator = &std.heap.FixedBufferAllocator.init(bytes[0..]).allocator;
+ return if (std.os.getEnvVarOwned(allocator, "ZIG_DEBUG_COLOR")) |_| true else |_| stderr_file.isTty();
+}
+
/// Tries to print the current stack trace to stderr, unbuffered, and ignores any error returned.
pub fn dumpCurrentStackTrace(start_addr: ?usize) void {
const stderr = getStderrStream() catch return;
@@ -51,7 +63,7 @@ pub fn dumpCurrentStackTrace(start_addr: ?usize) void {
stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", @errorName(err)) catch return;
return;
};
- writeCurrentStackTrace(stderr, getDebugInfoAllocator(), debug_info, stderr_file.isTty(), start_addr) catch |err| {
+ writeCurrentStackTrace(stderr, getDebugInfoAllocator(), debug_info, wantTtyColor(), start_addr) catch |err| {
stderr.print("Unable to dump stack trace: {}\n", @errorName(err)) catch return;
return;
};
@@ -64,7 +76,7 @@ pub fn dumpStackTrace(stack_trace: *const builtin.StackTrace) void {
stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", @errorName(err)) catch return;
return;
};
- writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, stderr_file.isTty()) catch |err| {
+ writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, wantTtyColor()) catch |err| {
stderr.print("Unable to dump stack trace: {}\n", @errorName(err)) catch return;
return;
};
@@ -156,7 +168,7 @@ pub fn writeStackTrace(stack_trace: *const builtin.StackTrace, out_stream: var,
frame_index = (frame_index + 1) % stack_trace.instruction_addresses.len;
}) {
const return_address = stack_trace.instruction_addresses[frame_index];
- try printSourceAtAddress(debug_info, out_stream, return_address);
+ try printSourceAtAddress(debug_info, out_stream, return_address, tty_color);
}
}
@@ -189,13 +201,11 @@ pub fn writeCurrentStackTrace(out_stream: var, allocator: *mem.Allocator, debug_
}
},
}
- try printSourceAtAddress(debug_info, out_stream, return_address);
+ try printSourceAtAddress(debug_info, out_stream, return_address, tty_color);
}
}
-fn printSourceAtAddress(debug_info: *ElfStackTrace, out_stream: var, address: usize) !void {
- const ptr_hex = "0x{x}";
-
+fn printSourceAtAddress(debug_info: *ElfStackTrace, out_stream: var, address: usize, tty_color: bool) !void {
switch (builtin.os) {
builtin.Os.windows => return error.UnsupportedDebugInfo,
builtin.Os.macosx => {
@@ -209,36 +219,58 @@ fn printSourceAtAddress(debug_info: *ElfStackTrace, out_stream: var, address: us
.address = address,
};
const symbol = debug_info.symbol_table.search(address) orelse &unknown;
- try out_stream.print(WHITE ++ "{}" ++ RESET ++ ": " ++ DIM ++ ptr_hex ++ " in ??? (???)" ++ RESET ++ "\n", symbol.name, address);
+ try out_stream.print(WHITE ++ "{}" ++ RESET ++ ": " ++ DIM ++ "0x{x}" ++ " in ??? (???)" ++ RESET ++ "\n", symbol.name, address);
},
else => {
const compile_unit = findCompileUnit(debug_info, address) catch {
- try out_stream.print("???:?:?: " ++ DIM ++ ptr_hex ++ " in ??? (???)" ++ RESET ++ "\n ???\n\n", address);
+ if (tty_color) {
+ try out_stream.print("???:?:?: " ++ DIM ++ "0x{x} in ??? (???)" ++ RESET ++ "\n ???\n\n", address);
+ } else {
+ try out_stream.print("???:?:?: 0x{x} in ??? (???)\n ???\n\n", address);
+ }
return;
};
const compile_unit_name = try compile_unit.die.getAttrString(debug_info, DW.AT_name);
if (getLineNumberInfo(debug_info, compile_unit, address - 1)) |line_info| {
defer line_info.deinit();
- try out_stream.print(WHITE ++ "{}:{}:{}" ++ RESET ++ ": " ++ DIM ++ ptr_hex ++ " in ??? ({})" ++ RESET ++ "\n", line_info.file_name, line_info.line, line_info.column, address, compile_unit_name);
- if (printLineFromFile(debug_info.allocator(), out_stream, line_info)) {
- if (line_info.column == 0) {
- try out_stream.write("\n");
- } else {
- {
- var col_i: usize = 1;
- while (col_i < line_info.column) : (col_i += 1) {
- try out_stream.writeByte(' ');
+ if (tty_color) {
+ try out_stream.print(
+ WHITE ++ "{}:{}:{}" ++ RESET ++ ": " ++ DIM ++ "0x{x} in ??? ({})" ++ RESET ++ "\n",
+ line_info.file_name,
+ line_info.line,
+ line_info.column,
+ address,
+ compile_unit_name,
+ );
+ if (printLineFromFile(debug_info.allocator(), out_stream, line_info)) {
+ if (line_info.column == 0) {
+ try out_stream.write("\n");
+ } else {
+ {
+ var col_i: usize = 1;
+ while (col_i < line_info.column) : (col_i += 1) {
+ try out_stream.writeByte(' ');
+ }
}
+ try out_stream.write(GREEN ++ "^" ++ RESET ++ "\n");
}
- try out_stream.write(GREEN ++ "^" ++ RESET ++ "\n");
+ } else |err| switch (err) {
+ error.EndOfFile => {},
+ else => return err,
}
- } else |err| switch (err) {
- error.EndOfFile => {},
- else => return err,
+ } else {
+ try out_stream.print(
+ "{}:{}:{}: 0x{x} in ??? ({})\n",
+ line_info.file_name,
+ line_info.line,
+ line_info.column,
+ address,
+ compile_unit_name,
+ );
}
} else |err| switch (err) {
error.MissingDebugInfo, error.InvalidDebugInfo => {
- try out_stream.print(ptr_hex ++ " in ??? ({})\n", address, compile_unit_name);
+ try out_stream.print("0x{x} in ??? ({})\n", address, compile_unit_name);
},
else => return err,
}
@@ -1098,7 +1130,7 @@ fn readILeb128(in_stream: var) !i64 {
/// This should only be used in temporary test programs.
pub const global_allocator = &global_fixed_allocator.allocator;
-var global_fixed_allocator = std.heap.FixedBufferAllocator.init(global_allocator_mem[0..]);
+var global_fixed_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(global_allocator_mem[0..]);
var global_allocator_mem: [100 * 1024]u8 = undefined;
// TODO make thread safe
diff --git a/std/event.zig b/std/event.zig
index c6ac04a9d0..7e9928b3d7 100644
--- a/std/event.zig
+++ b/std/event.zig
@@ -1,525 +1,13 @@
-const std = @import("index.zig");
-const builtin = @import("builtin");
-const assert = std.debug.assert;
-const event = this;
-const mem = std.mem;
-const posix = std.os.posix;
-const AtomicRmwOp = builtin.AtomicRmwOp;
-const AtomicOrder = builtin.AtomicOrder;
-
-pub const TcpServer = struct {
- handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
-
- loop: *Loop,
- sockfd: i32,
- accept_coro: ?promise,
- listen_address: std.net.Address,
-
- waiting_for_emfile_node: PromiseNode,
-
- const PromiseNode = std.LinkedList(promise).Node;
-
- pub fn init(loop: *Loop) !TcpServer {
- const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
- errdefer std.os.close(sockfd);
-
- // TODO can't initialize handler coroutine here because we need well defined copy elision
- return TcpServer{
- .loop = loop,
- .sockfd = sockfd,
- .accept_coro = null,
- .handleRequestFn = undefined,
- .waiting_for_emfile_node = undefined,
- .listen_address = undefined,
- };
- }
-
- pub fn listen(self: *TcpServer, address: *const std.net.Address, handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void) !void {
- self.handleRequestFn = handleRequestFn;
-
- try std.os.posixBind(self.sockfd, &address.os_addr);
- try std.os.posixListen(self.sockfd, posix.SOMAXCONN);
- self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd));
-
- self.accept_coro = try async<self.loop.allocator> TcpServer.handler(self);
- errdefer cancel self.accept_coro.?;
-
- try self.loop.addFd(self.sockfd, self.accept_coro.?);
- errdefer self.loop.removeFd(self.sockfd);
- }
-
- pub fn deinit(self: *TcpServer) void {
- self.loop.removeFd(self.sockfd);
- if (self.accept_coro) |accept_coro| cancel accept_coro;
- std.os.close(self.sockfd);
- }
-
- pub async fn handler(self: *TcpServer) void {
- while (true) {
- var accepted_addr: std.net.Address = undefined;
- if (std.os.posixAccept(self.sockfd, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| {
- var socket = std.os.File.openHandle(accepted_fd);
- _ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) {
- error.OutOfMemory => {
- socket.close();
- continue;
- },
- };
- } else |err| switch (err) {
- error.WouldBlock => {
- suspend; // we will get resumed by epoll_wait in the event loop
- continue;
- },
- error.ProcessFdQuotaExceeded => {
- errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node);
- suspend |p| {
- self.waiting_for_emfile_node = PromiseNode.init(p);
- std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node);
- }
- continue;
- },
- error.ConnectionAborted, error.FileDescriptorClosed => continue,
-
- error.PageFault => unreachable,
- error.InvalidSyscall => unreachable,
- error.FileDescriptorNotASocket => unreachable,
- error.OperationNotSupported => unreachable,
-
- error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => {
- @panic("TODO handle this error");
- },
- }
- }
- }
-};
-
-pub const Loop = struct {
- allocator: *mem.Allocator,
- keep_running: bool,
- next_tick_queue: std.atomic.QueueMpsc(promise),
- os_data: OsData,
-
- const OsData = switch (builtin.os) {
- builtin.Os.linux => struct {
- epollfd: i32,
- },
- else => struct {},
- };
-
- pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
-
- /// The allocator must be thread-safe because we use it for multiplexing
- /// coroutines onto kernel threads.
- pub fn init(allocator: *mem.Allocator) !Loop {
- var self = Loop{
- .keep_running = true,
- .allocator = allocator,
- .os_data = undefined,
- .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
- };
- try self.initOsData();
- errdefer self.deinitOsData();
-
- return self;
- }
-
- /// must call stop before deinit
- pub fn deinit(self: *Loop) void {
- self.deinitOsData();
- }
-
- const InitOsDataError = std.os.LinuxEpollCreateError;
-
- fn initOsData(self: *Loop) InitOsDataError!void {
- switch (builtin.os) {
- builtin.Os.linux => {
- self.os_data.epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
- errdefer std.os.close(self.os_data.epollfd);
- },
- else => {},
- }
- }
-
- fn deinitOsData(self: *Loop) void {
- switch (builtin.os) {
- builtin.Os.linux => std.os.close(self.os_data.epollfd),
- else => {},
- }
- }
-
- pub fn addFd(self: *Loop, fd: i32, prom: promise) !void {
- var ev = std.os.linux.epoll_event{
- .events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
- .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) },
- };
- try std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
- }
-
- pub fn removeFd(self: *Loop, fd: i32) void {
- std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
- }
- async fn waitFd(self: *Loop, fd: i32) !void {
- defer self.removeFd(fd);
- suspend |p| {
- try self.addFd(fd, p);
- }
- }
-
- pub fn stop(self: *Loop) void {
- // TODO make atomic
- self.keep_running = false;
- // TODO activate an fd in the epoll set which should cancel all the promises
- }
-
- /// bring your own linked list node. this means it can't fail.
- pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
- self.next_tick_queue.put(node);
- }
-
- pub fn run(self: *Loop) void {
- while (self.keep_running) {
- // TODO multiplex the next tick queue and the epoll event results onto a thread pool
- while (self.next_tick_queue.get()) |node| {
- resume node.data;
- }
- if (!self.keep_running) break;
-
- self.dispatchOsEvents();
- }
- }
-
- fn dispatchOsEvents(self: *Loop) void {
- switch (builtin.os) {
- builtin.Os.linux => {
- var events: [16]std.os.linux.epoll_event = undefined;
- const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
- for (events[0..count]) |ev| {
- const p = @intToPtr(promise, ev.data.ptr);
- resume p;
- }
- },
- else => {},
- }
- }
-};
-
-/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
-/// when buffer is empty, consumers suspend and are resumed by producers
-/// when buffer is full, producers suspend and are resumed by consumers
-pub fn Channel(comptime T: type) type {
- return struct {
- loop: *Loop,
-
- getters: std.atomic.QueueMpsc(GetNode),
- putters: std.atomic.QueueMpsc(PutNode),
- get_count: usize,
- put_count: usize,
- dispatch_lock: u8, // TODO make this a bool
- need_dispatch: u8, // TODO make this a bool
-
- // simple fixed size ring buffer
- buffer_nodes: []T,
- buffer_index: usize,
- buffer_len: usize,
-
- const SelfChannel = this;
- const GetNode = struct {
- ptr: *T,
- tick_node: *Loop.NextTickNode,
- };
- const PutNode = struct {
- data: T,
- tick_node: *Loop.NextTickNode,
- };
-
- /// call destroy when done
- pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
- const buffer_nodes = try loop.allocator.alloc(T, capacity);
- errdefer loop.allocator.free(buffer_nodes);
-
- const self = try loop.allocator.create(SelfChannel{
- .loop = loop,
- .buffer_len = 0,
- .buffer_nodes = buffer_nodes,
- .buffer_index = 0,
- .dispatch_lock = 0,
- .need_dispatch = 0,
- .getters = std.atomic.QueueMpsc(GetNode).init(),
- .putters = std.atomic.QueueMpsc(PutNode).init(),
- .get_count = 0,
- .put_count = 0,
- });
- errdefer loop.allocator.destroy(self);
-
- return self;
- }
-
- /// must be called when all calls to put and get have suspended and no more calls occur
- pub fn destroy(self: *SelfChannel) void {
- while (self.getters.get()) |get_node| {
- cancel get_node.data.tick_node.data;
- }
- while (self.putters.get()) |put_node| {
- cancel put_node.data.tick_node.data;
- }
- self.loop.allocator.free(self.buffer_nodes);
- self.loop.allocator.destroy(self);
- }
-
- /// puts a data item in the channel. The promise completes when the value has been added to the
- /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
- pub async fn put(self: *SelfChannel, data: T) void {
- // TODO should be able to group memory allocation failure before first suspend point
- // so that the async invocation catches it
- var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
- _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
-
- suspend |handle| {
- var my_tick_node = Loop.NextTickNode{
- .next = undefined,
- .data = handle,
- };
- var queue_node = std.atomic.QueueMpsc(PutNode).Node{
- .data = PutNode{
- .tick_node = &my_tick_node,
- .data = data,
- },
- .next = undefined,
- };
- self.putters.put(&queue_node);
- _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
-
- self.loop.onNextTick(dispatch_tick_node_ptr);
- }
- }
-
- /// await this function to get an item from the channel. If the buffer is empty, the promise will
- /// complete when the next item is put in the channel.
- pub async fn get(self: *SelfChannel) T {
- // TODO should be able to group memory allocation failure before first suspend point
- // so that the async invocation catches it
- var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
- _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
-
- // TODO integrate this function with named return values
- // so we can get rid of this extra result copy
- var result: T = undefined;
- var debug_handle: usize = undefined;
- suspend |handle| {
- debug_handle = @ptrToInt(handle);
- var my_tick_node = Loop.NextTickNode{
- .next = undefined,
- .data = handle,
- };
- var queue_node = std.atomic.QueueMpsc(GetNode).Node{
- .data = GetNode{
- .ptr = &result,
- .tick_node = &my_tick_node,
- },
- .next = undefined,
- };
- self.getters.put(&queue_node);
- _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
-
- self.loop.onNextTick(dispatch_tick_node_ptr);
- }
- return result;
- }
-
- async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
- // resumed by onNextTick
- suspend |handle| {
- var tick_node = Loop.NextTickNode{
- .data = handle,
- .next = undefined,
- };
- tick_node_ptr.* = &tick_node;
- }
-
- // set the "need dispatch" flag
- _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
-
- lock: while (true) {
- // set the lock flag
- const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- if (prev_lock != 0) return;
-
- // clear the need_dispatch flag since we're about to do it
- _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-
- while (true) {
- one_dispatch: {
- // later we correct these extra subtractions
- var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-
- // transfer self.buffer to self.getters
- while (self.buffer_len != 0) {
- if (get_count == 0) break :one_dispatch;
-
- const get_node = &self.getters.get().?.data;
- get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
- self.loop.onNextTick(get_node.tick_node);
- self.buffer_len -= 1;
-
- get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
-
- // direct transfer self.putters to self.getters
- while (get_count != 0 and put_count != 0) {
- const get_node = &self.getters.get().?.data;
- const put_node = &self.putters.get().?.data;
-
- get_node.ptr.* = put_node.data;
- self.loop.onNextTick(get_node.tick_node);
- self.loop.onNextTick(put_node.tick_node);
-
- get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
-
- // transfer self.putters to self.buffer
- while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
- const put_node = &self.putters.get().?.data;
-
- self.buffer_nodes[self.buffer_index] = put_node.data;
- self.loop.onNextTick(put_node.tick_node);
- self.buffer_index +%= 1;
- self.buffer_len += 1;
-
- put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
- }
-
- // undo the extra subtractions
- _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
- _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
-
- // clear need-dispatch flag
- const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- if (need_dispatch != 0) continue;
-
- const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- assert(my_lock != 0);
-
- // we have to check again now that we unlocked
- if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
-
- return;
- }
- }
- }
- };
-}
-
-pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
- var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
-
- const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
- errdefer std.os.close(sockfd);
-
- try std.os.posixConnectAsync(sockfd, &address.os_addr);
- try await try async loop.waitFd(sockfd);
- try std.os.posixGetSockOptConnectError(sockfd);
-
- return std.os.File.openHandle(sockfd);
-}
-
-test "listen on a port, send bytes, receive bytes" {
- if (builtin.os != builtin.Os.linux) {
- // TODO build abstractions for other operating systems
- return;
- }
- const MyServer = struct {
- tcp_server: TcpServer,
-
- const Self = this;
- async<*mem.Allocator> fn handler(tcp_server: *TcpServer, _addr: *const std.net.Address, _socket: *const std.os.File) void {
- const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
- var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
- defer socket.close();
- const next_handler = async errorableHandler(self, _addr, socket) catch |err| switch (err) {
- error.OutOfMemory => @panic("unable to handle connection: out of memory"),
- };
- (await next_handler) catch |err| {
- std.debug.panic("unable to handle connection: {}\n", err);
- };
- suspend |p| {
- cancel p;
- }
- }
- async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: *const std.os.File) !void {
- const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733
- var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
-
- var adapter = std.io.FileOutStream.init(&socket);
- var stream = &adapter.stream;
- try stream.print("hello from server\n");
- }
- };
-
- const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
- const addr = std.net.Address.initIp4(ip4addr, 0);
-
- var loop = try Loop.init(std.debug.global_allocator);
- var server = MyServer{ .tcp_server = try TcpServer.init(&loop) };
- defer server.tcp_server.deinit();
- try server.tcp_server.listen(addr, MyServer.handler);
-
- const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address);
- defer cancel p;
- loop.run();
-}
-
-async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void {
- errdefer @panic("test failure");
-
- var socket_file = try await try async event.connect(loop, address);
- defer socket_file.close();
-
- var buf: [512]u8 = undefined;
- const amt_read = try socket_file.read(buf[0..]);
- const msg = buf[0..amt_read];
- assert(mem.eql(u8, msg, "hello from server\n"));
- loop.stop();
-}
-
-test "std.event.Channel" {
- var da = std.heap.DirectAllocator.init();
- defer da.deinit();
-
- const allocator = &da.allocator;
-
- var loop = try Loop.init(allocator);
- defer loop.deinit();
-
- const channel = try Channel(i32).create(&loop, 0);
- defer channel.destroy();
-
- const handle = try async<allocator> testChannelGetter(&loop, channel);
- defer cancel handle;
-
- const putter = try async<allocator> testChannelPutter(channel);
- defer cancel putter;
-
- loop.run();
-}
-
-async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
- errdefer @panic("test failed");
-
- const value1_promise = try async channel.get();
- const value1 = await value1_promise;
- assert(value1 == 1234);
-
- const value2_promise = try async channel.get();
- const value2 = await value2_promise;
- assert(value2 == 4567);
-
- loop.stop();
-}
-
-async fn testChannelPutter(channel: *Channel(i32)) void {
- await (async channel.put(1234) catch @panic("out of memory"));
- await (async channel.put(4567) catch @panic("out of memory"));
+pub const Locked = @import("event/locked.zig").Locked;
+pub const Loop = @import("event/loop.zig").Loop;
+pub const Lock = @import("event/lock.zig").Lock;
+pub const tcp = @import("event/tcp.zig");
+pub const Channel = @import("event/channel.zig").Channel;
+
+test "import event tests" {
+ _ = @import("event/locked.zig");
+ _ = @import("event/loop.zig");
+ _ = @import("event/lock.zig");
+ _ = @import("event/tcp.zig");
+ _ = @import("event/channel.zig");
}
diff --git a/std/event/channel.zig b/std/event/channel.zig
new file mode 100644
index 0000000000..4b3a7177a2
--- /dev/null
+++ b/std/event/channel.zig
@@ -0,0 +1,254 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const Loop = std.event.Loop;
+
+/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
+/// when buffer is empty, consumers suspend and are resumed by producers
+/// when buffer is full, producers suspend and are resumed by consumers
+pub fn Channel(comptime T: type) type {
+ return struct {
+ loop: *Loop,
+
+ getters: std.atomic.QueueMpsc(GetNode),
+ putters: std.atomic.QueueMpsc(PutNode),
+ get_count: usize,
+ put_count: usize,
+ dispatch_lock: u8, // TODO make this a bool
+ need_dispatch: u8, // TODO make this a bool
+
+ // simple fixed size ring buffer
+ buffer_nodes: []T,
+ buffer_index: usize,
+ buffer_len: usize,
+
+ const SelfChannel = this;
+ const GetNode = struct {
+ ptr: *T,
+ tick_node: *Loop.NextTickNode,
+ };
+ const PutNode = struct {
+ data: T,
+ tick_node: *Loop.NextTickNode,
+ };
+
+ /// call destroy when done
+ pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
+ const buffer_nodes = try loop.allocator.alloc(T, capacity);
+ errdefer loop.allocator.free(buffer_nodes);
+
+ const self = try loop.allocator.create(SelfChannel{
+ .loop = loop,
+ .buffer_len = 0,
+ .buffer_nodes = buffer_nodes,
+ .buffer_index = 0,
+ .dispatch_lock = 0,
+ .need_dispatch = 0,
+ .getters = std.atomic.QueueMpsc(GetNode).init(),
+ .putters = std.atomic.QueueMpsc(PutNode).init(),
+ .get_count = 0,
+ .put_count = 0,
+ });
+ errdefer loop.allocator.destroy(self);
+
+ return self;
+ }
+
+ /// must be called when all calls to put and get have suspended and no more calls occur
+ pub fn destroy(self: *SelfChannel) void {
+ while (self.getters.get()) |get_node| {
+ cancel get_node.data.tick_node.data;
+ }
+ while (self.putters.get()) |put_node| {
+ cancel put_node.data.tick_node.data;
+ }
+ self.loop.allocator.free(self.buffer_nodes);
+ self.loop.allocator.destroy(self);
+ }
+
+ /// puts a data item in the channel. The promise completes when the value has been added to the
+ /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
+ pub async fn put(self: *SelfChannel, data: T) void {
+ // TODO should be able to group memory allocation failure before first suspend point
+ // so that the async invocation catches it
+ var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+ _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+ suspend |handle| {
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle,
+ };
+ var queue_node = std.atomic.QueueMpsc(PutNode).Node{
+ .data = PutNode{
+ .tick_node = &my_tick_node,
+ .data = data,
+ },
+ .next = undefined,
+ };
+ self.putters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ self.loop.onNextTick(dispatch_tick_node_ptr);
+ }
+ }
+
+ /// await this function to get an item from the channel. If the buffer is empty, the promise will
+ /// complete when the next item is put in the channel.
+ pub async fn get(self: *SelfChannel) T {
+ // TODO should be able to group memory allocation failure before first suspend point
+ // so that the async invocation catches it
+ var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+ _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+ // TODO integrate this function with named return values
+ // so we can get rid of this extra result copy
+ var result: T = undefined;
+ suspend |handle| {
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle,
+ };
+ var queue_node = std.atomic.QueueMpsc(GetNode).Node{
+ .data = GetNode{
+ .ptr = &result,
+ .tick_node = &my_tick_node,
+ },
+ .next = undefined,
+ };
+ self.getters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ self.loop.onNextTick(dispatch_tick_node_ptr);
+ }
+ return result;
+ }
+
+ async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
+ // resumed by onNextTick
+ suspend |handle| {
+ var tick_node = Loop.NextTickNode{
+ .data = handle,
+ .next = undefined,
+ };
+ tick_node_ptr.* = &tick_node;
+ }
+
+ // set the "need dispatch" flag
+ _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+
+ lock: while (true) {
+ // set the lock flag
+ const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ if (prev_lock != 0) return;
+
+ // clear the need_dispatch flag since we're about to do it
+ _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ while (true) {
+ one_dispatch: {
+ // later we correct these extra subtractions
+ var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+
+ // transfer self.buffer to self.getters
+ while (self.buffer_len != 0) {
+ if (get_count == 0) break :one_dispatch;
+
+ const get_node = &self.getters.get().?.data;
+ get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
+ self.loop.onNextTick(get_node.tick_node);
+ self.buffer_len -= 1;
+
+ get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ // direct transfer self.putters to self.getters
+ while (get_count != 0 and put_count != 0) {
+ const get_node = &self.getters.get().?.data;
+ const put_node = &self.putters.get().?.data;
+
+ get_node.ptr.* = put_node.data;
+ self.loop.onNextTick(get_node.tick_node);
+ self.loop.onNextTick(put_node.tick_node);
+
+ get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ // transfer self.putters to self.buffer
+ while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
+ const put_node = &self.putters.get().?.data;
+
+ self.buffer_nodes[self.buffer_index] = put_node.data;
+ self.loop.onNextTick(put_node.tick_node);
+ self.buffer_index +%= 1;
+ self.buffer_len += 1;
+
+ put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+
+ // undo the extra subtractions
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ // clear need-dispatch flag
+ const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ if (need_dispatch != 0) continue;
+
+ const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ assert(my_lock != 0);
+
+ // we have to check again now that we unlocked
+ if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
+
+ return;
+ }
+ }
+ }
+ };
+}
+
+test "std.event.Channel" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ // TODO make a multi threaded test
+ try loop.initSingleThreaded(allocator);
+ defer loop.deinit();
+
+ const channel = try Channel(i32).create(&loop, 0);
+ defer channel.destroy();
+
+ const handle = try async<allocator> testChannelGetter(&loop, channel);
+ defer cancel handle;
+
+ const putter = try async<allocator> testChannelPutter(channel);
+ defer cancel putter;
+
+ loop.run();
+}
+
+async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
+ errdefer @panic("test failed");
+
+ const value1_promise = try async channel.get();
+ const value1 = await value1_promise;
+ assert(value1 == 1234);
+
+ const value2_promise = try async channel.get();
+ const value2 = await value2_promise;
+ assert(value2 == 4567);
+}
+
+async fn testChannelPutter(channel: *Channel(i32)) void {
+ await (async channel.put(1234) catch @panic("out of memory"));
+ await (async channel.put(4567) catch @panic("out of memory"));
+}
+
diff --git a/std/event/lock.zig b/std/event/lock.zig
new file mode 100644
index 0000000000..2a8d5ada77
--- /dev/null
+++ b/std/event/lock.zig
@@ -0,0 +1,204 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const mem = std.mem;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock.
+/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+pub const Lock = struct {
+ loop: *Loop,
+ shared_bit: u8, // TODO make this a bool
+ queue: Queue,
+ queue_empty_bit: u8, // TODO make this a bool
+
+ const Queue = std.atomic.QueueMpsc(promise);
+
+ pub const Held = struct {
+ lock: *Lock,
+
+ pub fn release(self: Held) void {
+ // Resume the next item from the queue.
+ if (self.lock.queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ return;
+ }
+
+ // We need to release the lock.
+ _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ // There might be a queue item. If we know the queue is empty, we can be done,
+ // because the other actor will try to obtain the lock.
+ // But if there's a queue item, we are the actor which must loop and attempt
+ // to grab the lock again.
+ if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
+ return;
+ }
+
+ while (true) {
+ const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ if (old_bit != 0) {
+ // We did not obtain the lock. Great, the queue is someone else's problem.
+ return;
+ }
+
+ // Resume the next item from the queue.
+ if (self.lock.queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ return;
+ }
+
+ // Release the lock again.
+ _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ // Find out if we can be done.
+ if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
+ return;
+ }
+ }
+ }
+ };
+
+ pub fn init(loop: *Loop) Lock {
+ return Lock{
+ .loop = loop,
+ .shared_bit = 0,
+ .queue = Queue.init(),
+ .queue_empty_bit = 1,
+ };
+ }
+
+ /// Must be called when not locked. Not thread safe.
+ /// All calls to acquire() and release() must complete before calling deinit().
+ pub fn deinit(self: *Lock) void {
+ assert(self.shared_bit == 0);
+ while (self.queue.get()) |node| cancel node.data;
+ }
+
+ pub async fn acquire(self: *Lock) Held {
+ s: suspend |handle| {
+ // TODO explicitly put this memory in the coroutine frame #1194
+ var my_tick_node = Loop.NextTickNode{
+ .data = handle,
+ .next = undefined,
+ };
+
+ self.queue.put(&my_tick_node);
+
+ // At this point, we are in the queue, so we might have already been resumed and this coroutine
+ // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.
+
+ // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor
+ // will attempt to grab the lock.
+ _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ while (true) {
+ const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ if (old_bit != 0) {
+ // We did not obtain the lock. Trust that our queue entry will resume us, and allow
+ // suspend to complete.
+ break;
+ }
+ // We got the lock. However we might have already been resumed from the queue.
+ if (self.queue.get()) |node| {
+ // Whether this node is us or someone else, we tail resume it.
+ resume node.data;
+ break;
+ } else {
+ // We already got resumed, and there are none left in the queue, which means that
+ // we aren't even supposed to hold the lock right now.
+ _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ // There might be a queue item. If we know the queue is empty, we can be done,
+ // because the other actor will try to obtain the lock.
+ // But if there's a queue item, we are the actor which must loop and attempt
+ // to grab the lock again.
+ if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ unreachable;
+ }
+ }
+
+ return Held{ .lock = self };
+ }
+};
+
+test "std.event.Lock" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var lock = Lock.init(&loop);
+ defer lock.deinit();
+
+ const handle = try async<allocator> testLock(&loop, &lock);
+ defer cancel handle;
+ loop.run();
+
+ assert(mem.eql(i32, shared_test_data, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len));
+}
+
+async fn testLock(loop: *Loop, lock: *Lock) void {
+ // TODO explicitly put next tick node memory in the coroutine frame #1194
+ suspend |p| {
+ resume p;
+ }
+ const handle1 = async lockRunner(lock) catch @panic("out of memory");
+ var tick_node1 = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle1,
+ };
+ loop.onNextTick(&tick_node1);
+
+ const handle2 = async lockRunner(lock) catch @panic("out of memory");
+ var tick_node2 = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle2,
+ };
+ loop.onNextTick(&tick_node2);
+
+ const handle3 = async lockRunner(lock) catch @panic("out of memory");
+ var tick_node3 = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle3,
+ };
+ loop.onNextTick(&tick_node3);
+
+ await handle1;
+ await handle2;
+ await handle3;
+}
+
+var shared_test_data = [1]i32{0} ** 10;
+var shared_test_index: usize = 0;
+
+async fn lockRunner(lock: *Lock) void {
+ suspend; // resumed by onNextTick
+
+ var i: usize = 0;
+ while (i < shared_test_data.len) : (i += 1) {
+ const lock_promise = async lock.acquire() catch @panic("out of memory");
+ const handle = await lock_promise;
+ defer handle.release();
+
+ shared_test_index = 0;
+ while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
+ shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
+ }
+ }
+}
diff --git a/std/event/locked.zig b/std/event/locked.zig
new file mode 100644
index 0000000000..e7ad544d78
--- /dev/null
+++ b/std/event/locked.zig
@@ -0,0 +1,43 @@
+const std = @import("../index.zig");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock that protects one piece of data.
+/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+pub fn Locked(comptime T: type) type {
+ return struct {
+ lock: Lock,
+ private_data: T,
+
+ const Self = this;
+
+ pub const HeldLock = struct {
+ value: *T,
+ held: Lock.Held,
+
+ pub fn release(self: HeldLock) void {
+ self.held.release();
+ }
+ };
+
+ pub fn init(loop: *Loop, data: T) Self {
+ return Self{
+ .lock = Lock.init(loop),
+ .private_data = data,
+ };
+ }
+
+ pub fn deinit(self: *Self) void {
+ self.lock.deinit();
+ }
+
+ pub async fn acquire(self: *Self) HeldLock {
+ return HeldLock{
+ // TODO guaranteed allocation elision
+ .held = await (async self.lock.acquire() catch unreachable),
+ .value = &self.private_data,
+ };
+ }
+ };
+}
diff --git a/std/event/loop.zig b/std/event/loop.zig
new file mode 100644
index 0000000000..646f15875f
--- /dev/null
+++ b/std/event/loop.zig
@@ -0,0 +1,629 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const mem = std.mem;
+const posix = std.os.posix;
+const windows = std.os.windows;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+
+pub const Loop = struct {
+ allocator: *mem.Allocator,
+ next_tick_queue: std.atomic.QueueMpsc(promise),
+ os_data: OsData,
+ final_resume_node: ResumeNode,
+ dispatch_lock: u8, // TODO make this a bool
+ pending_event_count: usize,
+ extra_threads: []*std.os.Thread,
+
+ // pre-allocated eventfds. all permanently active.
+ // this is how we send promises to be resumed on other threads.
+ available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
+ eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
+
+ pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
+
+ pub const ResumeNode = struct {
+ id: Id,
+ handle: promise,
+
+ pub const Id = enum {
+ Basic,
+ Stop,
+ EventFd,
+ };
+
+ pub const EventFd = switch (builtin.os) {
+ builtin.Os.macosx => MacOsEventFd,
+ builtin.Os.linux => struct {
+ base: ResumeNode,
+ epoll_op: u32,
+ eventfd: i32,
+ },
+ builtin.Os.windows => struct {
+ base: ResumeNode,
+ completion_key: usize,
+ },
+ else => @compileError("unsupported OS"),
+ };
+
+ const MacOsEventFd = struct {
+ base: ResumeNode,
+ kevent: posix.Kevent,
+ };
+ };
+
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ return self.initInternal(allocator, 1);
+ }
+
+ /// The allocator must be thread-safe because we use it for multiplexing
+ /// coroutines onto kernel threads.
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ const core_count = try std.os.cpuCount(allocator);
+ return self.initInternal(allocator, core_count);
+ }
+
+ /// Thread count is the total thread count. The thread pool size will be
+ /// max(thread_count - 1, 0)
+ fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
+ self.* = Loop{
+ .pending_event_count = 0,
+ .allocator = allocator,
+ .os_data = undefined,
+ .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
+ .dispatch_lock = 1, // start locked so threads go directly into epoll wait
+ .extra_threads = undefined,
+ .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
+ .eventfd_resume_nodes = undefined,
+ .final_resume_node = ResumeNode{
+ .id = ResumeNode.Id.Stop,
+ .handle = undefined,
+ },
+ };
+ const extra_thread_count = thread_count - 1;
+ self.eventfd_resume_nodes = try self.allocator.alloc(
+ std.atomic.Stack(ResumeNode.EventFd).Node,
+ extra_thread_count,
+ );
+ errdefer self.allocator.free(self.eventfd_resume_nodes);
+
+ self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
+ errdefer self.allocator.free(self.extra_threads);
+
+ try self.initOsData(extra_thread_count);
+ errdefer self.deinitOsData();
+ }
+
+ /// must call stop before deinit
+ pub fn deinit(self: *Loop) void {
+ self.deinitOsData();
+ self.allocator.free(self.extra_threads);
+ }
+
+ const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
+ std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError ||
+ std.os.WindowsCreateIoCompletionPortError;
+
+ const wakeup_bytes = []u8{0x1} ** 8;
+
+ fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ errdefer {
+ while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+ }
+ for (self.eventfd_resume_nodes) |*eventfd_node| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
+ .epoll_op = posix.EPOLL_CTL_ADD,
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
+ errdefer std.os.close(self.os_data.epollfd);
+
+ self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
+ errdefer std.os.close(self.os_data.final_eventfd);
+
+ self.os_data.final_eventfd_event = posix.epoll_event{
+ .events = posix.EPOLLIN,
+ .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
+ };
+ try std.os.linuxEpollCtl(
+ self.os_data.epollfd,
+ posix.EPOLL_CTL_ADD,
+ self.os_data.final_eventfd,
+ &self.os_data.final_eventfd_event,
+ );
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ // writing 8 bytes to an eventfd cannot fail
+ std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ builtin.Os.macosx => {
+ self.os_data.kqfd = try std.os.bsdKQueue();
+ errdefer std.os.close(self.os_data.kqfd);
+
+ self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count);
+ errdefer self.allocator.free(self.os_data.kevents);
+
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ // this one is for sending events
+ .kevent = posix.Kevent{
+ .ident = i,
+ .filter = posix.EVFILT_USER,
+ .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ },
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
+ _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+ eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE;
+ eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
+ // this one is for waiting for events
+ self.os_data.kevents[i] = posix.Kevent{
+ .ident = i,
+ .filter = posix.EVFILT_USER,
+ .flags = 0,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ };
+ }
+
+ // Pre-add so that we cannot get error.SystemResources
+ // later when we try to activate it.
+ self.os_data.final_kevent = posix.Kevent{
+ .ident = extra_thread_count,
+ .filter = posix.EVFILT_USER,
+ .flags = posix.EV_ADD | posix.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&self.final_resume_node),
+ };
+ const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+ _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+ self.os_data.final_kevent.flags = posix.EV_ENABLE;
+ self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ builtin.Os.windows => {
+ self.os_data.extra_thread_count = extra_thread_count;
+
+ self.os_data.io_port = try std.os.windowsCreateIoCompletionPort(
+ windows.INVALID_HANDLE_VALUE,
+ null,
+ undefined,
+ undefined,
+ );
+ errdefer std.os.close(self.os_data.io_port);
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ // this one is for sending events
+ .completion_key = @ptrToInt(&eventfd_node.data.base),
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ var i: usize = 0;
+ while (i < extra_thread_index) : (i += 1) {
+ while (true) {
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+ break;
+ }
+ }
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ else => {},
+ }
+ }
+
+ fn deinitOsData(self: *Loop) void {
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ std.os.close(self.os_data.final_eventfd);
+ while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+ std.os.close(self.os_data.epollfd);
+ self.allocator.free(self.eventfd_resume_nodes);
+ },
+ builtin.Os.macosx => {
+ self.allocator.free(self.os_data.kevents);
+ std.os.close(self.os_data.kqfd);
+ },
+ builtin.Os.windows => {
+ std.os.close(self.os_data.io_port);
+ },
+ else => {},
+ }
+ }
+
+ /// resume_node must live longer than the promise that it holds a reference to.
+ pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ errdefer {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ try self.modFd(
+ fd,
+ posix.EPOLL_CTL_ADD,
+ std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
+ resume_node,
+ );
+ }
+
+ pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void {
+ var ev = std.os.linux.epoll_event{
+ .events = events,
+ .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
+ };
+ try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
+ }
+
+ pub fn removeFd(self: *Loop, fd: i32) void {
+ self.removeFdNoCounter(fd);
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ fn removeFdNoCounter(self: *Loop, fd: i32) void {
+ std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
+ }
+
+ pub async fn waitFd(self: *Loop, fd: i32) !void {
+ defer self.removeFd(fd);
+ suspend |p| {
+ // TODO explicitly put this memory in the coroutine frame #1194
+ var resume_node = ResumeNode{
+ .id = ResumeNode.Id.Basic,
+ .handle = p,
+ };
+ try self.addFd(fd, &resume_node);
+ }
+ }
+
+ /// Bring your own linked list node. This means it can't fail.
+ pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ self.next_tick_queue.put(node);
+ }
+
+ pub fn run(self: *Loop) void {
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.workerRun();
+ for (self.extra_threads) |extra_thread| {
+ extra_thread.wait();
+ }
+ }
+
+ /// This is equivalent to an async call, except instead of beginning execution of the async function,
+ /// it immediately returns to the caller, and the async function is queued in the event loop. It still
+ /// returns a promise to be awaited.
+ pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) {
+ const S = struct {
+ async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType {
+ suspend |p| {
+ handle.* = p;
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = p,
+ };
+ loop.onNextTick(&my_tick_node);
+ }
+ // TODO guaranteed allocation elision for await in same func as async
+ return await (async func(args2) catch unreachable);
+ }
+ };
+ var handle: promise->@typeOf(func).ReturnType = undefined;
+ return async<self.allocator> S.asyncFunc(self, &handle, args);
+ }
+
+ fn workerRun(self: *Loop) void {
+ start_over: while (true) {
+ if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
+ while (self.next_tick_queue.get()) |next_tick_node| {
+ const handle = next_tick_node.data;
+ if (self.next_tick_queue.isEmpty()) {
+ // last node, just resume it
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ }
+
+ // non-last node, stick it in the epoll/kqueue set so that
+ // other threads can get to it
+ if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+ const eventfd_node = &resume_stack_node.data;
+ eventfd_node.base.handle = handle;
+ switch (builtin.os) {
+ builtin.Os.macosx => {
+ const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ builtin.Os.linux => {
+ // the pending count is already accounted for
+ const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
+ self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ builtin.Os.windows => {
+ // this value is never dereferenced but we need it to be non-null so that
+ // the consumer code can decide whether to read the completion key.
+ // it has to do this for normal I/O, so we match that behavior here.
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ else => @compileError("unsupported OS"),
+ }
+ } else {
+ // threads are too busy, can't add another eventfd to wake one up
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ }
+ }
+
+ const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
+ if (pending_event_count == 0) {
+ // cause all the threads to stop
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ // writing 8 bytes to an eventfd cannot fail
+ std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ return;
+ },
+ builtin.Os.macosx => {
+ const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ // cannot fail because we already added it and this just enables it
+ _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
+ return;
+ },
+ builtin.Os.windows => {
+ var i: usize = 0;
+ while (i < self.os_data.extra_thread_count) : (i += 1) {
+ while (true) {
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+ break;
+ }
+ }
+ return;
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ }
+
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ // only process 1 event so we don't steal from other threads
+ var events: [1]std.os.linux.epoll_event = undefined;
+ const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
+ for (events[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+ },
+ builtin.Os.macosx => {
+ var eventlist: [1]posix.Kevent = undefined;
+ const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable;
+ for (eventlist[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.udata);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+ },
+ builtin.Os.windows => {
+ var completion_key: usize = undefined;
+ while (true) {
+ var nbytes: windows.DWORD = undefined;
+ var overlapped: ?*windows.OVERLAPPED = undefined;
+ switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
+ std.os.WindowsWaitResult.Aborted => return,
+ std.os.WindowsWaitResult.Normal => {},
+ }
+ if (overlapped != null) break;
+ }
+ const resume_node = @intToPtr(*ResumeNode, completion_key);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
+
+ const OsData = switch (builtin.os) {
+ builtin.Os.linux => struct {
+ epollfd: i32,
+ final_eventfd: i32,
+ final_eventfd_event: std.os.linux.epoll_event,
+ },
+ builtin.Os.macosx => MacOsData,
+ builtin.Os.windows => struct {
+ io_port: windows.HANDLE,
+ extra_thread_count: usize,
+ },
+ else => struct {},
+ };
+
+ const MacOsData = struct {
+ kqfd: i32,
+ final_kevent: posix.Kevent,
+ kevents: []posix.Kevent,
+ };
+};
+
+test "std.event.Loop - basic" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ loop.run();
+}
+
+test "std.event.Loop - call" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var did_it = false;
+ const handle = try loop.call(testEventLoop);
+ const handle2 = try loop.call(testEventLoop2, handle, &did_it);
+ defer cancel handle2;
+
+ loop.run();
+
+ assert(did_it);
+}
+
+async fn testEventLoop() i32 {
+ return 1234;
+}
+
+async fn testEventLoop2(h: promise->i32, did_it: *bool) void {
+ const value = await h;
+ assert(value == 1234);
+ did_it.* = true;
+}
diff --git a/std/event/tcp.zig b/std/event/tcp.zig
new file mode 100644
index 0000000000..5151ecf934
--- /dev/null
+++ b/std/event/tcp.zig
@@ -0,0 +1,183 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const event = std.event;
+const mem = std.mem;
+const posix = std.os.posix;
+const windows = std.os.windows;
+const Loop = std.event.Loop;
+
+pub const Server = struct {
+ handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void,
+
+ loop: *Loop,
+ sockfd: ?i32,
+ accept_coro: ?promise,
+ listen_address: std.net.Address,
+
+ waiting_for_emfile_node: PromiseNode,
+ listen_resume_node: event.Loop.ResumeNode,
+
+ const PromiseNode = std.LinkedList(promise).Node;
+
+ pub fn init(loop: *Loop) Server {
+ // TODO can't initialize handler coroutine here because we need well defined copy elision
+ return Server{
+ .loop = loop,
+ .sockfd = null,
+ .accept_coro = null,
+ .handleRequestFn = undefined,
+ .waiting_for_emfile_node = undefined,
+ .listen_address = undefined,
+ .listen_resume_node = event.Loop.ResumeNode{
+ .id = event.Loop.ResumeNode.Id.Basic,
+ .handle = undefined,
+ },
+ };
+ }
+
+ pub fn listen(
+ self: *Server,
+ address: *const std.net.Address,
+ handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void,
+ ) !void {
+ self.handleRequestFn = handleRequestFn;
+
+ const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
+ errdefer std.os.close(sockfd);
+ self.sockfd = sockfd;
+
+ try std.os.posixBind(sockfd, &address.os_addr);
+ try std.os.posixListen(sockfd, posix.SOMAXCONN);
+ self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd));
+
+ self.accept_coro = try async<self.loop.allocator> Server.handler(self);
+ errdefer cancel self.accept_coro.?;
+
+ self.listen_resume_node.handle = self.accept_coro.?;
+ try self.loop.addFd(sockfd, &self.listen_resume_node);
+ errdefer self.loop.removeFd(sockfd);
+ }
+
+ /// Stop listening
+ pub fn close(self: *Server) void {
+ self.loop.removeFd(self.sockfd.?);
+ std.os.close(self.sockfd.?);
+ }
+
+ pub fn deinit(self: *Server) void {
+ if (self.accept_coro) |accept_coro| cancel accept_coro;
+ if (self.sockfd) |sockfd| std.os.close(sockfd);
+ }
+
+ pub async fn handler(self: *Server) void {
+ while (true) {
+ var accepted_addr: std.net.Address = undefined;
+ if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| {
+ var socket = std.os.File.openHandle(accepted_fd);
+ _ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) {
+ error.OutOfMemory => {
+ socket.close();
+ continue;
+ },
+ };
+ } else |err| switch (err) {
+ error.WouldBlock => {
+ suspend; // we will get resumed by epoll_wait in the event loop
+ continue;
+ },
+ error.ProcessFdQuotaExceeded => {
+ errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node);
+ suspend |p| {
+ self.waiting_for_emfile_node = PromiseNode.init(p);
+ std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node);
+ }
+ continue;
+ },
+ error.ConnectionAborted, error.FileDescriptorClosed => continue,
+
+ error.PageFault => unreachable,
+ error.InvalidSyscall => unreachable,
+ error.FileDescriptorNotASocket => unreachable,
+ error.OperationNotSupported => unreachable,
+
+ error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => {
+ @panic("TODO handle this error");
+ },
+ }
+ }
+ }
+};
+
+pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
+ var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
+
+ const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
+ errdefer std.os.close(sockfd);
+
+ try std.os.posixConnectAsync(sockfd, &address.os_addr);
+ try await try async loop.waitFd(sockfd);
+ try std.os.posixGetSockOptConnectError(sockfd);
+
+ return std.os.File.openHandle(sockfd);
+}
+
+test "listen on a port, send bytes, receive bytes" {
+ if (builtin.os != builtin.Os.linux) {
+ // TODO build abstractions for other operating systems
+ return;
+ }
+ const MyServer = struct {
+ tcp_server: Server,
+
+ const Self = this;
+ async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: *const std.os.File) void {
+ const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
+ var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
+ defer socket.close();
+ // TODO guarantee elision of this allocation
+ const next_handler = async errorableHandler(self, _addr, socket) catch unreachable;
+ (await next_handler) catch |err| {
+ std.debug.panic("unable to handle connection: {}\n", err);
+ };
+ suspend |p| {
+ cancel p;
+ }
+ }
+ async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: *const std.os.File) !void {
+ const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733
+ var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
+
+ var adapter = std.io.FileOutStream.init(&socket);
+ var stream = &adapter.stream;
+ try stream.print("hello from server\n");
+ }
+ };
+
+ const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
+ const addr = std.net.Address.initIp4(ip4addr, 0);
+
+ var loop: Loop = undefined;
+ try loop.initSingleThreaded(std.debug.global_allocator);
+ var server = MyServer{ .tcp_server = Server.init(&loop) };
+ defer server.tcp_server.deinit();
+ try server.tcp_server.listen(addr, MyServer.handler);
+
+ const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address, &server.tcp_server);
+ defer cancel p;
+ loop.run();
+}
+
+async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void {
+ errdefer @panic("test failure");
+
+ var socket_file = try await try async connect(loop, address);
+ defer socket_file.close();
+
+ var buf: [512]u8 = undefined;
+ const amt_read = try socket_file.read(buf[0..]);
+ const msg = buf[0..amt_read];
+ assert(mem.eql(u8, msg, "hello from server\n"));
+ server.close();
+}
+
diff --git a/std/hash_map.zig b/std/hash_map.zig
index 3bd03d4f28..cebd5272c0 100644
--- a/std/hash_map.zig
+++ b/std/hash_map.zig
@@ -259,14 +259,14 @@ test "basic hash map usage" {
var map = HashMap(i32, i32, hash_i32, eql_i32).init(&direct_allocator.allocator);
defer map.deinit();
- assert((map.put(1, 11) catch unreachable) == null);
- assert((map.put(2, 22) catch unreachable) == null);
- assert((map.put(3, 33) catch unreachable) == null);
- assert((map.put(4, 44) catch unreachable) == null);
- assert((map.put(5, 55) catch unreachable) == null);
+ assert((try map.put(1, 11)) == null);
+ assert((try map.put(2, 22)) == null);
+ assert((try map.put(3, 33)) == null);
+ assert((try map.put(4, 44)) == null);
+ assert((try map.put(5, 55)) == null);
- assert((map.put(5, 66) catch unreachable).? == 55);
- assert((map.put(5, 55) catch unreachable).? == 66);
+ assert((try map.put(5, 66)).? == 55);
+ assert((try map.put(5, 55)).? == 66);
assert(map.contains(2));
assert(map.get(2).?.value == 22);
@@ -282,9 +282,9 @@ test "iterator hash map" {
var reset_map = HashMap(i32, i32, hash_i32, eql_i32).init(&direct_allocator.allocator);
defer reset_map.deinit();
- assert((reset_map.put(1, 11) catch unreachable) == null);
- assert((reset_map.put(2, 22) catch unreachable) == null);
- assert((reset_map.put(3, 33) catch unreachable) == null);
+ assert((try reset_map.put(1, 11)) == null);
+ assert((try reset_map.put(2, 22)) == null);
+ assert((try reset_map.put(3, 33)) == null);
var keys = []i32{
1,
diff --git a/std/heap.zig b/std/heap.zig
index 2e02733da1..ef22c8d0c5 100644
--- a/std/heap.zig
+++ b/std/heap.zig
@@ -38,7 +38,7 @@ fn cFree(self: *Allocator, old_mem: []u8) void {
}
/// This allocator makes a syscall directly for every allocation and free.
-/// TODO make this thread-safe. The windows implementation will need some atomics.
+/// Thread-safe and lock-free.
pub const DirectAllocator = struct {
allocator: Allocator,
heap_handle: ?HeapHandle,
@@ -74,34 +74,34 @@ pub const DirectAllocator = struct {
const alloc_size = if (alignment <= os.page_size) n else n + alignment;
const addr = p.mmap(null, alloc_size, p.PROT_READ | p.PROT_WRITE, p.MAP_PRIVATE | p.MAP_ANONYMOUS, -1, 0);
if (addr == p.MAP_FAILED) return error.OutOfMemory;
-
if (alloc_size == n) return @intToPtr([*]u8, addr)[0..n];
- var aligned_addr = addr & ~usize(alignment - 1);
- aligned_addr += alignment;
+ const aligned_addr = (addr & ~usize(alignment - 1)) + alignment;
- //We can unmap the unused portions of our mmap, but we must only
- // pass munmap bytes that exist outside our allocated pages or it
- // will happily eat us too
+ // We can unmap the unused portions of our mmap, but we must only
+ // pass munmap bytes that exist outside our allocated pages or it
+ // will happily eat us too.
- //Since alignment > page_size, we are by definition on a page boundry
+ // Since alignment > page_size, we are by definition on a page boundary.
const unused_start = addr;
const unused_len = aligned_addr - 1 - unused_start;
- var err = p.munmap(unused_start, unused_len);
- debug.assert(p.getErrno(err) == 0);
+ const err = p.munmap(unused_start, unused_len);
+ assert(p.getErrno(err) == 0);
- //It is impossible that there is an unoccupied page at the top of our
- // mmap.
+ // It is impossible that there is an unoccupied page at the top of our
+ // mmap.
return @intToPtr([*]u8, aligned_addr)[0..n];
},
Os.windows => {
const amt = n + alignment + @sizeOf(usize);
- const heap_handle = self.heap_handle orelse blk: {
- const hh = os.windows.HeapCreate(os.windows.HEAP_NO_SERIALIZE, amt, 0) orelse return error.OutOfMemory;
- self.heap_handle = hh;
- break :blk hh;
+ const optional_heap_handle = @atomicLoad(?HeapHandle, &self.heap_handle, builtin.AtomicOrder.SeqCst);
+ const heap_handle = optional_heap_handle orelse blk: {
+ const hh = os.windows.HeapCreate(0, amt, 0) orelse return error.OutOfMemory;
+ const other_hh = @cmpxchgStrong(?HeapHandle, &self.heap_handle, null, hh, builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) orelse break :blk hh;
+ _ = os.windows.HeapDestroy(hh);
+ break :blk other_hh.?; // can't be null because of the cmpxchg
};
const ptr = os.windows.HeapAlloc(heap_handle, 0, amt) orelse return error.OutOfMemory;
const root_addr = @ptrToInt(ptr);
@@ -361,6 +361,73 @@ pub const ThreadSafeFixedBufferAllocator = struct {
fn free(allocator: *Allocator, bytes: []u8) void {}
};
+pub fn stackFallback(comptime size: usize, fallback_allocator: *Allocator) StackFallbackAllocator(size) {
+ return StackFallbackAllocator(size){
+ .buffer = undefined,
+ .fallback_allocator = fallback_allocator,
+ .fixed_buffer_allocator = undefined,
+ .allocator = Allocator{
+ .allocFn = StackFallbackAllocator(size).alloc,
+ .reallocFn = StackFallbackAllocator(size).realloc,
+ .freeFn = StackFallbackAllocator(size).free,
+ },
+ };
+}
+
+pub fn StackFallbackAllocator(comptime size: usize) type {
+ return struct {
+ const Self = this;
+
+ buffer: [size]u8,
+ allocator: Allocator,
+ fallback_allocator: *Allocator,
+ fixed_buffer_allocator: FixedBufferAllocator,
+
+ pub fn get(self: *Self) *Allocator {
+ self.fixed_buffer_allocator = FixedBufferAllocator.init(self.buffer[0..]);
+ return &self.allocator;
+ }
+
+ fn alloc(allocator: *Allocator, n: usize, alignment: u29) ![]u8 {
+ const self = @fieldParentPtr(Self, "allocator", allocator);
+ return FixedBufferAllocator.alloc(&self.fixed_buffer_allocator.allocator, n, alignment) catch
+ self.fallback_allocator.allocFn(self.fallback_allocator, n, alignment);
+ }
+
+ fn realloc(allocator: *Allocator, old_mem: []u8, new_size: usize, alignment: u29) ![]u8 {
+ const self = @fieldParentPtr(Self, "allocator", allocator);
+ const in_buffer = @ptrToInt(old_mem.ptr) >= @ptrToInt(&self.buffer) and
+ @ptrToInt(old_mem.ptr) < @ptrToInt(&self.buffer) + self.buffer.len;
+ if (in_buffer) {
+ return FixedBufferAllocator.realloc(
+ &self.fixed_buffer_allocator.allocator,
+ old_mem,
+ new_size,
+ alignment,
+ ) catch {
+ const result = try self.fallback_allocator.allocFn(
+ self.fallback_allocator,
+ new_size,
+ alignment,
+ );
+ mem.copy(u8, result, old_mem);
+ return result;
+ };
+ }
+ return self.fallback_allocator.reallocFn(self.fallback_allocator, old_mem, new_size, alignment);
+ }
+
+ fn free(allocator: *Allocator, bytes: []u8) void {
+ const self = @fieldParentPtr(Self, "allocator", allocator);
+ const in_buffer = @ptrToInt(bytes.ptr) >= @ptrToInt(&self.buffer) and
+ @ptrToInt(bytes.ptr) < @ptrToInt(&self.buffer) + self.buffer.len;
+ if (!in_buffer) {
+ return self.fallback_allocator.freeFn(self.fallback_allocator, bytes);
+ }
+ }
+ };
+}
+
test "c_allocator" {
if (builtin.link_libc) {
var slice = c_allocator.alloc(u8, 50) catch return;
diff --git a/std/mem.zig b/std/mem.zig
index b52d3e9f68..555e1e249d 100644
--- a/std/mem.zig
+++ b/std/mem.zig
@@ -6,7 +6,7 @@ const builtin = @import("builtin");
const mem = this;
pub const Allocator = struct {
- const Error = error{OutOfMemory};
+ pub const Error = error{OutOfMemory};
/// Allocate byte_count bytes and return them in a slice, with the
/// slice's pointer aligned at least to alignment bytes.
diff --git a/std/os/darwin.zig b/std/os/darwin.zig
index 15e5608343..4134e382fc 100644
--- a/std/os/darwin.zig
+++ b/std/os/darwin.zig
@@ -264,6 +264,224 @@ pub const SIGUSR1 = 30;
/// user defined signal 2
pub const SIGUSR2 = 31;
+/// no flag value
+pub const KEVENT_FLAG_NONE = 0x000;
+
+/// immediate timeout
+pub const KEVENT_FLAG_IMMEDIATE = 0x001;
+
+/// output events only include change
+pub const KEVENT_FLAG_ERROR_EVENTS = 0x002;
+
+/// add event to kq (implies enable)
+pub const EV_ADD = 0x0001;
+
+/// delete event from kq
+pub const EV_DELETE = 0x0002;
+
+/// enable event
+pub const EV_ENABLE = 0x0004;
+
+/// disable event (not reported)
+pub const EV_DISABLE = 0x0008;
+
+/// only report one occurrence
+pub const EV_ONESHOT = 0x0010;
+
+/// clear event state after reporting
+pub const EV_CLEAR = 0x0020;
+
+/// force immediate event output
+/// ... with or without EV_ERROR
+/// ... use KEVENT_FLAG_ERROR_EVENTS
+/// on syscalls supporting flags
+pub const EV_RECEIPT = 0x0040;
+
+/// disable event after reporting
+pub const EV_DISPATCH = 0x0080;
+
+/// unique kevent per udata value
+pub const EV_UDATA_SPECIFIC = 0x0100;
+
+/// ... in combination with EV_DELETE
+/// will defer delete until udata-specific
+/// event enabled. EINPROGRESS will be
+/// returned to indicate the deferral
+pub const EV_DISPATCH2 = EV_DISPATCH | EV_UDATA_SPECIFIC;
+
+/// report that source has vanished
+/// ... only valid with EV_DISPATCH2
+pub const EV_VANISHED = 0x0200;
+
+/// reserved by system
+pub const EV_SYSFLAGS = 0xF000;
+
+/// filter-specific flag
+pub const EV_FLAG0 = 0x1000;
+
+/// filter-specific flag
+pub const EV_FLAG1 = 0x2000;
+
+/// EOF detected
+pub const EV_EOF = 0x8000;
+
+/// error, data contains errno
+pub const EV_ERROR = 0x4000;
+
+pub const EV_POLL = EV_FLAG0;
+pub const EV_OOBAND = EV_FLAG1;
+
+pub const EVFILT_READ = -1;
+pub const EVFILT_WRITE = -2;
+
+/// attached to aio requests
+pub const EVFILT_AIO = -3;
+
+/// attached to vnodes
+pub const EVFILT_VNODE = -4;
+
+/// attached to struct proc
+pub const EVFILT_PROC = -5;
+
+/// attached to struct proc
+pub const EVFILT_SIGNAL = -6;
+
+/// timers
+pub const EVFILT_TIMER = -7;
+
+/// Mach portsets
+pub const EVFILT_MACHPORT = -8;
+
+/// Filesystem events
+pub const EVFILT_FS = -9;
+
+/// User events
+pub const EVFILT_USER = -10;
+
+/// Virtual memory events
+pub const EVFILT_VM = -12;
+
+/// Exception events
+pub const EVFILT_EXCEPT = -15;
+
+pub const EVFILT_SYSCOUNT = 17;
+
+/// On input, NOTE_TRIGGER causes the event to be triggered for output.
+pub const NOTE_TRIGGER = 0x01000000;
+
+/// ignore input fflags
+pub const NOTE_FFNOP = 0x00000000;
+
+/// and fflags
+pub const NOTE_FFAND = 0x40000000;
+
+/// or fflags
+pub const NOTE_FFOR = 0x80000000;
+
+/// copy fflags
+pub const NOTE_FFCOPY = 0xc0000000;
+
+/// mask for operations
+pub const NOTE_FFCTRLMASK = 0xc0000000;
+pub const NOTE_FFLAGSMASK = 0x00ffffff;
+
+/// low water mark
+pub const NOTE_LOWAT = 0x00000001;
+
+/// OOB data
+pub const NOTE_OOB = 0x00000002;
+
+/// vnode was removed
+pub const NOTE_DELETE = 0x00000001;
+
+/// data contents changed
+pub const NOTE_WRITE = 0x00000002;
+
+/// size increased
+pub const NOTE_EXTEND = 0x00000004;
+
+/// attributes changed
+pub const NOTE_ATTRIB = 0x00000008;
+
+/// link count changed
+pub const NOTE_LINK = 0x00000010;
+
+/// vnode was renamed
+pub const NOTE_RENAME = 0x00000020;
+
+/// vnode access was revoked
+pub const NOTE_REVOKE = 0x00000040;
+
+/// No specific vnode event: to test for EVFILT_READ activation
+pub const NOTE_NONE = 0x00000080;
+
+/// vnode was unlocked by flock(2)
+pub const NOTE_FUNLOCK = 0x00000100;
+
+/// process exited
+pub const NOTE_EXIT = 0x80000000;
+
+/// process forked
+pub const NOTE_FORK = 0x40000000;
+
+/// process exec'd
+pub const NOTE_EXEC = 0x20000000;
+
+/// shared with EVFILT_SIGNAL
+pub const NOTE_SIGNAL = 0x08000000;
+
+/// exit status to be returned, valid for child process only
+pub const NOTE_EXITSTATUS = 0x04000000;
+
+/// provide details on reasons for exit
+pub const NOTE_EXIT_DETAIL = 0x02000000;
+
+/// mask for signal & exit status
+pub const NOTE_PDATAMASK = 0x000fffff;
+pub const NOTE_PCTRLMASK = (~NOTE_PDATAMASK);
+
+pub const NOTE_EXIT_DETAIL_MASK = 0x00070000;
+pub const NOTE_EXIT_DECRYPTFAIL = 0x00010000;
+pub const NOTE_EXIT_MEMORY = 0x00020000;
+pub const NOTE_EXIT_CSERROR = 0x00040000;
+
+/// will react on memory pressure
+pub const NOTE_VM_PRESSURE = 0x80000000;
+
+/// will quit on memory pressure, possibly after cleaning up dirty state
+pub const NOTE_VM_PRESSURE_TERMINATE = 0x40000000;
+
+/// will quit immediately on memory pressure
+pub const NOTE_VM_PRESSURE_SUDDEN_TERMINATE = 0x20000000;
+
+/// there was an error
+pub const NOTE_VM_ERROR = 0x10000000;
+
+/// data is seconds
+pub const NOTE_SECONDS = 0x00000001;
+
+/// data is microseconds
+pub const NOTE_USECONDS = 0x00000002;
+
+/// data is nanoseconds
+pub const NOTE_NSECONDS = 0x00000004;
+
+/// absolute timeout
+pub const NOTE_ABSOLUTE = 0x00000008;
+
+/// ext[1] holds leeway for power aware timers
+pub const NOTE_LEEWAY = 0x00000010;
+
+/// system does minimal timer coalescing
+pub const NOTE_CRITICAL = 0x00000020;
+
+/// system does maximum timer coalescing
+pub const NOTE_BACKGROUND = 0x00000040;
+pub const NOTE_MACH_CONTINUOUS_TIME = 0x00000080;
+
+/// data is mach absolute time units
+pub const NOTE_MACHTIME = 0x00000100;
+
fn wstatus(x: i32) i32 {
return x & 0o177;
}
@@ -385,6 +603,31 @@ pub fn getdirentries64(fd: i32, buf_ptr: [*]u8, buf_len: usize, basep: *i64) usi
return errnoWrap(@bitCast(isize, c.__getdirentries64(fd, buf_ptr, buf_len, basep)));
}
+pub fn kqueue() usize {
+ return errnoWrap(c.kqueue());
+}
+
+pub fn kevent(kq: i32, changelist: []const Kevent, eventlist: []Kevent, timeout: ?*const timespec) usize {
+ return errnoWrap(c.kevent(
+ kq,
+ changelist.ptr,
+ @intCast(c_int, changelist.len),
+ eventlist.ptr,
+ @intCast(c_int, eventlist.len),
+ timeout,
+ ));
+}
+
+pub fn kevent64(
+ kq: i32,
+ changelist: []const kevent64_s,
+ eventlist: []kevent64_s,
+ flags: u32,
+ timeout: ?*const timespec,
+) usize {
+ return errnoWrap(c.kevent64(kq, changelist.ptr, changelist.len, eventlist.ptr, eventlist.len, flags, timeout));
+}
+
pub fn mkdir(path: [*]const u8, mode: u32) usize {
return errnoWrap(c.mkdir(path, mode));
}
@@ -393,6 +636,18 @@ pub fn symlink(existing: [*]const u8, new: [*]const u8) usize {
return errnoWrap(c.symlink(existing, new));
}
+pub fn sysctl(name: [*]c_int, namelen: c_uint, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) usize {
+ return errnoWrap(c.sysctl(name, namelen, oldp, oldlenp, newp, newlen));
+}
+
+pub fn sysctlbyname(name: [*]const u8, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) usize {
+ return errnoWrap(c.sysctlbyname(name, oldp, oldlenp, newp, newlen));
+}
+
+pub fn sysctlnametomib(name: [*]const u8, mibp: ?*c_int, sizep: ?*usize) usize {
+ return errnoWrap(c.sysctlnametomib(name, wibp, sizep));
+}
+
pub fn rename(old: [*]const u8, new: [*]const u8) usize {
return errnoWrap(c.rename(old, new));
}
@@ -474,6 +729,10 @@ pub const dirent = c.dirent;
pub const sa_family_t = c.sa_family_t;
pub const sockaddr = c.sockaddr;
+/// Renamed from `kevent` to `Kevent` to avoid conflict with the syscall.
+pub const Kevent = c.Kevent;
+pub const kevent64_s = c.kevent64_s;
+
/// Renamed from `sigaction` to `Sigaction` to avoid conflict with the syscall.
pub const Sigaction = struct {
handler: extern fn (i32) void,
diff --git a/std/os/index.zig b/std/os/index.zig
index 52b36c351c..79b2d2ff53 100644
--- a/std/os/index.zig
+++ b/std/os/index.zig
@@ -61,6 +61,15 @@ pub const windowsLoadDll = windows_util.windowsLoadDll;
pub const windowsUnloadDll = windows_util.windowsUnloadDll;
pub const createWindowsEnvBlock = windows_util.createWindowsEnvBlock;
+pub const WindowsCreateIoCompletionPortError = windows_util.WindowsCreateIoCompletionPortError;
+pub const windowsCreateIoCompletionPort = windows_util.windowsCreateIoCompletionPort;
+
+pub const WindowsPostQueuedCompletionStatusError = windows_util.WindowsPostQueuedCompletionStatusError;
+pub const windowsPostQueuedCompletionStatus = windows_util.windowsPostQueuedCompletionStatus;
+
+pub const WindowsWaitResult = windows_util.WindowsWaitResult;
+pub const windowsGetQueuedCompletionStatus = windows_util.windowsGetQueuedCompletionStatus;
+
pub const WindowsWaitError = windows_util.WaitError;
pub const WindowsOpenError = windows_util.OpenError;
pub const WindowsWriteError = windows_util.WriteError;
@@ -544,8 +553,13 @@ pub fn getEnvPosix(key: []const u8) ?[]const u8 {
return null;
}
+pub const GetEnvVarOwnedError = error{
+ OutOfMemory,
+ EnvironmentVariableNotFound,
+};
+
/// Caller must free returned memory.
-pub fn getEnvVarOwned(allocator: *mem.Allocator, key: []const u8) ![]u8 {
+pub fn getEnvVarOwned(allocator: *mem.Allocator, key: []const u8) GetEnvVarOwnedError![]u8 {
if (is_windows) {
const key_with_null = try cstr.addNullByte(allocator, key);
defer allocator.free(key_with_null);
@@ -554,14 +568,17 @@ pub fn getEnvVarOwned(allocator: *mem.Allocator, key: []const u8) ![]u8 {
errdefer allocator.free(buf);
while (true) {
- const windows_buf_len = try math.cast(windows.DWORD, buf.len);
+ const windows_buf_len = math.cast(windows.DWORD, buf.len) catch return error.OutOfMemory;
const result = windows.GetEnvironmentVariableA(key_with_null.ptr, buf.ptr, windows_buf_len);
if (result == 0) {
const err = windows.GetLastError();
return switch (err) {
windows.ERROR.ENVVAR_NOT_FOUND => error.EnvironmentVariableNotFound,
- else => unexpectedErrorWindows(err),
+ else => {
+ _ = unexpectedErrorWindows(err);
+ return error.EnvironmentVariableNotFound;
+ },
};
}
@@ -2309,6 +2326,30 @@ pub fn linuxEpollWait(epfd: i32, events: []linux.epoll_event, timeout: i32) usiz
}
}
+pub const LinuxEventFdError = error{
+ InvalidFlagValue,
+ SystemResources,
+ ProcessFdQuotaExceeded,
+ SystemFdQuotaExceeded,
+
+ Unexpected,
+};
+
+pub fn linuxEventFd(initval: u32, flags: u32) LinuxEventFdError!i32 {
+ const rc = posix.eventfd(initval, flags);
+ const err = posix.getErrno(rc);
+ switch (err) {
+ 0 => return @intCast(i32, rc),
+ else => return unexpectedErrorPosix(err),
+
+ posix.EINVAL => return LinuxEventFdError.InvalidFlagValue,
+ posix.EMFILE => return LinuxEventFdError.ProcessFdQuotaExceeded,
+ posix.ENFILE => return LinuxEventFdError.SystemFdQuotaExceeded,
+ posix.ENODEV => return LinuxEventFdError.SystemResources,
+ posix.ENOMEM => return LinuxEventFdError.SystemResources,
+ }
+}
+
pub const PosixGetSockNameError = error{
/// Insufficient resources were available in the system to perform the operation.
SystemResources,
@@ -2568,11 +2609,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread
thread: Thread,
inner: Context,
};
- extern fn threadMain(arg: windows.LPVOID) windows.DWORD {
- if (@sizeOf(Context) == 0) {
- return startFn({});
- } else {
- return startFn(@ptrCast(*Context, @alignCast(@alignOf(Context), arg)).*);
+ extern fn threadMain(raw_arg: windows.LPVOID) windows.DWORD {
+ const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*;
+ switch (@typeId(@typeOf(startFn).ReturnType)) {
+ builtin.TypeId.Int => {
+ return startFn(arg);
+ },
+ builtin.TypeId.Void => {
+ startFn(arg);
+ return 0;
+ },
+ else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"),
}
}
};
@@ -2605,10 +2652,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread
const MainFuncs = struct {
extern fn linuxThreadMain(ctx_addr: usize) u8 {
- if (@sizeOf(Context) == 0) {
- return startFn({});
- } else {
- return startFn(@intToPtr(*const Context, ctx_addr).*);
+ const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*;
+
+ switch (@typeId(@typeOf(startFn).ReturnType)) {
+ builtin.TypeId.Int => {
+ return startFn(arg);
+ },
+ builtin.TypeId.Void => {
+ startFn(arg);
+ return 0;
+ },
+ else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"),
}
}
extern fn posixThreadMain(ctx: ?*c_void) ?*c_void {
@@ -2717,3 +2771,128 @@ pub fn posixFStat(fd: i32) !posix.Stat {
return stat;
}
+
+pub const CpuCountError = error{
+ OutOfMemory,
+ PermissionDenied,
+ Unexpected,
+};
+
+pub fn cpuCount(fallback_allocator: *mem.Allocator) CpuCountError!usize {
+ switch (builtin.os) {
+ builtin.Os.macosx => {
+ var count: c_int = undefined;
+ var count_len: usize = @sizeOf(c_int);
+ const rc = posix.sysctlbyname(c"hw.ncpu", @ptrCast(*c_void, &count), &count_len, null, 0);
+ const err = posix.getErrno(rc);
+ switch (err) {
+ 0 => return @intCast(usize, count),
+ posix.EFAULT => unreachable,
+ posix.EINVAL => unreachable,
+ posix.ENOMEM => return CpuCountError.OutOfMemory,
+ posix.ENOTDIR => unreachable,
+ posix.EISDIR => unreachable,
+ posix.ENOENT => unreachable,
+ posix.EPERM => unreachable,
+ else => return os.unexpectedErrorPosix(err),
+ }
+ },
+ builtin.Os.linux => {
+ const usize_count = 16;
+ const allocator = std.heap.stackFallback(usize_count * @sizeOf(usize), fallback_allocator).get();
+
+ var set = try allocator.alloc(usize, usize_count);
+ defer allocator.free(set);
+
+ while (true) {
+ const rc = posix.sched_getaffinity(0, set);
+ const err = posix.getErrno(rc);
+ switch (err) {
+ 0 => {
+ if (rc < set.len * @sizeOf(usize)) {
+ const result = set[0 .. rc / @sizeOf(usize)];
+ var sum: usize = 0;
+ for (result) |x| {
+ sum += @popCount(x);
+ }
+ return sum;
+ } else {
+ set = try allocator.realloc(usize, set, set.len * 2);
+ continue;
+ }
+ },
+ posix.EFAULT => unreachable,
+ posix.EINVAL => unreachable,
+ posix.EPERM => return CpuCountError.PermissionDenied,
+ posix.ESRCH => unreachable,
+ else => return os.unexpectedErrorPosix(err),
+ }
+ }
+ },
+ builtin.Os.windows => {
+ var system_info: windows.SYSTEM_INFO = undefined;
+ windows.GetSystemInfo(&system_info);
+ return @intCast(usize, system_info.dwNumberOfProcessors);
+ },
+ else => @compileError("unsupported OS"),
+ }
+}
+
+pub const BsdKQueueError = error{
+ /// The per-process limit on the number of open file descriptors has been reached.
+ ProcessFdQuotaExceeded,
+
+ /// The system-wide limit on the total number of open files has been reached.
+ SystemFdQuotaExceeded,
+
+ Unexpected,
+};
+
+pub fn bsdKQueue() BsdKQueueError!i32 {
+ const rc = posix.kqueue();
+ const err = posix.getErrno(rc);
+ switch (err) {
+ 0 => return @intCast(i32, rc),
+ posix.EMFILE => return BsdKQueueError.ProcessFdQuotaExceeded,
+ posix.ENFILE => return BsdKQueueError.SystemFdQuotaExceeded,
+ else => return unexpectedErrorPosix(err),
+ }
+}
+
+pub const BsdKEventError = error{
+ /// The process does not have permission to register a filter.
+ AccessDenied,
+
+ /// The event could not be found to be modified or deleted.
+ EventNotFound,
+
+ /// No memory was available to register the event.
+ SystemResources,
+
+ /// The specified process to attach to does not exist.
+ ProcessNotFound,
+};
+
+pub fn bsdKEvent(
+ kq: i32,
+ changelist: []const posix.Kevent,
+ eventlist: []posix.Kevent,
+ timeout: ?*const posix.timespec,
+) BsdKEventError!usize {
+ while (true) {
+ const rc = posix.kevent(kq, changelist, eventlist, timeout);
+ const err = posix.getErrno(rc);
+ switch (err) {
+ 0 => return rc,
+ posix.EACCES => return BsdKEventError.AccessDenied,
+ posix.EFAULT => unreachable,
+ posix.EBADF => unreachable,
+ posix.EINTR => continue,
+ posix.EINVAL => unreachable,
+ posix.ENOENT => return BsdKEventError.EventNotFound,
+ posix.ENOMEM => return BsdKEventError.SystemResources,
+ posix.ESRCH => return BsdKEventError.ProcessNotFound,
+ else => unreachable,
+ }
+ }
+}
diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig
index 65aa659c82..69bc30bad0 100644
--- a/std/os/linux/index.zig
+++ b/std/os/linux/index.zig
@@ -523,6 +523,10 @@ pub const CLONE_NEWPID = 0x20000000;
pub const CLONE_NEWNET = 0x40000000;
pub const CLONE_IO = 0x80000000;
+pub const EFD_SEMAPHORE = 1;
+pub const EFD_CLOEXEC = O_CLOEXEC;
+pub const EFD_NONBLOCK = O_NONBLOCK;
+
pub const MS_RDONLY = 1;
pub const MS_NOSUID = 2;
pub const MS_NODEV = 4;
@@ -1193,6 +1197,10 @@ pub fn fremovexattr(fd: usize, name: [*]const u8) usize {
return syscall2(SYS_fremovexattr, fd, @ptrToInt(name));
}
+pub fn sched_getaffinity(pid: i32, set: []usize) usize {
+ return syscall3(SYS_sched_getaffinity, @bitCast(usize, isize(pid)), set.len * @sizeOf(usize), @ptrToInt(set.ptr));
+}
+
pub const epoll_data = packed union {
ptr: usize,
fd: i32,
@@ -1221,6 +1229,10 @@ pub fn epoll_wait(epoll_fd: i32, events: [*]epoll_event, maxevents: u32, timeout
return syscall4(SYS_epoll_wait, @intCast(usize, epoll_fd), @ptrToInt(events), @intCast(usize, maxevents), @intCast(usize, timeout));
}
+pub fn eventfd(count: u32, flags: u32) usize {
+ return syscall2(SYS_eventfd2, count, flags);
+}
+
pub fn timerfd_create(clockid: i32, flags: u32) usize {
return syscall2(SYS_timerfd_create, @intCast(usize, clockid), @intCast(usize, flags));
}
diff --git a/std/os/test.zig b/std/os/test.zig
index 5a977a569a..52e6ffdc1c 100644
--- a/std/os/test.zig
+++ b/std/os/test.zig
@@ -58,3 +58,8 @@ fn start2(ctx: *i32) u8 {
_ = @atomicRmw(i32, ctx, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
return 0;
}
+
+test "cpu count" {
+ const cpu_count = try std.os.cpuCount(a);
+ assert(cpu_count >= 1);
+}
diff --git a/std/os/windows/index.zig b/std/os/windows/index.zig
index d631c6adbf..f73b8ec261 100644
--- a/std/os/windows/index.zig
+++ b/std/os/windows/index.zig
@@ -59,6 +59,9 @@ pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA(
dwFlags: DWORD,
) BOOLEAN;
+
+pub extern "kernel32" stdcallcc fn CreateIoCompletionPort(FileHandle: HANDLE, ExistingCompletionPort: ?HANDLE, CompletionKey: ULONG_PTR, NumberOfConcurrentThreads: DWORD) ?HANDLE;
+
pub extern "kernel32" stdcallcc fn CreateThread(lpThreadAttributes: ?LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: LPTHREAD_START_ROUTINE, lpParameter: ?LPVOID, dwCreationFlags: DWORD, lpThreadId: ?LPDWORD) ?HANDLE;
pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL;
@@ -106,7 +109,9 @@ pub extern "kernel32" stdcallcc fn GetFinalPathNameByHandleA(
) DWORD;
pub extern "kernel32" stdcallcc fn GetProcessHeap() ?HANDLE;
+pub extern "kernel32" stdcallcc fn GetQueuedCompletionStatus(CompletionPort: HANDLE, lpNumberOfBytesTransferred: LPDWORD, lpCompletionKey: *ULONG_PTR, lpOverlapped: *?*OVERLAPPED, dwMilliseconds: DWORD) BOOL;
+pub extern "kernel32" stdcallcc fn GetSystemInfo(lpSystemInfo: *SYSTEM_INFO) void;
pub extern "kernel32" stdcallcc fn GetSystemTimeAsFileTime(*FILETIME) void;
pub extern "kernel32" stdcallcc fn HeapCreate(flOptions: DWORD, dwInitialSize: SIZE_T, dwMaximumSize: SIZE_T) ?HANDLE;
@@ -129,6 +134,9 @@ pub extern "kernel32" stdcallcc fn MoveFileExA(
dwFlags: DWORD,
) BOOL;
+
+pub extern "kernel32" stdcallcc fn PostQueuedCompletionStatus(CompletionPort: HANDLE, dwNumberOfBytesTransferred: DWORD, dwCompletionKey: ULONG_PTR, lpOverlapped: ?*OVERLAPPED) BOOL;
+
pub extern "kernel32" stdcallcc fn QueryPerformanceCounter(lpPerformanceCount: *LARGE_INTEGER) BOOL;
pub extern "kernel32" stdcallcc fn QueryPerformanceFrequency(lpFrequency: *LARGE_INTEGER) BOOL;
@@ -204,6 +212,7 @@ pub const SIZE_T = usize;
pub const TCHAR = if (UNICODE) WCHAR else u8;
pub const UINT = c_uint;
pub const ULONG_PTR = usize;
+pub const DWORD_PTR = ULONG_PTR;
pub const UNICODE = false;
pub const WCHAR = u16;
pub const WORD = u16;
@@ -413,3 +422,22 @@ pub const FILETIME = extern struct {
dwLowDateTime: DWORD,
dwHighDateTime: DWORD,
};
+
+pub const SYSTEM_INFO = extern struct {
+ anon1: extern union {
+ dwOemId: DWORD,
+ anon2: extern struct {
+ wProcessorArchitecture: WORD,
+ wReserved: WORD,
+ },
+ },
+ dwPageSize: DWORD,
+ lpMinimumApplicationAddress: LPVOID,
+ lpMaximumApplicationAddress: LPVOID,
+ dwActiveProcessorMask: DWORD_PTR,
+ dwNumberOfProcessors: DWORD,
+ dwProcessorType: DWORD,
+ dwAllocationGranularity: DWORD,
+ wProcessorLevel: WORD,
+ wProcessorRevision: WORD,
+};
diff --git a/std/os/windows/util.zig b/std/os/windows/util.zig
index 45b205451d..b04e8efc4b 100644
--- a/std/os/windows/util.zig
+++ b/std/os/windows/util.zig
@@ -214,3 +214,50 @@ pub fn windowsFindNextFile(handle: windows.HANDLE, find_file_data: *windows.WIN3
}
return true;
}
+
+
+pub const WindowsCreateIoCompletionPortError = error {
+ Unexpected,
+};
+
+pub fn windowsCreateIoCompletionPort(file_handle: windows.HANDLE, existing_completion_port: ?windows.HANDLE, completion_key: usize, concurrent_thread_count: windows.DWORD) !windows.HANDLE {
+ const handle = windows.CreateIoCompletionPort(file_handle, existing_completion_port, completion_key, concurrent_thread_count) orelse {
+ const err = windows.GetLastError();
+ switch (err) {
+ else => return os.unexpectedErrorWindows(err),
+ }
+ };
+ return handle;
+}
+
+pub const WindowsPostQueuedCompletionStatusError = error {
+ Unexpected,
+};
+
+pub fn windowsPostQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: windows.DWORD, completion_key: usize, lpOverlapped: ?*windows.OVERLAPPED) WindowsPostQueuedCompletionStatusError!void {
+ if (windows.PostQueuedCompletionStatus(completion_port, bytes_transferred_count, completion_key, lpOverlapped) == 0) {
+ const err = windows.GetLastError();
+ switch (err) {
+ else => return os.unexpectedErrorWindows(err),
+ }
+ }
+}
+
+pub const WindowsWaitResult = error {
+ Normal,
+ Aborted,
+};
+
+pub fn windowsGetQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: *windows.DWORD, lpCompletionKey: *usize, lpOverlapped: *?*windows.OVERLAPPED, dwMilliseconds: windows.DWORD) WindowsWaitResult {
+ if (windows.GetQueuedCompletionStatus(completion_port, bytes_transferred_count, lpCompletionKey, lpOverlapped, dwMilliseconds) == windows.FALSE) {
+ if (std.debug.runtime_safety) {
+ const err = windows.GetLastError();
+ if (err != windows.ERROR.ABANDONED_WAIT_0) {
+ std.debug.warn("err: {}\n", err);
+ }
+ assert(err == windows.ERROR.ABANDONED_WAIT_0);
+ }
+ return WindowsWaitResult.Aborted;
+ }
+ return WindowsWaitResult.Normal;
+}
diff --git a/std/special/build_runner.zig b/std/special/build_runner.zig
index e4f04df6d0..2f073b3e98 100644
--- a/std/special/build_runner.zig
+++ b/std/special/build_runner.zig
@@ -122,10 +122,13 @@ pub fn main() !void {
return usageAndErr(&builder, true, try stderr_stream);
builder.make(targets.toSliceConst()) catch |err| {
- if (err == error.InvalidStepName) {
- return usageAndErr(&builder, true, try stderr_stream);
+ switch (err) {
+ error.InvalidStepName => {
+ return usageAndErr(&builder, true, try stderr_stream);
+ },
+ error.UncleanExit => os.exit(1),
+ else => return err,
}
- return err;
};
}
diff --git a/std/special/compiler_rt/extendXfYf2_test.zig b/std/special/compiler_rt/extendXfYf2_test.zig
index 185c83a0ef..9969607011 100644
--- a/std/special/compiler_rt/extendXfYf2_test.zig
+++ b/std/special/compiler_rt/extendXfYf2_test.zig
@@ -31,7 +31,7 @@ fn test__extendhfsf2(a: u16, expected: u32) void {
if (rep == expected) {
if (rep & 0x7fffffff > 0x7f800000) {
- return; // NaN is always unequal.
+ return; // NaN is always unequal.
}
if (x == @bitCast(f32, expected)) {
return;
@@ -86,33 +86,33 @@ test "extenddftf2" {
}
test "extendhfsf2" {
- test__extendhfsf2(0x7e00, 0x7fc00000); // qNaN
- test__extendhfsf2(0x7f00, 0x7fe00000); // sNaN
- test__extendhfsf2(0x7c01, 0x7f802000); // sNaN
+ test__extendhfsf2(0x7e00, 0x7fc00000); // qNaN
+ test__extendhfsf2(0x7f00, 0x7fe00000); // sNaN
+ test__extendhfsf2(0x7c01, 0x7f802000); // sNaN
- test__extendhfsf2(0, 0); // 0
- test__extendhfsf2(0x8000, 0x80000000); // -0
+ test__extendhfsf2(0, 0); // 0
+ test__extendhfsf2(0x8000, 0x80000000); // -0
- test__extendhfsf2(0x7c00, 0x7f800000); // inf
- test__extendhfsf2(0xfc00, 0xff800000); // -inf
+ test__extendhfsf2(0x7c00, 0x7f800000); // inf
+ test__extendhfsf2(0xfc00, 0xff800000); // -inf
- test__extendhfsf2(0x0001, 0x33800000); // denormal (min), 2**-24
- test__extendhfsf2(0x8001, 0xb3800000); // denormal (min), -2**-24
+ test__extendhfsf2(0x0001, 0x33800000); // denormal (min), 2**-24
+ test__extendhfsf2(0x8001, 0xb3800000); // denormal (min), -2**-24
- test__extendhfsf2(0x03ff, 0x387fc000); // denormal (max), 2**-14 - 2**-24
- test__extendhfsf2(0x83ff, 0xb87fc000); // denormal (max), -2**-14 + 2**-24
+ test__extendhfsf2(0x03ff, 0x387fc000); // denormal (max), 2**-14 - 2**-24
+ test__extendhfsf2(0x83ff, 0xb87fc000); // denormal (max), -2**-14 + 2**-24
- test__extendhfsf2(0x0400, 0x38800000); // normal (min), 2**-14
- test__extendhfsf2(0x8400, 0xb8800000); // normal (min), -2**-14
+ test__extendhfsf2(0x0400, 0x38800000); // normal (min), 2**-14
+ test__extendhfsf2(0x8400, 0xb8800000); // normal (min), -2**-14
- test__extendhfsf2(0x7bff, 0x477fe000); // normal (max), 65504
- test__extendhfsf2(0xfbff, 0xc77fe000); // normal (max), -65504
+ test__extendhfsf2(0x7bff, 0x477fe000); // normal (max), 65504
+ test__extendhfsf2(0xfbff, 0xc77fe000); // normal (max), -65504
- test__extendhfsf2(0x3c01, 0x3f802000); // normal, 1 + 2**-10
- test__extendhfsf2(0xbc01, 0xbf802000); // normal, -1 - 2**-10
+ test__extendhfsf2(0x3c01, 0x3f802000); // normal, 1 + 2**-10
+ test__extendhfsf2(0xbc01, 0xbf802000); // normal, -1 - 2**-10
- test__extendhfsf2(0x3555, 0x3eaaa000); // normal, approx. 1/3
- test__extendhfsf2(0xb555, 0xbeaaa000); // normal, approx. -1/3
+ test__extendhfsf2(0x3555, 0x3eaaa000); // normal, approx. 1/3
+ test__extendhfsf2(0xb555, 0xbeaaa000); // normal, approx. -1/3
}
test "extendsftf2" {
diff --git a/std/zig/bench.zig b/std/zig/bench.zig
index 59392889a6..630f6b2233 100644
--- a/std/zig/bench.zig
+++ b/std/zig/bench.zig
@@ -19,20 +19,18 @@ pub fn main() !void {
}
const end = timer.read();
memory_used /= iterations;
- const elapsed_s = f64(end - start) / std.os.time.ns_per_s;
- const bytes_per_sec = f64(source.len * iterations) / elapsed_s;
+ const elapsed_s = @intToFloat(f64, end - start) / std.os.time.ns_per_s;
+ const bytes_per_sec = @intToFloat(f64, source.len * iterations) / elapsed_s;
const mb_per_sec = bytes_per_sec / (1024 * 1024);
var stdout_file = try std.io.getStdOut();
- const stdout = *std.io.FileOutStream.init(*stdout_file).stream;
- try stdout.print("{.3} MB/s, {} KB used \n", mb_per_sec, memory_used / 1024);
+ const stdout = &std.io.FileOutStream.init(&stdout_file).stream;
+ try stdout.print("{.3} MiB/s, {} KiB used \n", mb_per_sec, memory_used / 1024);
}
fn testOnce() usize {
var fixed_buf_alloc = std.heap.FixedBufferAllocator.init(fixed_buffer_mem[0..]);
- var allocator = *fixed_buf_alloc.allocator;
- var tokenizer = Tokenizer.init(source);
- var parser = Parser.init(*tokenizer, allocator, "(memory buffer)");
- _ = parser.parse() catch @panic("parse failure");
+ var allocator = &fixed_buf_alloc.allocator;
+ _ = std.zig.parse(allocator, source) catch @panic("parse failure");
return fixed_buf_alloc.end_index;
}