diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-07-10 14:03:03 -0400 |
|---|---|---|
| committer | Andrew Kelley <superjoe30@gmail.com> | 2018-07-10 14:03:03 -0400 |
| commit | cfaebb20d8906d31cedc59e39e6a9286967a931a (patch) | |
| tree | 33d0c3994bfb658937e2e692a946bc8a2eca4fe5 /std | |
| parent | b5d07297dec61a3993dfe91ceee2c87672db1e8e (diff) | |
| parent | 0ce6934e2631eb3beca817d3bce12ecb13aafa13 (diff) | |
| download | zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.tar.gz zig-cfaebb20d8906d31cedc59e39e6a9286967a931a.zip | |
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std')
| -rw-r--r-- | std/atomic/queue_mpsc.zig | 42 | ||||
| -rw-r--r-- | std/build.zig | 18 | ||||
| -rw-r--r-- | std/c/darwin.zig | 72 | ||||
| -rw-r--r-- | std/crypto/throughput_test.zig | 8 | ||||
| -rw-r--r-- | std/debug/index.zig | 80 | ||||
| -rw-r--r-- | std/event.zig | 536 | ||||
| -rw-r--r-- | std/event/channel.zig | 254 | ||||
| -rw-r--r-- | std/event/lock.zig | 204 | ||||
| -rw-r--r-- | std/event/locked.zig | 43 | ||||
| -rw-r--r-- | std/event/loop.zig | 629 | ||||
| -rw-r--r-- | std/event/tcp.zig | 183 | ||||
| -rw-r--r-- | std/hash_map.zig | 20 | ||||
| -rw-r--r-- | std/heap.zig | 99 | ||||
| -rw-r--r-- | std/mem.zig | 2 | ||||
| -rw-r--r-- | std/os/darwin.zig | 259 | ||||
| -rw-r--r-- | std/os/index.zig | 203 | ||||
| -rw-r--r-- | std/os/linux/index.zig | 12 | ||||
| -rw-r--r-- | std/os/test.zig | 5 | ||||
| -rw-r--r-- | std/os/windows/index.zig | 28 | ||||
| -rw-r--r-- | std/os/windows/util.zig | 47 | ||||
| -rw-r--r-- | std/special/build_runner.zig | 9 | ||||
| -rw-r--r-- | std/special/compiler_rt/extendXfYf2_test.zig | 40 | ||||
| -rw-r--r-- | std/zig/bench.zig | 14 |
23 files changed, 2185 insertions, 622 deletions
diff --git a/std/atomic/queue_mpsc.zig b/std/atomic/queue_mpsc.zig index 8030565d7a..978e189453 100644 --- a/std/atomic/queue_mpsc.zig +++ b/std/atomic/queue_mpsc.zig @@ -15,6 +15,8 @@ pub fn QueueMpsc(comptime T: type) type { pub const Node = std.atomic.Stack(T).Node; + /// Not thread-safe. The call to init() must complete before any other functions are called. + /// No deinitialization required. pub fn init() Self { return Self{ .inboxes = []std.atomic.Stack(T){ @@ -26,12 +28,15 @@ pub fn QueueMpsc(comptime T: type) type { }; } + /// Fully thread-safe. put() may be called from any thread at any time. pub fn put(self: *Self, node: *Node) void { const inbox_index = @atomicLoad(usize, &self.inbox_index, AtomicOrder.SeqCst); const inbox = &self.inboxes[inbox_index]; inbox.push(node); } + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to get(). pub fn get(self: *Self) ?*Node { if (self.outbox.pop()) |node| { return node; @@ -43,6 +48,43 @@ pub fn QueueMpsc(comptime T: type) type { } return self.outbox.pop(); } + + /// Must be called by only 1 consumer at a time. Every call to get() and isEmpty() must complete before + /// the next call to isEmpty(). + pub fn isEmpty(self: *Self) bool { + if (!self.outbox.isEmpty()) return false; + const prev_inbox_index = @atomicRmw(usize, &self.inbox_index, AtomicRmwOp.Xor, 0x1, AtomicOrder.SeqCst); + const prev_inbox = &self.inboxes[prev_inbox_index]; + while (prev_inbox.pop()) |node| { + self.outbox.push(node); + } + return self.outbox.isEmpty(); + } + + /// For debugging only. No API guarantees about what this does. + pub fn dump(self: *Self) void { + { + var it = self.outbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + const inbox_index = self.inbox_index; + const inboxes = []*std.atomic.Stack(T){ + &self.inboxes[self.inbox_index], + &self.inboxes[1 - self.inbox_index], + }; + for (inboxes) |inbox| { + var it = inbox.root; + while (it) |node| { + std.debug.warn("0x{x} -> ", @ptrToInt(node)); + it = node.next; + } + } + + std.debug.warn("null\n"); + } }; } diff --git a/std/build.zig b/std/build.zig index 99de9b5197..24fa85383a 100644 --- a/std/build.zig +++ b/std/build.zig @@ -814,6 +814,7 @@ pub const LibExeObjStep = struct { out_h_filename: []const u8, assembly_files: ArrayList([]const u8), packages: ArrayList(Pkg), + build_options_contents: std.Buffer, // C only stuff source_files: ArrayList([]const u8), @@ -905,6 +906,7 @@ pub const LibExeObjStep = struct { .lib_paths = ArrayList([]const u8).init(builder.allocator), .object_src = undefined, .disable_libc = true, + .build_options_contents = std.Buffer.initSize(builder.allocator, 0) catch unreachable, }; self.computeOutFileNames(); return self; @@ -945,6 +947,7 @@ pub const LibExeObjStep = struct { .out_h_filename = undefined, .assembly_files = undefined, .packages = undefined, + .build_options_contents = undefined, }; self.computeOutFileNames(); return self; @@ -1096,6 +1099,12 @@ pub const LibExeObjStep = struct { self.include_dirs.append(self.builder.cache_root) catch unreachable; } + pub fn addBuildOption(self: *LibExeObjStep, comptime T: type, name: []const u8, value: T) void { + assert(self.is_zig); + const out = &std.io.BufferOutStream.init(&self.build_options_contents).stream; + out.print("pub const {} = {};\n", name, value) catch unreachable; + } + pub fn addIncludeDir(self: *LibExeObjStep, path: []const u8) void { self.include_dirs.append(path) catch unreachable; } @@ -1155,6 +1164,15 @@ pub const LibExeObjStep = struct { zig_args.append(builder.pathFromRoot(root_src)) catch unreachable; } + if (self.build_options_contents.len() > 0) { + const build_options_file = try os.path.join(builder.allocator, builder.cache_root, builder.fmt("{}_build_options.zig", self.name)); + try std.io.writeFile(builder.allocator, build_options_file, self.build_options_contents.toSliceConst()); + try zig_args.append("--pkg-begin"); + try zig_args.append("build_options"); + try zig_args.append(builder.pathFromRoot(build_options_file)); + try zig_args.append("--pkg-end"); + } + for (self.object_files.toSliceConst()) |object_file| { zig_args.append("--object") catch unreachable; zig_args.append(builder.pathFromRoot(object_file)) catch unreachable; diff --git a/std/c/darwin.zig b/std/c/darwin.zig index e3b53d9bea..133ef62f05 100644 --- a/std/c/darwin.zig +++ b/std/c/darwin.zig @@ -6,6 +6,30 @@ pub extern "c" fn __getdirentries64(fd: c_int, buf_ptr: [*]u8, buf_len: usize, b pub extern "c" fn mach_absolute_time() u64; pub extern "c" fn mach_timebase_info(tinfo: ?*mach_timebase_info_data) void; +pub extern "c" fn kqueue() c_int; +pub extern "c" fn kevent( + kq: c_int, + changelist: [*]const Kevent, + nchanges: c_int, + eventlist: [*]Kevent, + nevents: c_int, + timeout: ?*const timespec, +) c_int; + +pub extern "c" fn kevent64( + kq: c_int, + changelist: [*]const kevent64_s, + nchanges: c_int, + eventlist: [*]kevent64_s, + nevents: c_int, + flags: c_uint, + timeout: ?*const timespec, +) c_int; + +pub extern "c" fn sysctl(name: [*]c_int, namelen: c_uint, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) c_int; +pub extern "c" fn sysctlbyname(name: [*]const u8, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) c_int; +pub extern "c" fn sysctlnametomib(name: [*]const u8, mibp: ?*c_int, sizep: ?*usize) c_int; + pub use @import("../os/darwin_errno.zig"); pub const _errno = __error; @@ -86,3 +110,51 @@ pub const pthread_attr_t = extern struct { __sig: c_long, __opaque: [56]u8, }; + +/// Renamed from `kevent` to `Kevent` to avoid conflict with function name. +pub const Kevent = extern struct { + ident: usize, + filter: i16, + flags: u16, + fflags: u32, + data: isize, + udata: usize, +}; + +// sys/types.h on macos uses #pragma pack(4) so these checks are +// to make sure the struct is laid out the same. These values were +// produced from C code using the offsetof macro. +const std = @import("../index.zig"); +const assert = std.debug.assert; + +comptime { + assert(@offsetOf(Kevent, "ident") == 0); + assert(@offsetOf(Kevent, "filter") == 8); + assert(@offsetOf(Kevent, "flags") == 10); + assert(@offsetOf(Kevent, "fflags") == 12); + assert(@offsetOf(Kevent, "data") == 16); + assert(@offsetOf(Kevent, "udata") == 24); +} + +pub const kevent64_s = extern struct { + ident: u64, + filter: i16, + flags: u16, + fflags: u32, + data: i64, + udata: u64, + ext: [2]u64, +}; + +// sys/types.h on macos uses #pragma pack() so these checks are +// to make sure the struct is laid out the same. These values were +// produced from C code using the offsetof macro. +comptime { + assert(@offsetOf(kevent64_s, "ident") == 0); + assert(@offsetOf(kevent64_s, "filter") == 8); + assert(@offsetOf(kevent64_s, "flags") == 10); + assert(@offsetOf(kevent64_s, "fflags") == 12); + assert(@offsetOf(kevent64_s, "data") == 16); + assert(@offsetOf(kevent64_s, "udata") == 24); + assert(@offsetOf(kevent64_s, "ext") == 32); +} diff --git a/std/crypto/throughput_test.zig b/std/crypto/throughput_test.zig index 0ad6845d1a..c21838e607 100644 --- a/std/crypto/throughput_test.zig +++ b/std/crypto/throughput_test.zig @@ -15,8 +15,8 @@ const BytesToHash = 1024 * MiB; pub fn main() !void { var stdout_file = try std.io.getStdOut(); - var stdout_out_stream = std.io.FileOutStream.init(*stdout_file); - const stdout = *stdout_out_stream.stream; + var stdout_out_stream = std.io.FileOutStream.init(&stdout_file); + const stdout = &stdout_out_stream.stream; var block: [HashFunction.block_size]u8 = undefined; std.mem.set(u8, block[0..], 0); @@ -31,8 +31,8 @@ pub fn main() !void { } const end = timer.read(); - const elapsed_s = f64(end - start) / time.ns_per_s; - const throughput = u64(BytesToHash / elapsed_s); + const elapsed_s = @intToFloat(f64, end - start) / time.ns_per_s; + const throughput = @floatToInt(u64, BytesToHash / elapsed_s); try stdout.print("{}: {} MiB/s\n", @typeName(HashFunction), throughput / (1 * MiB)); } diff --git a/std/debug/index.zig b/std/debug/index.zig index 57b2dfc300..3070e0b40b 100644 --- a/std/debug/index.zig +++ b/std/debug/index.zig @@ -10,6 +10,12 @@ const ArrayList = std.ArrayList; const builtin = @import("builtin"); pub const FailingAllocator = @import("failing_allocator.zig").FailingAllocator; +pub const failing_allocator = FailingAllocator.init(global_allocator, 0); + +pub const runtime_safety = switch (builtin.mode) { + builtin.Mode.Debug, builtin.Mode.ReleaseSafe => true, + builtin.Mode.ReleaseFast, builtin.Mode.ReleaseSmall => false, +}; /// Tries to write to stderr, unbuffered, and ignores any error returned. /// Does not append a newline. @@ -44,6 +50,12 @@ pub fn getSelfDebugInfo() !*ElfStackTrace { } } +fn wantTtyColor() bool { + var bytes: [128]u8 = undefined; + const allocator = &std.heap.FixedBufferAllocator.init(bytes[0..]).allocator; + return if (std.os.getEnvVarOwned(allocator, "ZIG_DEBUG_COLOR")) |_| true else |_| stderr_file.isTty(); +} + /// Tries to print the current stack trace to stderr, unbuffered, and ignores any error returned. pub fn dumpCurrentStackTrace(start_addr: ?usize) void { const stderr = getStderrStream() catch return; @@ -51,7 +63,7 @@ pub fn dumpCurrentStackTrace(start_addr: ?usize) void { stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", @errorName(err)) catch return; return; }; - writeCurrentStackTrace(stderr, getDebugInfoAllocator(), debug_info, stderr_file.isTty(), start_addr) catch |err| { + writeCurrentStackTrace(stderr, getDebugInfoAllocator(), debug_info, wantTtyColor(), start_addr) catch |err| { stderr.print("Unable to dump stack trace: {}\n", @errorName(err)) catch return; return; }; @@ -64,7 +76,7 @@ pub fn dumpStackTrace(stack_trace: *const builtin.StackTrace) void { stderr.print("Unable to dump stack trace: Unable to open debug info: {}\n", @errorName(err)) catch return; return; }; - writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, stderr_file.isTty()) catch |err| { + writeStackTrace(stack_trace, stderr, getDebugInfoAllocator(), debug_info, wantTtyColor()) catch |err| { stderr.print("Unable to dump stack trace: {}\n", @errorName(err)) catch return; return; }; @@ -156,7 +168,7 @@ pub fn writeStackTrace(stack_trace: *const builtin.StackTrace, out_stream: var, frame_index = (frame_index + 1) % stack_trace.instruction_addresses.len; }) { const return_address = stack_trace.instruction_addresses[frame_index]; - try printSourceAtAddress(debug_info, out_stream, return_address); + try printSourceAtAddress(debug_info, out_stream, return_address, tty_color); } } @@ -189,13 +201,11 @@ pub fn writeCurrentStackTrace(out_stream: var, allocator: *mem.Allocator, debug_ } }, } - try printSourceAtAddress(debug_info, out_stream, return_address); + try printSourceAtAddress(debug_info, out_stream, return_address, tty_color); } } -fn printSourceAtAddress(debug_info: *ElfStackTrace, out_stream: var, address: usize) !void { - const ptr_hex = "0x{x}"; - +fn printSourceAtAddress(debug_info: *ElfStackTrace, out_stream: var, address: usize, tty_color: bool) !void { switch (builtin.os) { builtin.Os.windows => return error.UnsupportedDebugInfo, builtin.Os.macosx => { @@ -209,36 +219,58 @@ fn printSourceAtAddress(debug_info: *ElfStackTrace, out_stream: var, address: us .address = address, }; const symbol = debug_info.symbol_table.search(address) orelse &unknown; - try out_stream.print(WHITE ++ "{}" ++ RESET ++ ": " ++ DIM ++ ptr_hex ++ " in ??? (???)" ++ RESET ++ "\n", symbol.name, address); + try out_stream.print(WHITE ++ "{}" ++ RESET ++ ": " ++ DIM ++ "0x{x}" ++ " in ??? (???)" ++ RESET ++ "\n", symbol.name, address); }, else => { const compile_unit = findCompileUnit(debug_info, address) catch { - try out_stream.print("???:?:?: " ++ DIM ++ ptr_hex ++ " in ??? (???)" ++ RESET ++ "\n ???\n\n", address); + if (tty_color) { + try out_stream.print("???:?:?: " ++ DIM ++ "0x{x} in ??? (???)" ++ RESET ++ "\n ???\n\n", address); + } else { + try out_stream.print("???:?:?: 0x{x} in ??? (???)\n ???\n\n", address); + } return; }; const compile_unit_name = try compile_unit.die.getAttrString(debug_info, DW.AT_name); if (getLineNumberInfo(debug_info, compile_unit, address - 1)) |line_info| { defer line_info.deinit(); - try out_stream.print(WHITE ++ "{}:{}:{}" ++ RESET ++ ": " ++ DIM ++ ptr_hex ++ " in ??? ({})" ++ RESET ++ "\n", line_info.file_name, line_info.line, line_info.column, address, compile_unit_name); - if (printLineFromFile(debug_info.allocator(), out_stream, line_info)) { - if (line_info.column == 0) { - try out_stream.write("\n"); - } else { - { - var col_i: usize = 1; - while (col_i < line_info.column) : (col_i += 1) { - try out_stream.writeByte(' '); + if (tty_color) { + try out_stream.print( + WHITE ++ "{}:{}:{}" ++ RESET ++ ": " ++ DIM ++ "0x{x} in ??? ({})" ++ RESET ++ "\n", + line_info.file_name, + line_info.line, + line_info.column, + address, + compile_unit_name, + ); + if (printLineFromFile(debug_info.allocator(), out_stream, line_info)) { + if (line_info.column == 0) { + try out_stream.write("\n"); + } else { + { + var col_i: usize = 1; + while (col_i < line_info.column) : (col_i += 1) { + try out_stream.writeByte(' '); + } } + try out_stream.write(GREEN ++ "^" ++ RESET ++ "\n"); } - try out_stream.write(GREEN ++ "^" ++ RESET ++ "\n"); + } else |err| switch (err) { + error.EndOfFile => {}, + else => return err, } - } else |err| switch (err) { - error.EndOfFile => {}, - else => return err, + } else { + try out_stream.print( + "{}:{}:{}: 0x{x} in ??? ({})\n", + line_info.file_name, + line_info.line, + line_info.column, + address, + compile_unit_name, + ); } } else |err| switch (err) { error.MissingDebugInfo, error.InvalidDebugInfo => { - try out_stream.print(ptr_hex ++ " in ??? ({})\n", address, compile_unit_name); + try out_stream.print("0x{x} in ??? ({})\n", address, compile_unit_name); }, else => return err, } @@ -1098,7 +1130,7 @@ fn readILeb128(in_stream: var) !i64 { /// This should only be used in temporary test programs. pub const global_allocator = &global_fixed_allocator.allocator; -var global_fixed_allocator = std.heap.FixedBufferAllocator.init(global_allocator_mem[0..]); +var global_fixed_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(global_allocator_mem[0..]); var global_allocator_mem: [100 * 1024]u8 = undefined; // TODO make thread safe diff --git a/std/event.zig b/std/event.zig index c6ac04a9d0..7e9928b3d7 100644 --- a/std/event.zig +++ b/std/event.zig @@ -1,525 +1,13 @@ -const std = @import("index.zig"); -const builtin = @import("builtin"); -const assert = std.debug.assert; -const event = this; -const mem = std.mem; -const posix = std.os.posix; -const AtomicRmwOp = builtin.AtomicRmwOp; -const AtomicOrder = builtin.AtomicOrder; - -pub const TcpServer = struct { - handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void, - - loop: *Loop, - sockfd: i32, - accept_coro: ?promise, - listen_address: std.net.Address, - - waiting_for_emfile_node: PromiseNode, - - const PromiseNode = std.LinkedList(promise).Node; - - pub fn init(loop: *Loop) !TcpServer { - const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); - errdefer std.os.close(sockfd); - - // TODO can't initialize handler coroutine here because we need well defined copy elision - return TcpServer{ - .loop = loop, - .sockfd = sockfd, - .accept_coro = null, - .handleRequestFn = undefined, - .waiting_for_emfile_node = undefined, - .listen_address = undefined, - }; - } - - pub fn listen(self: *TcpServer, address: *const std.net.Address, handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void) !void { - self.handleRequestFn = handleRequestFn; - - try std.os.posixBind(self.sockfd, &address.os_addr); - try std.os.posixListen(self.sockfd, posix.SOMAXCONN); - self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd)); - - self.accept_coro = try async<self.loop.allocator> TcpServer.handler(self); - errdefer cancel self.accept_coro.?; - - try self.loop.addFd(self.sockfd, self.accept_coro.?); - errdefer self.loop.removeFd(self.sockfd); - } - - pub fn deinit(self: *TcpServer) void { - self.loop.removeFd(self.sockfd); - if (self.accept_coro) |accept_coro| cancel accept_coro; - std.os.close(self.sockfd); - } - - pub async fn handler(self: *TcpServer) void { - while (true) { - var accepted_addr: std.net.Address = undefined; - if (std.os.posixAccept(self.sockfd, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { - var socket = std.os.File.openHandle(accepted_fd); - _ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) { - error.OutOfMemory => { - socket.close(); - continue; - }, - }; - } else |err| switch (err) { - error.WouldBlock => { - suspend; // we will get resumed by epoll_wait in the event loop - continue; - }, - error.ProcessFdQuotaExceeded => { - errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node); - suspend |p| { - self.waiting_for_emfile_node = PromiseNode.init(p); - std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node); - } - continue; - }, - error.ConnectionAborted, error.FileDescriptorClosed => continue, - - error.PageFault => unreachable, - error.InvalidSyscall => unreachable, - error.FileDescriptorNotASocket => unreachable, - error.OperationNotSupported => unreachable, - - error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => { - @panic("TODO handle this error"); - }, - } - } - } -}; - -pub const Loop = struct { - allocator: *mem.Allocator, - keep_running: bool, - next_tick_queue: std.atomic.QueueMpsc(promise), - os_data: OsData, - - const OsData = switch (builtin.os) { - builtin.Os.linux => struct { - epollfd: i32, - }, - else => struct {}, - }; - - pub const NextTickNode = std.atomic.QueueMpsc(promise).Node; - - /// The allocator must be thread-safe because we use it for multiplexing - /// coroutines onto kernel threads. - pub fn init(allocator: *mem.Allocator) !Loop { - var self = Loop{ - .keep_running = true, - .allocator = allocator, - .os_data = undefined, - .next_tick_queue = std.atomic.QueueMpsc(promise).init(), - }; - try self.initOsData(); - errdefer self.deinitOsData(); - - return self; - } - - /// must call stop before deinit - pub fn deinit(self: *Loop) void { - self.deinitOsData(); - } - - const InitOsDataError = std.os.LinuxEpollCreateError; - - fn initOsData(self: *Loop) InitOsDataError!void { - switch (builtin.os) { - builtin.Os.linux => { - self.os_data.epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC); - errdefer std.os.close(self.os_data.epollfd); - }, - else => {}, - } - } - - fn deinitOsData(self: *Loop) void { - switch (builtin.os) { - builtin.Os.linux => std.os.close(self.os_data.epollfd), - else => {}, - } - } - - pub fn addFd(self: *Loop, fd: i32, prom: promise) !void { - var ev = std.os.linux.epoll_event{ - .events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET, - .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) }, - }; - try std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev); - } - - pub fn removeFd(self: *Loop, fd: i32) void { - std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; - } - async fn waitFd(self: *Loop, fd: i32) !void { - defer self.removeFd(fd); - suspend |p| { - try self.addFd(fd, p); - } - } - - pub fn stop(self: *Loop) void { - // TODO make atomic - self.keep_running = false; - // TODO activate an fd in the epoll set which should cancel all the promises - } - - /// bring your own linked list node. this means it can't fail. - pub fn onNextTick(self: *Loop, node: *NextTickNode) void { - self.next_tick_queue.put(node); - } - - pub fn run(self: *Loop) void { - while (self.keep_running) { - // TODO multiplex the next tick queue and the epoll event results onto a thread pool - while (self.next_tick_queue.get()) |node| { - resume node.data; - } - if (!self.keep_running) break; - - self.dispatchOsEvents(); - } - } - - fn dispatchOsEvents(self: *Loop) void { - switch (builtin.os) { - builtin.Os.linux => { - var events: [16]std.os.linux.epoll_event = undefined; - const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); - for (events[0..count]) |ev| { - const p = @intToPtr(promise, ev.data.ptr); - resume p; - } - }, - else => {}, - } - } -}; - -/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size -/// when buffer is empty, consumers suspend and are resumed by producers -/// when buffer is full, producers suspend and are resumed by consumers -pub fn Channel(comptime T: type) type { - return struct { - loop: *Loop, - - getters: std.atomic.QueueMpsc(GetNode), - putters: std.atomic.QueueMpsc(PutNode), - get_count: usize, - put_count: usize, - dispatch_lock: u8, // TODO make this a bool - need_dispatch: u8, // TODO make this a bool - - // simple fixed size ring buffer - buffer_nodes: []T, - buffer_index: usize, - buffer_len: usize, - - const SelfChannel = this; - const GetNode = struct { - ptr: *T, - tick_node: *Loop.NextTickNode, - }; - const PutNode = struct { - data: T, - tick_node: *Loop.NextTickNode, - }; - - /// call destroy when done - pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { - const buffer_nodes = try loop.allocator.alloc(T, capacity); - errdefer loop.allocator.free(buffer_nodes); - - const self = try loop.allocator.create(SelfChannel{ - .loop = loop, - .buffer_len = 0, - .buffer_nodes = buffer_nodes, - .buffer_index = 0, - .dispatch_lock = 0, - .need_dispatch = 0, - .getters = std.atomic.QueueMpsc(GetNode).init(), - .putters = std.atomic.QueueMpsc(PutNode).init(), - .get_count = 0, - .put_count = 0, - }); - errdefer loop.allocator.destroy(self); - - return self; - } - - /// must be called when all calls to put and get have suspended and no more calls occur - pub fn destroy(self: *SelfChannel) void { - while (self.getters.get()) |get_node| { - cancel get_node.data.tick_node.data; - } - while (self.putters.get()) |put_node| { - cancel put_node.data.tick_node.data; - } - self.loop.allocator.free(self.buffer_nodes); - self.loop.allocator.destroy(self); - } - - /// puts a data item in the channel. The promise completes when the value has been added to the - /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter. - pub async fn put(self: *SelfChannel, data: T) void { - // TODO should be able to group memory allocation failure before first suspend point - // so that the async invocation catches it - var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined; - _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable; - - suspend |handle| { - var my_tick_node = Loop.NextTickNode{ - .next = undefined, - .data = handle, - }; - var queue_node = std.atomic.QueueMpsc(PutNode).Node{ - .data = PutNode{ - .tick_node = &my_tick_node, - .data = data, - }, - .next = undefined, - }; - self.putters.put(&queue_node); - _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); - - self.loop.onNextTick(dispatch_tick_node_ptr); - } - } - - /// await this function to get an item from the channel. If the buffer is empty, the promise will - /// complete when the next item is put in the channel. - pub async fn get(self: *SelfChannel) T { - // TODO should be able to group memory allocation failure before first suspend point - // so that the async invocation catches it - var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined; - _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable; - - // TODO integrate this function with named return values - // so we can get rid of this extra result copy - var result: T = undefined; - var debug_handle: usize = undefined; - suspend |handle| { - debug_handle = @ptrToInt(handle); - var my_tick_node = Loop.NextTickNode{ - .next = undefined, - .data = handle, - }; - var queue_node = std.atomic.QueueMpsc(GetNode).Node{ - .data = GetNode{ - .ptr = &result, - .tick_node = &my_tick_node, - }, - .next = undefined, - }; - self.getters.put(&queue_node); - _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); - - self.loop.onNextTick(dispatch_tick_node_ptr); - } - return result; - } - - async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void { - // resumed by onNextTick - suspend |handle| { - var tick_node = Loop.NextTickNode{ - .data = handle, - .next = undefined, - }; - tick_node_ptr.* = &tick_node; - } - - // set the "need dispatch" flag - _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - - lock: while (true) { - // set the lock flag - const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); - if (prev_lock != 0) return; - - // clear the need_dispatch flag since we're about to do it - _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - - while (true) { - one_dispatch: { - // later we correct these extra subtractions - var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - - // transfer self.buffer to self.getters - while (self.buffer_len != 0) { - if (get_count == 0) break :one_dispatch; - - const get_node = &self.getters.get().?.data; - get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len]; - self.loop.onNextTick(get_node.tick_node); - self.buffer_len -= 1; - - get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - } - - // direct transfer self.putters to self.getters - while (get_count != 0 and put_count != 0) { - const get_node = &self.getters.get().?.data; - const put_node = &self.putters.get().?.data; - - get_node.ptr.* = put_node.data; - self.loop.onNextTick(get_node.tick_node); - self.loop.onNextTick(put_node.tick_node); - - get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - } - - // transfer self.putters to self.buffer - while (self.buffer_len != self.buffer_nodes.len and put_count != 0) { - const put_node = &self.putters.get().?.data; - - self.buffer_nodes[self.buffer_index] = put_node.data; - self.loop.onNextTick(put_node.tick_node); - self.buffer_index +%= 1; - self.buffer_len += 1; - - put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - } - } - - // undo the extra subtractions - _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); - _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); - - // clear need-dispatch flag - const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - if (need_dispatch != 0) continue; - - const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); - assert(my_lock != 0); - - // we have to check again now that we unlocked - if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock; - - return; - } - } - } - }; -} - -pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File { - var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733 - - const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); - errdefer std.os.close(sockfd); - - try std.os.posixConnectAsync(sockfd, &address.os_addr); - try await try async loop.waitFd(sockfd); - try std.os.posixGetSockOptConnectError(sockfd); - - return std.os.File.openHandle(sockfd); -} - -test "listen on a port, send bytes, receive bytes" { - if (builtin.os != builtin.Os.linux) { - // TODO build abstractions for other operating systems - return; - } - const MyServer = struct { - tcp_server: TcpServer, - - const Self = this; - async<*mem.Allocator> fn handler(tcp_server: *TcpServer, _addr: *const std.net.Address, _socket: *const std.os.File) void { - const self = @fieldParentPtr(Self, "tcp_server", tcp_server); - var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733 - defer socket.close(); - const next_handler = async errorableHandler(self, _addr, socket) catch |err| switch (err) { - error.OutOfMemory => @panic("unable to handle connection: out of memory"), - }; - (await next_handler) catch |err| { - std.debug.panic("unable to handle connection: {}\n", err); - }; - suspend |p| { - cancel p; - } - } - async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: *const std.os.File) !void { - const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733 - var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733 - - var adapter = std.io.FileOutStream.init(&socket); - var stream = &adapter.stream; - try stream.print("hello from server\n"); - } - }; - - const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable; - const addr = std.net.Address.initIp4(ip4addr, 0); - - var loop = try Loop.init(std.debug.global_allocator); - var server = MyServer{ .tcp_server = try TcpServer.init(&loop) }; - defer server.tcp_server.deinit(); - try server.tcp_server.listen(addr, MyServer.handler); - - const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address); - defer cancel p; - loop.run(); -} - -async fn doAsyncTest(loop: *Loop, address: *const std.net.Address) void { - errdefer @panic("test failure"); - - var socket_file = try await try async event.connect(loop, address); - defer socket_file.close(); - - var buf: [512]u8 = undefined; - const amt_read = try socket_file.read(buf[0..]); - const msg = buf[0..amt_read]; - assert(mem.eql(u8, msg, "hello from server\n")); - loop.stop(); -} - -test "std.event.Channel" { - var da = std.heap.DirectAllocator.init(); - defer da.deinit(); - - const allocator = &da.allocator; - - var loop = try Loop.init(allocator); - defer loop.deinit(); - - const channel = try Channel(i32).create(&loop, 0); - defer channel.destroy(); - - const handle = try async<allocator> testChannelGetter(&loop, channel); - defer cancel handle; - - const putter = try async<allocator> testChannelPutter(channel); - defer cancel putter; - - loop.run(); -} - -async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void { - errdefer @panic("test failed"); - - const value1_promise = try async channel.get(); - const value1 = await value1_promise; - assert(value1 == 1234); - - const value2_promise = try async channel.get(); - const value2 = await value2_promise; - assert(value2 == 4567); - - loop.stop(); -} - -async fn testChannelPutter(channel: *Channel(i32)) void { - await (async channel.put(1234) catch @panic("out of memory")); - await (async channel.put(4567) catch @panic("out of memory")); +pub const Locked = @import("event/locked.zig").Locked; +pub const Loop = @import("event/loop.zig").Loop; +pub const Lock = @import("event/lock.zig").Lock; +pub const tcp = @import("event/tcp.zig"); +pub const Channel = @import("event/channel.zig").Channel; + +test "import event tests" { + _ = @import("event/locked.zig"); + _ = @import("event/loop.zig"); + _ = @import("event/lock.zig"); + _ = @import("event/tcp.zig"); + _ = @import("event/channel.zig"); } diff --git a/std/event/channel.zig b/std/event/channel.zig new file mode 100644 index 0000000000..4b3a7177a2 --- /dev/null +++ b/std/event/channel.zig @@ -0,0 +1,254 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const Loop = std.event.Loop; + +/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size +/// when buffer is empty, consumers suspend and are resumed by producers +/// when buffer is full, producers suspend and are resumed by consumers +pub fn Channel(comptime T: type) type { + return struct { + loop: *Loop, + + getters: std.atomic.QueueMpsc(GetNode), + putters: std.atomic.QueueMpsc(PutNode), + get_count: usize, + put_count: usize, + dispatch_lock: u8, // TODO make this a bool + need_dispatch: u8, // TODO make this a bool + + // simple fixed size ring buffer + buffer_nodes: []T, + buffer_index: usize, + buffer_len: usize, + + const SelfChannel = this; + const GetNode = struct { + ptr: *T, + tick_node: *Loop.NextTickNode, + }; + const PutNode = struct { + data: T, + tick_node: *Loop.NextTickNode, + }; + + /// call destroy when done + pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { + const buffer_nodes = try loop.allocator.alloc(T, capacity); + errdefer loop.allocator.free(buffer_nodes); + + const self = try loop.allocator.create(SelfChannel{ + .loop = loop, + .buffer_len = 0, + .buffer_nodes = buffer_nodes, + .buffer_index = 0, + .dispatch_lock = 0, + .need_dispatch = 0, + .getters = std.atomic.QueueMpsc(GetNode).init(), + .putters = std.atomic.QueueMpsc(PutNode).init(), + .get_count = 0, + .put_count = 0, + }); + errdefer loop.allocator.destroy(self); + + return self; + } + + /// must be called when all calls to put and get have suspended and no more calls occur + pub fn destroy(self: *SelfChannel) void { + while (self.getters.get()) |get_node| { + cancel get_node.data.tick_node.data; + } + while (self.putters.get()) |put_node| { + cancel put_node.data.tick_node.data; + } + self.loop.allocator.free(self.buffer_nodes); + self.loop.allocator.destroy(self); + } + + /// puts a data item in the channel. The promise completes when the value has been added to the + /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter. + pub async fn put(self: *SelfChannel, data: T) void { + // TODO should be able to group memory allocation failure before first suspend point + // so that the async invocation catches it + var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined; + _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable; + + suspend |handle| { + var my_tick_node = Loop.NextTickNode{ + .next = undefined, + .data = handle, + }; + var queue_node = std.atomic.QueueMpsc(PutNode).Node{ + .data = PutNode{ + .tick_node = &my_tick_node, + .data = data, + }, + .next = undefined, + }; + self.putters.put(&queue_node); + _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + + self.loop.onNextTick(dispatch_tick_node_ptr); + } + } + + /// await this function to get an item from the channel. If the buffer is empty, the promise will + /// complete when the next item is put in the channel. + pub async fn get(self: *SelfChannel) T { + // TODO should be able to group memory allocation failure before first suspend point + // so that the async invocation catches it + var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined; + _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable; + + // TODO integrate this function with named return values + // so we can get rid of this extra result copy + var result: T = undefined; + suspend |handle| { + var my_tick_node = Loop.NextTickNode{ + .next = undefined, + .data = handle, + }; + var queue_node = std.atomic.QueueMpsc(GetNode).Node{ + .data = GetNode{ + .ptr = &result, + .tick_node = &my_tick_node, + }, + .next = undefined, + }; + self.getters.put(&queue_node); + _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + + self.loop.onNextTick(dispatch_tick_node_ptr); + } + return result; + } + + async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void { + // resumed by onNextTick + suspend |handle| { + var tick_node = Loop.NextTickNode{ + .data = handle, + .next = undefined, + }; + tick_node_ptr.* = &tick_node; + } + + // set the "need dispatch" flag + _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + + lock: while (true) { + // set the lock flag + const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (prev_lock != 0) return; + + // clear the need_dispatch flag since we're about to do it + _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + while (true) { + one_dispatch: { + // later we correct these extra subtractions + var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + + // transfer self.buffer to self.getters + while (self.buffer_len != 0) { + if (get_count == 0) break :one_dispatch; + + const get_node = &self.getters.get().?.data; + get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len]; + self.loop.onNextTick(get_node.tick_node); + self.buffer_len -= 1; + + get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + + // direct transfer self.putters to self.getters + while (get_count != 0 and put_count != 0) { + const get_node = &self.getters.get().?.data; + const put_node = &self.putters.get().?.data; + + get_node.ptr.* = put_node.data; + self.loop.onNextTick(get_node.tick_node); + self.loop.onNextTick(put_node.tick_node); + + get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + + // transfer self.putters to self.buffer + while (self.buffer_len != self.buffer_nodes.len and put_count != 0) { + const put_node = &self.putters.get().?.data; + + self.buffer_nodes[self.buffer_index] = put_node.data; + self.loop.onNextTick(put_node.tick_node); + self.buffer_index +%= 1; + self.buffer_len += 1; + + put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + } + + // undo the extra subtractions + _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + + // clear need-dispatch flag + const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + if (need_dispatch != 0) continue; + + const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + assert(my_lock != 0); + + // we have to check again now that we unlocked + if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock; + + return; + } + } + } + }; +} + +test "std.event.Channel" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + // TODO make a multi threaded test + try loop.initSingleThreaded(allocator); + defer loop.deinit(); + + const channel = try Channel(i32).create(&loop, 0); + defer channel.destroy(); + + const handle = try async<allocator> testChannelGetter(&loop, channel); + defer cancel handle; + + const putter = try async<allocator> testChannelPutter(channel); + defer cancel putter; + + loop.run(); +} + +async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void { + errdefer @panic("test failed"); + + const value1_promise = try async channel.get(); + const value1 = await value1_promise; + assert(value1 == 1234); + + const value2_promise = try async channel.get(); + const value2 = await value2_promise; + assert(value2 == 4567); +} + +async fn testChannelPutter(channel: *Channel(i32)) void { + await (async channel.put(1234) catch @panic("out of memory")); + await (async channel.put(4567) catch @panic("out of memory")); +} + diff --git a/std/event/lock.zig b/std/event/lock.zig new file mode 100644 index 0000000000..2a8d5ada77 --- /dev/null +++ b/std/event/lock.zig @@ -0,0 +1,204 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const mem = std.mem; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const Loop = std.event.Loop; + +/// Thread-safe async/await lock. +/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +pub const Lock = struct { + loop: *Loop, + shared_bit: u8, // TODO make this a bool + queue: Queue, + queue_empty_bit: u8, // TODO make this a bool + + const Queue = std.atomic.QueueMpsc(promise); + + pub const Held = struct { + lock: *Lock, + + pub fn release(self: Held) void { + // Resume the next item from the queue. + if (self.lock.queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // We need to release the lock. + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // There might be a queue item. If we know the queue is empty, we can be done, + // because the other actor will try to obtain the lock. + // But if there's a queue item, we are the actor which must loop and attempt + // to grab the lock again. + if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + return; + } + + while (true) { + const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (old_bit != 0) { + // We did not obtain the lock. Great, the queue is someone else's problem. + return; + } + + // Resume the next item from the queue. + if (self.lock.queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // Release the lock again. + _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // Find out if we can be done. + if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + return; + } + } + } + }; + + pub fn init(loop: *Loop) Lock { + return Lock{ + .loop = loop, + .shared_bit = 0, + .queue = Queue.init(), + .queue_empty_bit = 1, + }; + } + + /// Must be called when not locked. Not thread safe. + /// All calls to acquire() and release() must complete before calling deinit(). + pub fn deinit(self: *Lock) void { + assert(self.shared_bit == 0); + while (self.queue.get()) |node| cancel node.data; + } + + pub async fn acquire(self: *Lock) Held { + s: suspend |handle| { + // TODO explicitly put this memory in the coroutine frame #1194 + var my_tick_node = Loop.NextTickNode{ + .data = handle, + .next = undefined, + }; + + self.queue.put(&my_tick_node); + + // At this point, we are in the queue, so we might have already been resumed and this coroutine + // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame. + + // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor + // will attempt to grab the lock. + _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + while (true) { + const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (old_bit != 0) { + // We did not obtain the lock. Trust that our queue entry will resume us, and allow + // suspend to complete. + break; + } + // We got the lock. However we might have already been resumed from the queue. + if (self.queue.get()) |node| { + // Whether this node is us or someone else, we tail resume it. + resume node.data; + break; + } else { + // We already got resumed, and there are none left in the queue, which means that + // we aren't even supposed to hold the lock right now. + _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // There might be a queue item. If we know the queue is empty, we can be done, + // because the other actor will try to obtain the lock. + // But if there's a queue item, we are the actor which must loop and attempt + // to grab the lock again. + if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) { + break; + } else { + continue; + } + } + unreachable; + } + } + + return Held{ .lock = self }; + } +}; + +test "std.event.Lock" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var lock = Lock.init(&loop); + defer lock.deinit(); + + const handle = try async<allocator> testLock(&loop, &lock); + defer cancel handle; + loop.run(); + + assert(mem.eql(i32, shared_test_data, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len)); +} + +async fn testLock(loop: *Loop, lock: *Lock) void { + // TODO explicitly put next tick node memory in the coroutine frame #1194 + suspend |p| { + resume p; + } + const handle1 = async lockRunner(lock) catch @panic("out of memory"); + var tick_node1 = Loop.NextTickNode{ + .next = undefined, + .data = handle1, + }; + loop.onNextTick(&tick_node1); + + const handle2 = async lockRunner(lock) catch @panic("out of memory"); + var tick_node2 = Loop.NextTickNode{ + .next = undefined, + .data = handle2, + }; + loop.onNextTick(&tick_node2); + + const handle3 = async lockRunner(lock) catch @panic("out of memory"); + var tick_node3 = Loop.NextTickNode{ + .next = undefined, + .data = handle3, + }; + loop.onNextTick(&tick_node3); + + await handle1; + await handle2; + await handle3; +} + +var shared_test_data = [1]i32{0} ** 10; +var shared_test_index: usize = 0; + +async fn lockRunner(lock: *Lock) void { + suspend; // resumed by onNextTick + + var i: usize = 0; + while (i < shared_test_data.len) : (i += 1) { + const lock_promise = async lock.acquire() catch @panic("out of memory"); + const handle = await lock_promise; + defer handle.release(); + + shared_test_index = 0; + while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) { + shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1; + } + } +} diff --git a/std/event/locked.zig b/std/event/locked.zig new file mode 100644 index 0000000000..e7ad544d78 --- /dev/null +++ b/std/event/locked.zig @@ -0,0 +1,43 @@ +const std = @import("../index.zig"); +const Lock = std.event.Lock; +const Loop = std.event.Loop; + +/// Thread-safe async/await lock that protects one piece of data. +/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +pub fn Locked(comptime T: type) type { + return struct { + lock: Lock, + private_data: T, + + const Self = this; + + pub const HeldLock = struct { + value: *T, + held: Lock.Held, + + pub fn release(self: HeldLock) void { + self.held.release(); + } + }; + + pub fn init(loop: *Loop, data: T) Self { + return Self{ + .lock = Lock.init(loop), + .private_data = data, + }; + } + + pub fn deinit(self: *Self) void { + self.lock.deinit(); + } + + pub async fn acquire(self: *Self) HeldLock { + return HeldLock{ + // TODO guaranteed allocation elision + .held = await (async self.lock.acquire() catch unreachable), + .value = &self.private_data, + }; + } + }; +} diff --git a/std/event/loop.zig b/std/event/loop.zig new file mode 100644 index 0000000000..646f15875f --- /dev/null +++ b/std/event/loop.zig @@ -0,0 +1,629 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const mem = std.mem; +const posix = std.os.posix; +const windows = std.os.windows; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; + +pub const Loop = struct { + allocator: *mem.Allocator, + next_tick_queue: std.atomic.QueueMpsc(promise), + os_data: OsData, + final_resume_node: ResumeNode, + dispatch_lock: u8, // TODO make this a bool + pending_event_count: usize, + extra_threads: []*std.os.Thread, + + // pre-allocated eventfds. all permanently active. + // this is how we send promises to be resumed on other threads. + available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), + eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, + + pub const NextTickNode = std.atomic.QueueMpsc(promise).Node; + + pub const ResumeNode = struct { + id: Id, + handle: promise, + + pub const Id = enum { + Basic, + Stop, + EventFd, + }; + + pub const EventFd = switch (builtin.os) { + builtin.Os.macosx => MacOsEventFd, + builtin.Os.linux => struct { + base: ResumeNode, + epoll_op: u32, + eventfd: i32, + }, + builtin.Os.windows => struct { + base: ResumeNode, + completion_key: usize, + }, + else => @compileError("unsupported OS"), + }; + + const MacOsEventFd = struct { + base: ResumeNode, + kevent: posix.Kevent, + }; + }; + + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { + return self.initInternal(allocator, 1); + } + + /// The allocator must be thread-safe because we use it for multiplexing + /// coroutines onto kernel threads. + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { + const core_count = try std.os.cpuCount(allocator); + return self.initInternal(allocator, core_count); + } + + /// Thread count is the total thread count. The thread pool size will be + /// max(thread_count - 1, 0) + fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + self.* = Loop{ + .pending_event_count = 0, + .allocator = allocator, + .os_data = undefined, + .next_tick_queue = std.atomic.QueueMpsc(promise).init(), + .dispatch_lock = 1, // start locked so threads go directly into epoll wait + .extra_threads = undefined, + .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), + .eventfd_resume_nodes = undefined, + .final_resume_node = ResumeNode{ + .id = ResumeNode.Id.Stop, + .handle = undefined, + }, + }; + const extra_thread_count = thread_count - 1; + self.eventfd_resume_nodes = try self.allocator.alloc( + std.atomic.Stack(ResumeNode.EventFd).Node, + extra_thread_count, + ); + errdefer self.allocator.free(self.eventfd_resume_nodes); + + self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count); + errdefer self.allocator.free(self.extra_threads); + + try self.initOsData(extra_thread_count); + errdefer self.deinitOsData(); + } + + /// must call stop before deinit + pub fn deinit(self: *Loop) void { + self.deinitOsData(); + self.allocator.free(self.extra_threads); + } + + const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError || + std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError || + std.os.WindowsCreateIoCompletionPortError; + + const wakeup_bytes = []u8{0x1} ** 8; + + fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { + switch (builtin.os) { + builtin.Os.linux => { + errdefer { + while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); + } + for (self.eventfd_resume_nodes) |*eventfd_node| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), + .epoll_op = posix.EPOLL_CTL_ADD, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC); + errdefer std.os.close(self.os_data.epollfd); + + self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + errdefer std.os.close(self.os_data.final_eventfd); + + self.os_data.final_eventfd_event = posix.epoll_event{ + .events = posix.EPOLLIN, + .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + }; + try std.os.linuxEpollCtl( + self.os_data.epollfd, + posix.EPOLL_CTL_ADD, + self.os_data.final_eventfd, + &self.os_data.final_eventfd_event, + ); + + var extra_thread_index: usize = 0; + errdefer { + // writing 8 bytes to an eventfd cannot fail + std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } + }, + builtin.Os.macosx => { + self.os_data.kqfd = try std.os.bsdKQueue(); + errdefer std.os.close(self.os_data.kqfd); + + self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); + errdefer self.allocator.free(self.os_data.kevents); + + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .kevent = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent); + _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; + eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; + // this one is for waiting for events + self.os_data.kevents[i] = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = 0, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }; + } + + // Pre-add so that we cannot get error.SystemResources + // later when we try to activate it. + self.os_data.final_kevent = posix.Kevent{ + .ident = extra_thread_count, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&self.final_resume_node), + }; + const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent); + _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + self.os_data.final_kevent.flags = posix.EV_ENABLE; + self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; + + var extra_thread_index: usize = 0; + errdefer { + _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } + }, + builtin.Os.windows => { + self.os_data.extra_thread_count = extra_thread_count; + + self.os_data.io_port = try std.os.windowsCreateIoCompletionPort( + windows.INVALID_HANDLE_VALUE, + null, + undefined, + undefined, + ); + errdefer std.os.close(self.os_data.io_port); + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .completion_key = @ptrToInt(&eventfd_node.data.base), + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + var extra_thread_index: usize = 0; + errdefer { + var i: usize = 0; + while (i < extra_thread_index) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; + } + } + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun); + } + }, + else => {}, + } + } + + fn deinitOsData(self: *Loop) void { + switch (builtin.os) { + builtin.Os.linux => { + std.os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd); + std.os.close(self.os_data.epollfd); + self.allocator.free(self.eventfd_resume_nodes); + }, + builtin.Os.macosx => { + self.allocator.free(self.os_data.kevents); + std.os.close(self.os_data.kqfd); + }, + builtin.Os.windows => { + std.os.close(self.os_data.io_port); + }, + else => {}, + } + } + + /// resume_node must live longer than the promise that it holds a reference to. + pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + errdefer { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + try self.modFd( + fd, + posix.EPOLL_CTL_ADD, + std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET, + resume_node, + ); + } + + pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void { + var ev = std.os.linux.epoll_event{ + .events = events, + .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, + }; + try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev); + } + + pub fn removeFd(self: *Loop, fd: i32) void { + self.removeFdNoCounter(fd); + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + + fn removeFdNoCounter(self: *Loop, fd: i32) void { + std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; + } + + pub async fn waitFd(self: *Loop, fd: i32) !void { + defer self.removeFd(fd); + suspend |p| { + // TODO explicitly put this memory in the coroutine frame #1194 + var resume_node = ResumeNode{ + .id = ResumeNode.Id.Basic, + .handle = p, + }; + try self.addFd(fd, &resume_node); + } + } + + /// Bring your own linked list node. This means it can't fail. + pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + self.next_tick_queue.put(node); + } + + pub fn run(self: *Loop) void { + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.workerRun(); + for (self.extra_threads) |extra_thread| { + extra_thread.wait(); + } + } + + /// This is equivalent to an async call, except instead of beginning execution of the async function, + /// it immediately returns to the caller, and the async function is queued in the event loop. It still + /// returns a promise to be awaited. + pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) { + const S = struct { + async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType { + suspend |p| { + handle.* = p; + var my_tick_node = Loop.NextTickNode{ + .next = undefined, + .data = p, + }; + loop.onNextTick(&my_tick_node); + } + // TODO guaranteed allocation elision for await in same func as async + return await (async func(args2) catch unreachable); + } + }; + var handle: promise->@typeOf(func).ReturnType = undefined; + return async<self.allocator> S.asyncFunc(self, &handle, args); + } + + fn workerRun(self: *Loop) void { + start_over: while (true) { + if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) { + while (self.next_tick_queue.get()) |next_tick_node| { + const handle = next_tick_node.data; + if (self.next_tick_queue.isEmpty()) { + // last node, just resume it + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + } + + // non-last node, stick it in the epoll/kqueue set so that + // other threads can get to it + if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { + const eventfd_node = &resume_stack_node.data; + eventfd_node.base.handle = handle; + switch (builtin.os) { + builtin.Os.macosx => { + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + }, + builtin.Os.linux => { + // the pending count is already accounted for + const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET; + self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + }, + builtin.Os.windows => { + // this value is never dereferenced but we need it to be non-null so that + // the consumer code can decide whether to read the completion key. + // it has to do this for normal I/O, so we match that behavior here. + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch { + // fine, we didn't need it anyway + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + self.available_eventfd_resume_nodes.push(resume_stack_node); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + }; + }, + else => @compileError("unsupported OS"), + } + } else { + // threads are too busy, can't add another eventfd to wake one up + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + resume handle; + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + continue :start_over; + } + } + + const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst); + if (pending_event_count == 0) { + // cause all the threads to stop + switch (builtin.os) { + builtin.Os.linux => { + // writing 8 bytes to an eventfd cannot fail + std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + return; + }, + builtin.Os.macosx => { + const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); + const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + // cannot fail because we already added it and this just enables it + _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; + return; + }, + builtin.Os.windows => { + var i: usize = 0; + while (i < self.os_data.extra_thread_count) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; + } + } + return; + }, + else => @compileError("unsupported OS"), + } + } + + _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + } + + switch (builtin.os) { + builtin.Os.linux => { + // only process 1 event so we don't steal from other threads + var events: [1]std.os.linux.epoll_event = undefined; + const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + for (events[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + event_fd_node.epoll_op = posix.EPOLL_CTL_MOD; + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + } + }, + builtin.Os.macosx => { + var eventlist: [1]posix.Kevent = undefined; + const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; + for (eventlist[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.udata); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + } + }, + builtin.Os.windows => { + var completion_key: usize = undefined; + while (true) { + var nbytes: windows.DWORD = undefined; + var overlapped: ?*windows.OVERLAPPED = undefined; + switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { + std.os.WindowsWaitResult.Aborted => return, + std.os.WindowsWaitResult.Normal => {}, + } + if (overlapped != null) break; + } + const resume_node = @intToPtr(*ResumeNode, completion_key); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + } + }, + else => @compileError("unsupported OS"), + } + } + } + + const OsData = switch (builtin.os) { + builtin.Os.linux => struct { + epollfd: i32, + final_eventfd: i32, + final_eventfd_event: std.os.linux.epoll_event, + }, + builtin.Os.macosx => MacOsData, + builtin.Os.windows => struct { + io_port: windows.HANDLE, + extra_thread_count: usize, + }, + else => struct {}, + }; + + const MacOsData = struct { + kqfd: i32, + final_kevent: posix.Kevent, + kevents: []posix.Kevent, + }; +}; + +test "std.event.Loop - basic" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + loop.run(); +} + +test "std.event.Loop - call" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var did_it = false; + const handle = try loop.call(testEventLoop); + const handle2 = try loop.call(testEventLoop2, handle, &did_it); + defer cancel handle2; + + loop.run(); + + assert(did_it); +} + +async fn testEventLoop() i32 { + return 1234; +} + +async fn testEventLoop2(h: promise->i32, did_it: *bool) void { + const value = await h; + assert(value == 1234); + did_it.* = true; +} diff --git a/std/event/tcp.zig b/std/event/tcp.zig new file mode 100644 index 0000000000..5151ecf934 --- /dev/null +++ b/std/event/tcp.zig @@ -0,0 +1,183 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const event = std.event; +const mem = std.mem; +const posix = std.os.posix; +const windows = std.os.windows; +const Loop = std.event.Loop; + +pub const Server = struct { + handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void, + + loop: *Loop, + sockfd: ?i32, + accept_coro: ?promise, + listen_address: std.net.Address, + + waiting_for_emfile_node: PromiseNode, + listen_resume_node: event.Loop.ResumeNode, + + const PromiseNode = std.LinkedList(promise).Node; + + pub fn init(loop: *Loop) Server { + // TODO can't initialize handler coroutine here because we need well defined copy elision + return Server{ + .loop = loop, + .sockfd = null, + .accept_coro = null, + .handleRequestFn = undefined, + .waiting_for_emfile_node = undefined, + .listen_address = undefined, + .listen_resume_node = event.Loop.ResumeNode{ + .id = event.Loop.ResumeNode.Id.Basic, + .handle = undefined, + }, + }; + } + + pub fn listen( + self: *Server, + address: *const std.net.Address, + handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void, + ) !void { + self.handleRequestFn = handleRequestFn; + + const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); + errdefer std.os.close(sockfd); + self.sockfd = sockfd; + + try std.os.posixBind(sockfd, &address.os_addr); + try std.os.posixListen(sockfd, posix.SOMAXCONN); + self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd)); + + self.accept_coro = try async<self.loop.allocator> Server.handler(self); + errdefer cancel self.accept_coro.?; + + self.listen_resume_node.handle = self.accept_coro.?; + try self.loop.addFd(sockfd, &self.listen_resume_node); + errdefer self.loop.removeFd(sockfd); + } + + /// Stop listening + pub fn close(self: *Server) void { + self.loop.removeFd(self.sockfd.?); + std.os.close(self.sockfd.?); + } + + pub fn deinit(self: *Server) void { + if (self.accept_coro) |accept_coro| cancel accept_coro; + if (self.sockfd) |sockfd| std.os.close(sockfd); + } + + pub async fn handler(self: *Server) void { + while (true) { + var accepted_addr: std.net.Address = undefined; + if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { + var socket = std.os.File.openHandle(accepted_fd); + _ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) { + error.OutOfMemory => { + socket.close(); + continue; + }, + }; + } else |err| switch (err) { + error.WouldBlock => { + suspend; // we will get resumed by epoll_wait in the event loop + continue; + }, + error.ProcessFdQuotaExceeded => { + errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node); + suspend |p| { + self.waiting_for_emfile_node = PromiseNode.init(p); + std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node); + } + continue; + }, + error.ConnectionAborted, error.FileDescriptorClosed => continue, + + error.PageFault => unreachable, + error.InvalidSyscall => unreachable, + error.FileDescriptorNotASocket => unreachable, + error.OperationNotSupported => unreachable, + + error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => { + @panic("TODO handle this error"); + }, + } + } + } +}; + +pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File { + var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733 + + const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); + errdefer std.os.close(sockfd); + + try std.os.posixConnectAsync(sockfd, &address.os_addr); + try await try async loop.waitFd(sockfd); + try std.os.posixGetSockOptConnectError(sockfd); + + return std.os.File.openHandle(sockfd); +} + +test "listen on a port, send bytes, receive bytes" { + if (builtin.os != builtin.Os.linux) { + // TODO build abstractions for other operating systems + return; + } + const MyServer = struct { + tcp_server: Server, + + const Self = this; + async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: *const std.os.File) void { + const self = @fieldParentPtr(Self, "tcp_server", tcp_server); + var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733 + defer socket.close(); + // TODO guarantee elision of this allocation + const next_handler = async errorableHandler(self, _addr, socket) catch unreachable; + (await next_handler) catch |err| { + std.debug.panic("unable to handle connection: {}\n", err); + }; + suspend |p| { + cancel p; + } + } + async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: *const std.os.File) !void { + const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733 + var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733 + + var adapter = std.io.FileOutStream.init(&socket); + var stream = &adapter.stream; + try stream.print("hello from server\n"); + } + }; + + const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable; + const addr = std.net.Address.initIp4(ip4addr, 0); + + var loop: Loop = undefined; + try loop.initSingleThreaded(std.debug.global_allocator); + var server = MyServer{ .tcp_server = Server.init(&loop) }; + defer server.tcp_server.deinit(); + try server.tcp_server.listen(addr, MyServer.handler); + + const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address, &server.tcp_server); + defer cancel p; + loop.run(); +} + +async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void { + errdefer @panic("test failure"); + + var socket_file = try await try async connect(loop, address); + defer socket_file.close(); + + var buf: [512]u8 = undefined; + const amt_read = try socket_file.read(buf[0..]); + const msg = buf[0..amt_read]; + assert(mem.eql(u8, msg, "hello from server\n")); + server.close(); +} + diff --git a/std/hash_map.zig b/std/hash_map.zig index 3bd03d4f28..cebd5272c0 100644 --- a/std/hash_map.zig +++ b/std/hash_map.zig @@ -259,14 +259,14 @@ test "basic hash map usage" { var map = HashMap(i32, i32, hash_i32, eql_i32).init(&direct_allocator.allocator); defer map.deinit(); - assert((map.put(1, 11) catch unreachable) == null); - assert((map.put(2, 22) catch unreachable) == null); - assert((map.put(3, 33) catch unreachable) == null); - assert((map.put(4, 44) catch unreachable) == null); - assert((map.put(5, 55) catch unreachable) == null); + assert((try map.put(1, 11)) == null); + assert((try map.put(2, 22)) == null); + assert((try map.put(3, 33)) == null); + assert((try map.put(4, 44)) == null); + assert((try map.put(5, 55)) == null); - assert((map.put(5, 66) catch unreachable).? == 55); - assert((map.put(5, 55) catch unreachable).? == 66); + assert((try map.put(5, 66)).? == 55); + assert((try map.put(5, 55)).? == 66); assert(map.contains(2)); assert(map.get(2).?.value == 22); @@ -282,9 +282,9 @@ test "iterator hash map" { var reset_map = HashMap(i32, i32, hash_i32, eql_i32).init(&direct_allocator.allocator); defer reset_map.deinit(); - assert((reset_map.put(1, 11) catch unreachable) == null); - assert((reset_map.put(2, 22) catch unreachable) == null); - assert((reset_map.put(3, 33) catch unreachable) == null); + assert((try reset_map.put(1, 11)) == null); + assert((try reset_map.put(2, 22)) == null); + assert((try reset_map.put(3, 33)) == null); var keys = []i32{ 1, diff --git a/std/heap.zig b/std/heap.zig index 2e02733da1..ef22c8d0c5 100644 --- a/std/heap.zig +++ b/std/heap.zig @@ -38,7 +38,7 @@ fn cFree(self: *Allocator, old_mem: []u8) void { } /// This allocator makes a syscall directly for every allocation and free. -/// TODO make this thread-safe. The windows implementation will need some atomics. +/// Thread-safe and lock-free. pub const DirectAllocator = struct { allocator: Allocator, heap_handle: ?HeapHandle, @@ -74,34 +74,34 @@ pub const DirectAllocator = struct { const alloc_size = if (alignment <= os.page_size) n else n + alignment; const addr = p.mmap(null, alloc_size, p.PROT_READ | p.PROT_WRITE, p.MAP_PRIVATE | p.MAP_ANONYMOUS, -1, 0); if (addr == p.MAP_FAILED) return error.OutOfMemory; - if (alloc_size == n) return @intToPtr([*]u8, addr)[0..n]; - var aligned_addr = addr & ~usize(alignment - 1); - aligned_addr += alignment; + const aligned_addr = (addr & ~usize(alignment - 1)) + alignment; - //We can unmap the unused portions of our mmap, but we must only - // pass munmap bytes that exist outside our allocated pages or it - // will happily eat us too + // We can unmap the unused portions of our mmap, but we must only + // pass munmap bytes that exist outside our allocated pages or it + // will happily eat us too. - //Since alignment > page_size, we are by definition on a page boundry + // Since alignment > page_size, we are by definition on a page boundary. const unused_start = addr; const unused_len = aligned_addr - 1 - unused_start; - var err = p.munmap(unused_start, unused_len); - debug.assert(p.getErrno(err) == 0); + const err = p.munmap(unused_start, unused_len); + assert(p.getErrno(err) == 0); - //It is impossible that there is an unoccupied page at the top of our - // mmap. + // It is impossible that there is an unoccupied page at the top of our + // mmap. return @intToPtr([*]u8, aligned_addr)[0..n]; }, Os.windows => { const amt = n + alignment + @sizeOf(usize); - const heap_handle = self.heap_handle orelse blk: { - const hh = os.windows.HeapCreate(os.windows.HEAP_NO_SERIALIZE, amt, 0) orelse return error.OutOfMemory; - self.heap_handle = hh; - break :blk hh; + const optional_heap_handle = @atomicLoad(?HeapHandle, &self.heap_handle, builtin.AtomicOrder.SeqCst); + const heap_handle = optional_heap_handle orelse blk: { + const hh = os.windows.HeapCreate(0, amt, 0) orelse return error.OutOfMemory; + const other_hh = @cmpxchgStrong(?HeapHandle, &self.heap_handle, null, hh, builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) orelse break :blk hh; + _ = os.windows.HeapDestroy(hh); + break :blk other_hh.?; // can't be null because of the cmpxchg }; const ptr = os.windows.HeapAlloc(heap_handle, 0, amt) orelse return error.OutOfMemory; const root_addr = @ptrToInt(ptr); @@ -361,6 +361,73 @@ pub const ThreadSafeFixedBufferAllocator = struct { fn free(allocator: *Allocator, bytes: []u8) void {} }; +pub fn stackFallback(comptime size: usize, fallback_allocator: *Allocator) StackFallbackAllocator(size) { + return StackFallbackAllocator(size){ + .buffer = undefined, + .fallback_allocator = fallback_allocator, + .fixed_buffer_allocator = undefined, + .allocator = Allocator{ + .allocFn = StackFallbackAllocator(size).alloc, + .reallocFn = StackFallbackAllocator(size).realloc, + .freeFn = StackFallbackAllocator(size).free, + }, + }; +} + +pub fn StackFallbackAllocator(comptime size: usize) type { + return struct { + const Self = this; + + buffer: [size]u8, + allocator: Allocator, + fallback_allocator: *Allocator, + fixed_buffer_allocator: FixedBufferAllocator, + + pub fn get(self: *Self) *Allocator { + self.fixed_buffer_allocator = FixedBufferAllocator.init(self.buffer[0..]); + return &self.allocator; + } + + fn alloc(allocator: *Allocator, n: usize, alignment: u29) ![]u8 { + const self = @fieldParentPtr(Self, "allocator", allocator); + return FixedBufferAllocator.alloc(&self.fixed_buffer_allocator.allocator, n, alignment) catch + self.fallback_allocator.allocFn(self.fallback_allocator, n, alignment); + } + + fn realloc(allocator: *Allocator, old_mem: []u8, new_size: usize, alignment: u29) ![]u8 { + const self = @fieldParentPtr(Self, "allocator", allocator); + const in_buffer = @ptrToInt(old_mem.ptr) >= @ptrToInt(&self.buffer) and + @ptrToInt(old_mem.ptr) < @ptrToInt(&self.buffer) + self.buffer.len; + if (in_buffer) { + return FixedBufferAllocator.realloc( + &self.fixed_buffer_allocator.allocator, + old_mem, + new_size, + alignment, + ) catch { + const result = try self.fallback_allocator.allocFn( + self.fallback_allocator, + new_size, + alignment, + ); + mem.copy(u8, result, old_mem); + return result; + }; + } + return self.fallback_allocator.reallocFn(self.fallback_allocator, old_mem, new_size, alignment); + } + + fn free(allocator: *Allocator, bytes: []u8) void { + const self = @fieldParentPtr(Self, "allocator", allocator); + const in_buffer = @ptrToInt(bytes.ptr) >= @ptrToInt(&self.buffer) and + @ptrToInt(bytes.ptr) < @ptrToInt(&self.buffer) + self.buffer.len; + if (!in_buffer) { + return self.fallback_allocator.freeFn(self.fallback_allocator, bytes); + } + } + }; +} + test "c_allocator" { if (builtin.link_libc) { var slice = c_allocator.alloc(u8, 50) catch return; diff --git a/std/mem.zig b/std/mem.zig index b52d3e9f68..555e1e249d 100644 --- a/std/mem.zig +++ b/std/mem.zig @@ -6,7 +6,7 @@ const builtin = @import("builtin"); const mem = this; pub const Allocator = struct { - const Error = error{OutOfMemory}; + pub const Error = error{OutOfMemory}; /// Allocate byte_count bytes and return them in a slice, with the /// slice's pointer aligned at least to alignment bytes. diff --git a/std/os/darwin.zig b/std/os/darwin.zig index 15e5608343..4134e382fc 100644 --- a/std/os/darwin.zig +++ b/std/os/darwin.zig @@ -264,6 +264,224 @@ pub const SIGUSR1 = 30; /// user defined signal 2 pub const SIGUSR2 = 31; +/// no flag value +pub const KEVENT_FLAG_NONE = 0x000; + +/// immediate timeout +pub const KEVENT_FLAG_IMMEDIATE = 0x001; + +/// output events only include change +pub const KEVENT_FLAG_ERROR_EVENTS = 0x002; + +/// add event to kq (implies enable) +pub const EV_ADD = 0x0001; + +/// delete event from kq +pub const EV_DELETE = 0x0002; + +/// enable event +pub const EV_ENABLE = 0x0004; + +/// disable event (not reported) +pub const EV_DISABLE = 0x0008; + +/// only report one occurrence +pub const EV_ONESHOT = 0x0010; + +/// clear event state after reporting +pub const EV_CLEAR = 0x0020; + +/// force immediate event output +/// ... with or without EV_ERROR +/// ... use KEVENT_FLAG_ERROR_EVENTS +/// on syscalls supporting flags +pub const EV_RECEIPT = 0x0040; + +/// disable event after reporting +pub const EV_DISPATCH = 0x0080; + +/// unique kevent per udata value +pub const EV_UDATA_SPECIFIC = 0x0100; + +/// ... in combination with EV_DELETE +/// will defer delete until udata-specific +/// event enabled. EINPROGRESS will be +/// returned to indicate the deferral +pub const EV_DISPATCH2 = EV_DISPATCH | EV_UDATA_SPECIFIC; + +/// report that source has vanished +/// ... only valid with EV_DISPATCH2 +pub const EV_VANISHED = 0x0200; + +/// reserved by system +pub const EV_SYSFLAGS = 0xF000; + +/// filter-specific flag +pub const EV_FLAG0 = 0x1000; + +/// filter-specific flag +pub const EV_FLAG1 = 0x2000; + +/// EOF detected +pub const EV_EOF = 0x8000; + +/// error, data contains errno +pub const EV_ERROR = 0x4000; + +pub const EV_POLL = EV_FLAG0; +pub const EV_OOBAND = EV_FLAG1; + +pub const EVFILT_READ = -1; +pub const EVFILT_WRITE = -2; + +/// attached to aio requests +pub const EVFILT_AIO = -3; + +/// attached to vnodes +pub const EVFILT_VNODE = -4; + +/// attached to struct proc +pub const EVFILT_PROC = -5; + +/// attached to struct proc +pub const EVFILT_SIGNAL = -6; + +/// timers +pub const EVFILT_TIMER = -7; + +/// Mach portsets +pub const EVFILT_MACHPORT = -8; + +/// Filesystem events +pub const EVFILT_FS = -9; + +/// User events +pub const EVFILT_USER = -10; + +/// Virtual memory events +pub const EVFILT_VM = -12; + +/// Exception events +pub const EVFILT_EXCEPT = -15; + +pub const EVFILT_SYSCOUNT = 17; + +/// On input, NOTE_TRIGGER causes the event to be triggered for output. +pub const NOTE_TRIGGER = 0x01000000; + +/// ignore input fflags +pub const NOTE_FFNOP = 0x00000000; + +/// and fflags +pub const NOTE_FFAND = 0x40000000; + +/// or fflags +pub const NOTE_FFOR = 0x80000000; + +/// copy fflags +pub const NOTE_FFCOPY = 0xc0000000; + +/// mask for operations +pub const NOTE_FFCTRLMASK = 0xc0000000; +pub const NOTE_FFLAGSMASK = 0x00ffffff; + +/// low water mark +pub const NOTE_LOWAT = 0x00000001; + +/// OOB data +pub const NOTE_OOB = 0x00000002; + +/// vnode was removed +pub const NOTE_DELETE = 0x00000001; + +/// data contents changed +pub const NOTE_WRITE = 0x00000002; + +/// size increased +pub const NOTE_EXTEND = 0x00000004; + +/// attributes changed +pub const NOTE_ATTRIB = 0x00000008; + +/// link count changed +pub const NOTE_LINK = 0x00000010; + +/// vnode was renamed +pub const NOTE_RENAME = 0x00000020; + +/// vnode access was revoked +pub const NOTE_REVOKE = 0x00000040; + +/// No specific vnode event: to test for EVFILT_READ activation +pub const NOTE_NONE = 0x00000080; + +/// vnode was unlocked by flock(2) +pub const NOTE_FUNLOCK = 0x00000100; + +/// process exited +pub const NOTE_EXIT = 0x80000000; + +/// process forked +pub const NOTE_FORK = 0x40000000; + +/// process exec'd +pub const NOTE_EXEC = 0x20000000; + +/// shared with EVFILT_SIGNAL +pub const NOTE_SIGNAL = 0x08000000; + +/// exit status to be returned, valid for child process only +pub const NOTE_EXITSTATUS = 0x04000000; + +/// provide details on reasons for exit +pub const NOTE_EXIT_DETAIL = 0x02000000; + +/// mask for signal & exit status +pub const NOTE_PDATAMASK = 0x000fffff; +pub const NOTE_PCTRLMASK = (~NOTE_PDATAMASK); + +pub const NOTE_EXIT_DETAIL_MASK = 0x00070000; +pub const NOTE_EXIT_DECRYPTFAIL = 0x00010000; +pub const NOTE_EXIT_MEMORY = 0x00020000; +pub const NOTE_EXIT_CSERROR = 0x00040000; + +/// will react on memory pressure +pub const NOTE_VM_PRESSURE = 0x80000000; + +/// will quit on memory pressure, possibly after cleaning up dirty state +pub const NOTE_VM_PRESSURE_TERMINATE = 0x40000000; + +/// will quit immediately on memory pressure +pub const NOTE_VM_PRESSURE_SUDDEN_TERMINATE = 0x20000000; + +/// there was an error +pub const NOTE_VM_ERROR = 0x10000000; + +/// data is seconds +pub const NOTE_SECONDS = 0x00000001; + +/// data is microseconds +pub const NOTE_USECONDS = 0x00000002; + +/// data is nanoseconds +pub const NOTE_NSECONDS = 0x00000004; + +/// absolute timeout +pub const NOTE_ABSOLUTE = 0x00000008; + +/// ext[1] holds leeway for power aware timers +pub const NOTE_LEEWAY = 0x00000010; + +/// system does minimal timer coalescing +pub const NOTE_CRITICAL = 0x00000020; + +/// system does maximum timer coalescing +pub const NOTE_BACKGROUND = 0x00000040; +pub const NOTE_MACH_CONTINUOUS_TIME = 0x00000080; + +/// data is mach absolute time units +pub const NOTE_MACHTIME = 0x00000100; + fn wstatus(x: i32) i32 { return x & 0o177; } @@ -385,6 +603,31 @@ pub fn getdirentries64(fd: i32, buf_ptr: [*]u8, buf_len: usize, basep: *i64) usi return errnoWrap(@bitCast(isize, c.__getdirentries64(fd, buf_ptr, buf_len, basep))); } +pub fn kqueue() usize { + return errnoWrap(c.kqueue()); +} + +pub fn kevent(kq: i32, changelist: []const Kevent, eventlist: []Kevent, timeout: ?*const timespec) usize { + return errnoWrap(c.kevent( + kq, + changelist.ptr, + @intCast(c_int, changelist.len), + eventlist.ptr, + @intCast(c_int, eventlist.len), + timeout, + )); +} + +pub fn kevent64( + kq: i32, + changelist: []const kevent64_s, + eventlist: []kevent64_s, + flags: u32, + timeout: ?*const timespec, +) usize { + return errnoWrap(c.kevent64(kq, changelist.ptr, changelist.len, eventlist.ptr, eventlist.len, flags, timeout)); +} + pub fn mkdir(path: [*]const u8, mode: u32) usize { return errnoWrap(c.mkdir(path, mode)); } @@ -393,6 +636,18 @@ pub fn symlink(existing: [*]const u8, new: [*]const u8) usize { return errnoWrap(c.symlink(existing, new)); } +pub fn sysctl(name: [*]c_int, namelen: c_uint, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) usize { + return errnoWrap(c.sysctl(name, namelen, oldp, oldlenp, newp, newlen)); +} + +pub fn sysctlbyname(name: [*]const u8, oldp: ?*c_void, oldlenp: ?*usize, newp: ?*c_void, newlen: usize) usize { + return errnoWrap(c.sysctlbyname(name, oldp, oldlenp, newp, newlen)); +} + +pub fn sysctlnametomib(name: [*]const u8, mibp: ?*c_int, sizep: ?*usize) usize { + return errnoWrap(c.sysctlnametomib(name, wibp, sizep)); +} + pub fn rename(old: [*]const u8, new: [*]const u8) usize { return errnoWrap(c.rename(old, new)); } @@ -474,6 +729,10 @@ pub const dirent = c.dirent; pub const sa_family_t = c.sa_family_t; pub const sockaddr = c.sockaddr; +/// Renamed from `kevent` to `Kevent` to avoid conflict with the syscall. +pub const Kevent = c.Kevent; +pub const kevent64_s = c.kevent64_s; + /// Renamed from `sigaction` to `Sigaction` to avoid conflict with the syscall. pub const Sigaction = struct { handler: extern fn (i32) void, diff --git a/std/os/index.zig b/std/os/index.zig index 52b36c351c..79b2d2ff53 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -61,6 +61,15 @@ pub const windowsLoadDll = windows_util.windowsLoadDll; pub const windowsUnloadDll = windows_util.windowsUnloadDll; pub const createWindowsEnvBlock = windows_util.createWindowsEnvBlock; +pub const WindowsCreateIoCompletionPortError = windows_util.WindowsCreateIoCompletionPortError; +pub const windowsCreateIoCompletionPort = windows_util.windowsCreateIoCompletionPort; + +pub const WindowsPostQueuedCompletionStatusError = windows_util.WindowsPostQueuedCompletionStatusError; +pub const windowsPostQueuedCompletionStatus = windows_util.windowsPostQueuedCompletionStatus; + +pub const WindowsWaitResult = windows_util.WindowsWaitResult; +pub const windowsGetQueuedCompletionStatus = windows_util.windowsGetQueuedCompletionStatus; + pub const WindowsWaitError = windows_util.WaitError; pub const WindowsOpenError = windows_util.OpenError; pub const WindowsWriteError = windows_util.WriteError; @@ -544,8 +553,13 @@ pub fn getEnvPosix(key: []const u8) ?[]const u8 { return null; } +pub const GetEnvVarOwnedError = error{ + OutOfMemory, + EnvironmentVariableNotFound, +}; + /// Caller must free returned memory. -pub fn getEnvVarOwned(allocator: *mem.Allocator, key: []const u8) ![]u8 { +pub fn getEnvVarOwned(allocator: *mem.Allocator, key: []const u8) GetEnvVarOwnedError![]u8 { if (is_windows) { const key_with_null = try cstr.addNullByte(allocator, key); defer allocator.free(key_with_null); @@ -554,14 +568,17 @@ pub fn getEnvVarOwned(allocator: *mem.Allocator, key: []const u8) ![]u8 { errdefer allocator.free(buf); while (true) { - const windows_buf_len = try math.cast(windows.DWORD, buf.len); + const windows_buf_len = math.cast(windows.DWORD, buf.len) catch return error.OutOfMemory; const result = windows.GetEnvironmentVariableA(key_with_null.ptr, buf.ptr, windows_buf_len); if (result == 0) { const err = windows.GetLastError(); return switch (err) { windows.ERROR.ENVVAR_NOT_FOUND => error.EnvironmentVariableNotFound, - else => unexpectedErrorWindows(err), + else => { + _ = unexpectedErrorWindows(err); + return error.EnvironmentVariableNotFound; + }, }; } @@ -2309,6 +2326,30 @@ pub fn linuxEpollWait(epfd: i32, events: []linux.epoll_event, timeout: i32) usiz } } +pub const LinuxEventFdError = error{ + InvalidFlagValue, + SystemResources, + ProcessFdQuotaExceeded, + SystemFdQuotaExceeded, + + Unexpected, +}; + +pub fn linuxEventFd(initval: u32, flags: u32) LinuxEventFdError!i32 { + const rc = posix.eventfd(initval, flags); + const err = posix.getErrno(rc); + switch (err) { + 0 => return @intCast(i32, rc), + else => return unexpectedErrorPosix(err), + + posix.EINVAL => return LinuxEventFdError.InvalidFlagValue, + posix.EMFILE => return LinuxEventFdError.ProcessFdQuotaExceeded, + posix.ENFILE => return LinuxEventFdError.SystemFdQuotaExceeded, + posix.ENODEV => return LinuxEventFdError.SystemResources, + posix.ENOMEM => return LinuxEventFdError.SystemResources, + } +} + pub const PosixGetSockNameError = error{ /// Insufficient resources were available in the system to perform the operation. SystemResources, @@ -2568,11 +2609,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread thread: Thread, inner: Context, }; - extern fn threadMain(arg: windows.LPVOID) windows.DWORD { - if (@sizeOf(Context) == 0) { - return startFn({}); - } else { - return startFn(@ptrCast(*Context, @alignCast(@alignOf(Context), arg)).*); + extern fn threadMain(raw_arg: windows.LPVOID) windows.DWORD { + const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*; + switch (@typeId(@typeOf(startFn).ReturnType)) { + builtin.TypeId.Int => { + return startFn(arg); + }, + builtin.TypeId.Void => { + startFn(arg); + return 0; + }, + else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"), } } }; @@ -2605,10 +2652,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread const MainFuncs = struct { extern fn linuxThreadMain(ctx_addr: usize) u8 { - if (@sizeOf(Context) == 0) { - return startFn({}); - } else { - return startFn(@intToPtr(*const Context, ctx_addr).*); + const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*; + + switch (@typeId(@typeOf(startFn).ReturnType)) { + builtin.TypeId.Int => { + return startFn(arg); + }, + builtin.TypeId.Void => { + startFn(arg); + return 0; + }, + else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"), } } extern fn posixThreadMain(ctx: ?*c_void) ?*c_void { @@ -2717,3 +2771,128 @@ pub fn posixFStat(fd: i32) !posix.Stat { return stat; } + +pub const CpuCountError = error{ + OutOfMemory, + PermissionDenied, + Unexpected, +}; + +pub fn cpuCount(fallback_allocator: *mem.Allocator) CpuCountError!usize { + switch (builtin.os) { + builtin.Os.macosx => { + var count: c_int = undefined; + var count_len: usize = @sizeOf(c_int); + const rc = posix.sysctlbyname(c"hw.ncpu", @ptrCast(*c_void, &count), &count_len, null, 0); + const err = posix.getErrno(rc); + switch (err) { + 0 => return @intCast(usize, count), + posix.EFAULT => unreachable, + posix.EINVAL => unreachable, + posix.ENOMEM => return CpuCountError.OutOfMemory, + posix.ENOTDIR => unreachable, + posix.EISDIR => unreachable, + posix.ENOENT => unreachable, + posix.EPERM => unreachable, + else => return os.unexpectedErrorPosix(err), + } + }, + builtin.Os.linux => { + const usize_count = 16; + const allocator = std.heap.stackFallback(usize_count * @sizeOf(usize), fallback_allocator).get(); + + var set = try allocator.alloc(usize, usize_count); + defer allocator.free(set); + + while (true) { + const rc = posix.sched_getaffinity(0, set); + const err = posix.getErrno(rc); + switch (err) { + 0 => { + if (rc < set.len * @sizeOf(usize)) { + const result = set[0 .. rc / @sizeOf(usize)]; + var sum: usize = 0; + for (result) |x| { + sum += @popCount(x); + } + return sum; + } else { + set = try allocator.realloc(usize, set, set.len * 2); + continue; + } + }, + posix.EFAULT => unreachable, + posix.EINVAL => unreachable, + posix.EPERM => return CpuCountError.PermissionDenied, + posix.ESRCH => unreachable, + else => return os.unexpectedErrorPosix(err), + } + } + }, + builtin.Os.windows => { + var system_info: windows.SYSTEM_INFO = undefined; + windows.GetSystemInfo(&system_info); + return @intCast(usize, system_info.dwNumberOfProcessors); + }, + else => @compileError("unsupported OS"), + } +} + +pub const BsdKQueueError = error{ + /// The per-process limit on the number of open file descriptors has been reached. + ProcessFdQuotaExceeded, + + /// The system-wide limit on the total number of open files has been reached. + SystemFdQuotaExceeded, + + Unexpected, +}; + +pub fn bsdKQueue() BsdKQueueError!i32 { + const rc = posix.kqueue(); + const err = posix.getErrno(rc); + switch (err) { + 0 => return @intCast(i32, rc), + posix.EMFILE => return BsdKQueueError.ProcessFdQuotaExceeded, + posix.ENFILE => return BsdKQueueError.SystemFdQuotaExceeded, + else => return unexpectedErrorPosix(err), + } +} + +pub const BsdKEventError = error{ + /// The process does not have permission to register a filter. + AccessDenied, + + /// The event could not be found to be modified or deleted. + EventNotFound, + + /// No memory was available to register the event. + SystemResources, + + /// The specified process to attach to does not exist. + ProcessNotFound, +}; + +pub fn bsdKEvent( + kq: i32, + changelist: []const posix.Kevent, + eventlist: []posix.Kevent, + timeout: ?*const posix.timespec, +) BsdKEventError!usize { + while (true) { + const rc = posix.kevent(kq, changelist, eventlist, timeout); + const err = posix.getErrno(rc); + switch (err) { + 0 => return rc, + posix.EACCES => return BsdKEventError.AccessDenied, + posix.EFAULT => unreachable, + posix.EBADF => unreachable, + posix.EINTR => continue, + posix.EINVAL => unreachable, + posix.ENOENT => return BsdKEventError.EventNotFound, + posix.ENOMEM => return BsdKEventError.SystemResources, + posix.ESRCH => return BsdKEventError.ProcessNotFound, + else => unreachable, + } + } +} diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig index 65aa659c82..69bc30bad0 100644 --- a/std/os/linux/index.zig +++ b/std/os/linux/index.zig @@ -523,6 +523,10 @@ pub const CLONE_NEWPID = 0x20000000; pub const CLONE_NEWNET = 0x40000000; pub const CLONE_IO = 0x80000000; +pub const EFD_SEMAPHORE = 1; +pub const EFD_CLOEXEC = O_CLOEXEC; +pub const EFD_NONBLOCK = O_NONBLOCK; + pub const MS_RDONLY = 1; pub const MS_NOSUID = 2; pub const MS_NODEV = 4; @@ -1193,6 +1197,10 @@ pub fn fremovexattr(fd: usize, name: [*]const u8) usize { return syscall2(SYS_fremovexattr, fd, @ptrToInt(name)); } +pub fn sched_getaffinity(pid: i32, set: []usize) usize { + return syscall3(SYS_sched_getaffinity, @bitCast(usize, isize(pid)), set.len * @sizeOf(usize), @ptrToInt(set.ptr)); +} + pub const epoll_data = packed union { ptr: usize, fd: i32, @@ -1221,6 +1229,10 @@ pub fn epoll_wait(epoll_fd: i32, events: [*]epoll_event, maxevents: u32, timeout return syscall4(SYS_epoll_wait, @intCast(usize, epoll_fd), @ptrToInt(events), @intCast(usize, maxevents), @intCast(usize, timeout)); } +pub fn eventfd(count: u32, flags: u32) usize { + return syscall2(SYS_eventfd2, count, flags); +} + pub fn timerfd_create(clockid: i32, flags: u32) usize { return syscall2(SYS_timerfd_create, @intCast(usize, clockid), @intCast(usize, flags)); } diff --git a/std/os/test.zig b/std/os/test.zig index 5a977a569a..52e6ffdc1c 100644 --- a/std/os/test.zig +++ b/std/os/test.zig @@ -58,3 +58,8 @@ fn start2(ctx: *i32) u8 { _ = @atomicRmw(i32, ctx, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); return 0; } + +test "cpu count" { + const cpu_count = try std.os.cpuCount(a); + assert(cpu_count >= 1); +} diff --git a/std/os/windows/index.zig b/std/os/windows/index.zig index d631c6adbf..f73b8ec261 100644 --- a/std/os/windows/index.zig +++ b/std/os/windows/index.zig @@ -59,6 +59,9 @@ pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA( dwFlags: DWORD, ) BOOLEAN; + +pub extern "kernel32" stdcallcc fn CreateIoCompletionPort(FileHandle: HANDLE, ExistingCompletionPort: ?HANDLE, CompletionKey: ULONG_PTR, NumberOfConcurrentThreads: DWORD) ?HANDLE; + pub extern "kernel32" stdcallcc fn CreateThread(lpThreadAttributes: ?LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: LPTHREAD_START_ROUTINE, lpParameter: ?LPVOID, dwCreationFlags: DWORD, lpThreadId: ?LPDWORD) ?HANDLE; pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL; @@ -106,7 +109,9 @@ pub extern "kernel32" stdcallcc fn GetFinalPathNameByHandleA( ) DWORD; pub extern "kernel32" stdcallcc fn GetProcessHeap() ?HANDLE; +pub extern "kernel32" stdcallcc fn GetQueuedCompletionStatus(CompletionPort: HANDLE, lpNumberOfBytesTransferred: LPDWORD, lpCompletionKey: *ULONG_PTR, lpOverlapped: *?*OVERLAPPED, dwMilliseconds: DWORD) BOOL; +pub extern "kernel32" stdcallcc fn GetSystemInfo(lpSystemInfo: *SYSTEM_INFO) void; pub extern "kernel32" stdcallcc fn GetSystemTimeAsFileTime(*FILETIME) void; pub extern "kernel32" stdcallcc fn HeapCreate(flOptions: DWORD, dwInitialSize: SIZE_T, dwMaximumSize: SIZE_T) ?HANDLE; @@ -129,6 +134,9 @@ pub extern "kernel32" stdcallcc fn MoveFileExA( dwFlags: DWORD, ) BOOL; + +pub extern "kernel32" stdcallcc fn PostQueuedCompletionStatus(CompletionPort: HANDLE, dwNumberOfBytesTransferred: DWORD, dwCompletionKey: ULONG_PTR, lpOverlapped: ?*OVERLAPPED) BOOL; + pub extern "kernel32" stdcallcc fn QueryPerformanceCounter(lpPerformanceCount: *LARGE_INTEGER) BOOL; pub extern "kernel32" stdcallcc fn QueryPerformanceFrequency(lpFrequency: *LARGE_INTEGER) BOOL; @@ -204,6 +212,7 @@ pub const SIZE_T = usize; pub const TCHAR = if (UNICODE) WCHAR else u8; pub const UINT = c_uint; pub const ULONG_PTR = usize; +pub const DWORD_PTR = ULONG_PTR; pub const UNICODE = false; pub const WCHAR = u16; pub const WORD = u16; @@ -413,3 +422,22 @@ pub const FILETIME = extern struct { dwLowDateTime: DWORD, dwHighDateTime: DWORD, }; + +pub const SYSTEM_INFO = extern struct { + anon1: extern union { + dwOemId: DWORD, + anon2: extern struct { + wProcessorArchitecture: WORD, + wReserved: WORD, + }, + }, + dwPageSize: DWORD, + lpMinimumApplicationAddress: LPVOID, + lpMaximumApplicationAddress: LPVOID, + dwActiveProcessorMask: DWORD_PTR, + dwNumberOfProcessors: DWORD, + dwProcessorType: DWORD, + dwAllocationGranularity: DWORD, + wProcessorLevel: WORD, + wProcessorRevision: WORD, +}; diff --git a/std/os/windows/util.zig b/std/os/windows/util.zig index 45b205451d..b04e8efc4b 100644 --- a/std/os/windows/util.zig +++ b/std/os/windows/util.zig @@ -214,3 +214,50 @@ pub fn windowsFindNextFile(handle: windows.HANDLE, find_file_data: *windows.WIN3 } return true; } + + +pub const WindowsCreateIoCompletionPortError = error { + Unexpected, +}; + +pub fn windowsCreateIoCompletionPort(file_handle: windows.HANDLE, existing_completion_port: ?windows.HANDLE, completion_key: usize, concurrent_thread_count: windows.DWORD) !windows.HANDLE { + const handle = windows.CreateIoCompletionPort(file_handle, existing_completion_port, completion_key, concurrent_thread_count) orelse { + const err = windows.GetLastError(); + switch (err) { + else => return os.unexpectedErrorWindows(err), + } + }; + return handle; +} + +pub const WindowsPostQueuedCompletionStatusError = error { + Unexpected, +}; + +pub fn windowsPostQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: windows.DWORD, completion_key: usize, lpOverlapped: ?*windows.OVERLAPPED) WindowsPostQueuedCompletionStatusError!void { + if (windows.PostQueuedCompletionStatus(completion_port, bytes_transferred_count, completion_key, lpOverlapped) == 0) { + const err = windows.GetLastError(); + switch (err) { + else => return os.unexpectedErrorWindows(err), + } + } +} + +pub const WindowsWaitResult = error { + Normal, + Aborted, +}; + +pub fn windowsGetQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: *windows.DWORD, lpCompletionKey: *usize, lpOverlapped: *?*windows.OVERLAPPED, dwMilliseconds: windows.DWORD) WindowsWaitResult { + if (windows.GetQueuedCompletionStatus(completion_port, bytes_transferred_count, lpCompletionKey, lpOverlapped, dwMilliseconds) == windows.FALSE) { + if (std.debug.runtime_safety) { + const err = windows.GetLastError(); + if (err != windows.ERROR.ABANDONED_WAIT_0) { + std.debug.warn("err: {}\n", err); + } + assert(err == windows.ERROR.ABANDONED_WAIT_0); + } + return WindowsWaitResult.Aborted; + } + return WindowsWaitResult.Normal; +} diff --git a/std/special/build_runner.zig b/std/special/build_runner.zig index e4f04df6d0..2f073b3e98 100644 --- a/std/special/build_runner.zig +++ b/std/special/build_runner.zig @@ -122,10 +122,13 @@ pub fn main() !void { return usageAndErr(&builder, true, try stderr_stream); builder.make(targets.toSliceConst()) catch |err| { - if (err == error.InvalidStepName) { - return usageAndErr(&builder, true, try stderr_stream); + switch (err) { + error.InvalidStepName => { + return usageAndErr(&builder, true, try stderr_stream); + }, + error.UncleanExit => os.exit(1), + else => return err, } - return err; }; } diff --git a/std/special/compiler_rt/extendXfYf2_test.zig b/std/special/compiler_rt/extendXfYf2_test.zig index 185c83a0ef..9969607011 100644 --- a/std/special/compiler_rt/extendXfYf2_test.zig +++ b/std/special/compiler_rt/extendXfYf2_test.zig @@ -31,7 +31,7 @@ fn test__extendhfsf2(a: u16, expected: u32) void { if (rep == expected) { if (rep & 0x7fffffff > 0x7f800000) { - return; // NaN is always unequal. + return; // NaN is always unequal. } if (x == @bitCast(f32, expected)) { return; @@ -86,33 +86,33 @@ test "extenddftf2" { } test "extendhfsf2" { - test__extendhfsf2(0x7e00, 0x7fc00000); // qNaN - test__extendhfsf2(0x7f00, 0x7fe00000); // sNaN - test__extendhfsf2(0x7c01, 0x7f802000); // sNaN + test__extendhfsf2(0x7e00, 0x7fc00000); // qNaN + test__extendhfsf2(0x7f00, 0x7fe00000); // sNaN + test__extendhfsf2(0x7c01, 0x7f802000); // sNaN - test__extendhfsf2(0, 0); // 0 - test__extendhfsf2(0x8000, 0x80000000); // -0 + test__extendhfsf2(0, 0); // 0 + test__extendhfsf2(0x8000, 0x80000000); // -0 - test__extendhfsf2(0x7c00, 0x7f800000); // inf - test__extendhfsf2(0xfc00, 0xff800000); // -inf + test__extendhfsf2(0x7c00, 0x7f800000); // inf + test__extendhfsf2(0xfc00, 0xff800000); // -inf - test__extendhfsf2(0x0001, 0x33800000); // denormal (min), 2**-24 - test__extendhfsf2(0x8001, 0xb3800000); // denormal (min), -2**-24 + test__extendhfsf2(0x0001, 0x33800000); // denormal (min), 2**-24 + test__extendhfsf2(0x8001, 0xb3800000); // denormal (min), -2**-24 - test__extendhfsf2(0x03ff, 0x387fc000); // denormal (max), 2**-14 - 2**-24 - test__extendhfsf2(0x83ff, 0xb87fc000); // denormal (max), -2**-14 + 2**-24 + test__extendhfsf2(0x03ff, 0x387fc000); // denormal (max), 2**-14 - 2**-24 + test__extendhfsf2(0x83ff, 0xb87fc000); // denormal (max), -2**-14 + 2**-24 - test__extendhfsf2(0x0400, 0x38800000); // normal (min), 2**-14 - test__extendhfsf2(0x8400, 0xb8800000); // normal (min), -2**-14 + test__extendhfsf2(0x0400, 0x38800000); // normal (min), 2**-14 + test__extendhfsf2(0x8400, 0xb8800000); // normal (min), -2**-14 - test__extendhfsf2(0x7bff, 0x477fe000); // normal (max), 65504 - test__extendhfsf2(0xfbff, 0xc77fe000); // normal (max), -65504 + test__extendhfsf2(0x7bff, 0x477fe000); // normal (max), 65504 + test__extendhfsf2(0xfbff, 0xc77fe000); // normal (max), -65504 - test__extendhfsf2(0x3c01, 0x3f802000); // normal, 1 + 2**-10 - test__extendhfsf2(0xbc01, 0xbf802000); // normal, -1 - 2**-10 + test__extendhfsf2(0x3c01, 0x3f802000); // normal, 1 + 2**-10 + test__extendhfsf2(0xbc01, 0xbf802000); // normal, -1 - 2**-10 - test__extendhfsf2(0x3555, 0x3eaaa000); // normal, approx. 1/3 - test__extendhfsf2(0xb555, 0xbeaaa000); // normal, approx. -1/3 + test__extendhfsf2(0x3555, 0x3eaaa000); // normal, approx. 1/3 + test__extendhfsf2(0xb555, 0xbeaaa000); // normal, approx. -1/3 } test "extendsftf2" { diff --git a/std/zig/bench.zig b/std/zig/bench.zig index 59392889a6..630f6b2233 100644 --- a/std/zig/bench.zig +++ b/std/zig/bench.zig @@ -19,20 +19,18 @@ pub fn main() !void { } const end = timer.read(); memory_used /= iterations; - const elapsed_s = f64(end - start) / std.os.time.ns_per_s; - const bytes_per_sec = f64(source.len * iterations) / elapsed_s; + const elapsed_s = @intToFloat(f64, end - start) / std.os.time.ns_per_s; + const bytes_per_sec = @intToFloat(f64, source.len * iterations) / elapsed_s; const mb_per_sec = bytes_per_sec / (1024 * 1024); var stdout_file = try std.io.getStdOut(); - const stdout = *std.io.FileOutStream.init(*stdout_file).stream; - try stdout.print("{.3} MB/s, {} KB used \n", mb_per_sec, memory_used / 1024); + const stdout = &std.io.FileOutStream.init(&stdout_file).stream; + try stdout.print("{.3} MiB/s, {} KiB used \n", mb_per_sec, memory_used / 1024); } fn testOnce() usize { var fixed_buf_alloc = std.heap.FixedBufferAllocator.init(fixed_buffer_mem[0..]); - var allocator = *fixed_buf_alloc.allocator; - var tokenizer = Tokenizer.init(source); - var parser = Parser.init(*tokenizer, allocator, "(memory buffer)"); - _ = parser.parse() catch @panic("parse failure"); + var allocator = &fixed_buf_alloc.allocator; + _ = std.zig.parse(allocator, source) catch @panic("parse failure"); return fixed_buf_alloc.end_index; } |
