aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread.zig
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2021-02-25 21:04:23 -0700
committerAndrew Kelley <andrew@ziglang.org>2021-02-25 21:04:23 -0700
commit0b58b617998b79a765b54f88fbe90ca2798b3d3e (patch)
treeca6cc4b6bcc2b93166d196049ee49416afe781ad /lib/std/Thread.zig
parentdc325669e360f7a9dfa24f85a62fa386529dade6 (diff)
parentfd208d9d5913a0929e444deb97b91092c427bb14 (diff)
downloadzig-0b58b617998b79a765b54f88fbe90ca2798b3d3e.tar.gz
zig-0b58b617998b79a765b54f88fbe90ca2798b3d3e.zip
Merge remote-tracking branch 'origin/master' into llvm12
Conflicts: * src/clang.zig * src/llvm.zig - this file got moved to src/llvm/bindings.zig in master branch so I had to put the new LLVM arch/os enum tags into it. * lib/std/target.zig, src/stage1/target.cpp - haiku had an inconsistency with its default target ABI, gnu vs eabi. In this commit we make it gnu in both places to match the latest changes by @hoanga. * src/translate_c.zig
Diffstat (limited to 'lib/std/Thread.zig')
-rw-r--r--lib/std/Thread.zig565
1 files changed, 565 insertions, 0 deletions
diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig
new file mode 100644
index 0000000000..77277bd154
--- /dev/null
+++ b/lib/std/Thread.zig
@@ -0,0 +1,565 @@
+// SPDX-License-Identifier: MIT
+// Copyright (c) 2015-2021 Zig Contributors
+// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
+// The MIT license requires this copyright notice to be included in all copies
+// and substantial portions of the software.
+
+//! This struct represents a kernel thread, and acts as a namespace for concurrency
+//! primitives that operate on kernel threads. For concurrency primitives that support
+//! both evented I/O and async I/O, see the respective names in the top level std namespace.
+
+data: Data,
+
+pub const AutoResetEvent = @import("Thread/AutoResetEvent.zig");
+pub const ResetEvent = @import("Thread/ResetEvent.zig");
+pub const StaticResetEvent = @import("Thread/StaticResetEvent.zig");
+pub const Mutex = @import("Thread/Mutex.zig");
+pub const Semaphore = @import("Thread/Semaphore.zig");
+pub const Condition = @import("Thread/Condition.zig");
+
+pub const use_pthreads = std.Target.current.os.tag != .windows and builtin.link_libc;
+
+const Thread = @This();
+const std = @import("std.zig");
+const builtin = std.builtin;
+const os = std.os;
+const mem = std.mem;
+const windows = std.os.windows;
+const c = std.c;
+const assert = std.debug.assert;
+
+const bad_startfn_ret = "expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'";
+
+/// Represents a kernel thread handle.
+/// May be an integer or a pointer depending on the platform.
+/// On Linux and POSIX, this is the same as Id.
+pub const Handle = if (use_pthreads)
+ c.pthread_t
+else switch (std.Target.current.os.tag) {
+ .linux => i32,
+ .windows => windows.HANDLE,
+ else => void,
+};
+
+/// Represents a unique ID per thread.
+/// May be an integer or pointer depending on the platform.
+/// On Linux and POSIX, this is the same as Handle.
+pub const Id = switch (std.Target.current.os.tag) {
+ .windows => windows.DWORD,
+ else => Handle,
+};
+
+pub const Data = if (use_pthreads)
+ struct {
+ handle: Thread.Handle,
+ memory: []u8,
+ }
+else switch (std.Target.current.os.tag) {
+ .linux => struct {
+ handle: Thread.Handle,
+ memory: []align(mem.page_size) u8,
+ },
+ .windows => struct {
+ handle: Thread.Handle,
+ alloc_start: *c_void,
+ heap_handle: windows.HANDLE,
+ },
+ else => struct {},
+};
+
+/// Signals the processor that it is inside a busy-wait spin-loop ("spin lock").
+pub fn spinLoopHint() void {
+ switch (std.Target.current.cpu.arch) {
+ .i386, .x86_64 => asm volatile ("pause" ::: "memory"),
+ .arm, .aarch64 => asm volatile ("yield" ::: "memory"),
+ else => {},
+ }
+}
+
+/// Returns the ID of the calling thread.
+/// Makes a syscall every time the function is called.
+/// On Linux and POSIX, this Id is the same as a Handle.
+pub fn getCurrentId() Id {
+ if (use_pthreads) {
+ return c.pthread_self();
+ } else return switch (std.Target.current.os.tag) {
+ .linux => os.linux.gettid(),
+ .windows => windows.kernel32.GetCurrentThreadId(),
+ else => @compileError("Unsupported OS"),
+ };
+}
+
+/// Returns the handle of this thread.
+/// On Linux and POSIX, this is the same as Id.
+/// On Linux, it is possible that the thread spawned with `spawn`
+/// finishes executing entirely before the clone syscall completes. In this
+/// case, this function will return 0 rather than the no-longer-existing thread's
+/// pid.
+pub fn handle(self: Thread) Handle {
+ return self.data.handle;
+}
+
+pub fn wait(self: *Thread) void {
+ if (use_pthreads) {
+ const err = c.pthread_join(self.data.handle, null);
+ switch (err) {
+ 0 => {},
+ os.EINVAL => unreachable,
+ os.ESRCH => unreachable,
+ os.EDEADLK => unreachable,
+ else => unreachable,
+ }
+ std.heap.c_allocator.free(self.data.memory);
+ std.heap.c_allocator.destroy(self);
+ } else switch (std.Target.current.os.tag) {
+ .linux => {
+ while (true) {
+ const pid_value = @atomicLoad(i32, &self.data.handle, .SeqCst);
+ if (pid_value == 0) break;
+ const rc = os.linux.futex_wait(&self.data.handle, os.linux.FUTEX_WAIT, pid_value, null);
+ switch (os.linux.getErrno(rc)) {
+ 0 => continue,
+ os.EINTR => continue,
+ os.EAGAIN => continue,
+ else => unreachable,
+ }
+ }
+ os.munmap(self.data.memory);
+ },
+ .windows => {
+ windows.WaitForSingleObjectEx(self.data.handle, windows.INFINITE, false) catch unreachable;
+ windows.CloseHandle(self.data.handle);
+ windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start);
+ },
+ else => @compileError("Unsupported OS"),
+ }
+}
+
+pub const SpawnError = error{
+ /// A system-imposed limit on the number of threads was encountered.
+ /// There are a number of limits that may trigger this error:
+ /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)),
+ /// which limits the number of processes and threads for a real
+ /// user ID, was reached;
+ /// * the kernel's system-wide limit on the number of processes and
+ /// threads, /proc/sys/kernel/threads-max, was reached (see
+ /// proc(5));
+ /// * the maximum number of PIDs, /proc/sys/kernel/pid_max, was
+ /// reached (see proc(5)); or
+ /// * the PID limit (pids.max) imposed by the cgroup "process num‐
+ /// ber" (PIDs) controller was reached.
+ ThreadQuotaExceeded,
+
+ /// The kernel cannot allocate sufficient memory to allocate a task structure
+ /// for the child, or to copy those parts of the caller's context that need to
+ /// be copied.
+ SystemResources,
+
+ /// Not enough userland memory to spawn the thread.
+ OutOfMemory,
+
+ /// `mlockall` is enabled, and the memory needed to spawn the thread
+ /// would exceed the limit.
+ LockedMemoryLimitExceeded,
+
+ Unexpected,
+};
+
+/// caller must call wait on the returned thread
+/// fn startFn(@TypeOf(context)) T
+/// where T is u8, noreturn, void, or !void
+/// caller must call wait on the returned thread
+pub fn spawn(context: anytype, comptime startFn: anytype) SpawnError!*Thread {
+ if (builtin.single_threaded) @compileError("cannot spawn thread when building in single-threaded mode");
+ // TODO compile-time call graph analysis to determine stack upper bound
+ // https://github.com/ziglang/zig/issues/157
+ const default_stack_size = 16 * 1024 * 1024;
+
+ const Context = @TypeOf(context);
+ comptime assert(@typeInfo(@TypeOf(startFn)).Fn.args[0].arg_type.? == Context);
+
+ if (std.Target.current.os.tag == .windows) {
+ const WinThread = struct {
+ const OuterContext = struct {
+ thread: Thread,
+ inner: Context,
+ };
+ fn threadMain(raw_arg: windows.LPVOID) callconv(.C) windows.DWORD {
+ const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*;
+
+ switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) {
+ .NoReturn => {
+ startFn(arg);
+ },
+ .Void => {
+ startFn(arg);
+ return 0;
+ },
+ .Int => |info| {
+ if (info.bits != 8) {
+ @compileError(bad_startfn_ret);
+ }
+ return startFn(arg);
+ },
+ .ErrorUnion => |info| {
+ if (info.payload != void) {
+ @compileError(bad_startfn_ret);
+ }
+ startFn(arg) catch |err| {
+ std.debug.warn("error: {s}\n", .{@errorName(err)});
+ if (@errorReturnTrace()) |trace| {
+ std.debug.dumpStackTrace(trace.*);
+ }
+ };
+ return 0;
+ },
+ else => @compileError(bad_startfn_ret),
+ }
+ }
+ };
+
+ const heap_handle = windows.kernel32.GetProcessHeap() orelse return error.OutOfMemory;
+ const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext);
+ const bytes_ptr = windows.kernel32.HeapAlloc(heap_handle, 0, byte_count) orelse return error.OutOfMemory;
+ errdefer assert(windows.kernel32.HeapFree(heap_handle, 0, bytes_ptr) != 0);
+ const bytes = @ptrCast([*]u8, bytes_ptr)[0..byte_count];
+ const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable;
+ outer_context.* = WinThread.OuterContext{
+ .thread = Thread{
+ .data = Thread.Data{
+ .heap_handle = heap_handle,
+ .alloc_start = bytes_ptr,
+ .handle = undefined,
+ },
+ },
+ .inner = context,
+ };
+
+ const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(*c_void, &outer_context.inner);
+ outer_context.thread.data.handle = windows.kernel32.CreateThread(null, default_stack_size, WinThread.threadMain, parameter, 0, null) orelse {
+ switch (windows.kernel32.GetLastError()) {
+ else => |err| return windows.unexpectedError(err),
+ }
+ };
+ return &outer_context.thread;
+ }
+
+ const MainFuncs = struct {
+ fn linuxThreadMain(ctx_addr: usize) callconv(.C) u8 {
+ const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*;
+
+ switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) {
+ .NoReturn => {
+ startFn(arg);
+ },
+ .Void => {
+ startFn(arg);
+ return 0;
+ },
+ .Int => |info| {
+ if (info.bits != 8) {
+ @compileError(bad_startfn_ret);
+ }
+ return startFn(arg);
+ },
+ .ErrorUnion => |info| {
+ if (info.payload != void) {
+ @compileError(bad_startfn_ret);
+ }
+ startFn(arg) catch |err| {
+ std.debug.warn("error: {s}\n", .{@errorName(err)});
+ if (@errorReturnTrace()) |trace| {
+ std.debug.dumpStackTrace(trace.*);
+ }
+ };
+ return 0;
+ },
+ else => @compileError(bad_startfn_ret),
+ }
+ }
+ fn posixThreadMain(ctx: ?*c_void) callconv(.C) ?*c_void {
+ const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), ctx)).*;
+
+ switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) {
+ .NoReturn => {
+ startFn(arg);
+ },
+ .Void => {
+ startFn(arg);
+ return null;
+ },
+ .Int => |info| {
+ if (info.bits != 8) {
+ @compileError(bad_startfn_ret);
+ }
+ // pthreads don't support exit status, ignore value
+ _ = startFn(arg);
+ return null;
+ },
+ .ErrorUnion => |info| {
+ if (info.payload != void) {
+ @compileError(bad_startfn_ret);
+ }
+ startFn(arg) catch |err| {
+ std.debug.warn("error: {s}\n", .{@errorName(err)});
+ if (@errorReturnTrace()) |trace| {
+ std.debug.dumpStackTrace(trace.*);
+ }
+ };
+ return null;
+ },
+ else => @compileError(bad_startfn_ret),
+ }
+ }
+ };
+
+ if (Thread.use_pthreads) {
+ var attr: c.pthread_attr_t = undefined;
+ if (c.pthread_attr_init(&attr) != 0) return error.SystemResources;
+ defer assert(c.pthread_attr_destroy(&attr) == 0);
+
+ const thread_obj = try std.heap.c_allocator.create(Thread);
+ errdefer std.heap.c_allocator.destroy(thread_obj);
+ if (@sizeOf(Context) > 0) {
+ thread_obj.data.memory = try std.heap.c_allocator.allocAdvanced(
+ u8,
+ @alignOf(Context),
+ @sizeOf(Context),
+ .at_least,
+ );
+ errdefer std.heap.c_allocator.free(thread_obj.data.memory);
+ mem.copy(u8, thread_obj.data.memory, mem.asBytes(&context));
+ } else {
+ thread_obj.data.memory = @as([*]u8, undefined)[0..0];
+ }
+
+ // Use the same set of parameters used by the libc-less impl.
+ assert(c.pthread_attr_setstacksize(&attr, default_stack_size) == 0);
+ assert(c.pthread_attr_setguardsize(&attr, mem.page_size) == 0);
+
+ const err = c.pthread_create(
+ &thread_obj.data.handle,
+ &attr,
+ MainFuncs.posixThreadMain,
+ thread_obj.data.memory.ptr,
+ );
+ switch (err) {
+ 0 => return thread_obj,
+ os.EAGAIN => return error.SystemResources,
+ os.EPERM => unreachable,
+ os.EINVAL => unreachable,
+ else => return os.unexpectedErrno(@intCast(usize, err)),
+ }
+
+ return thread_obj;
+ }
+
+ var guard_end_offset: usize = undefined;
+ var stack_end_offset: usize = undefined;
+ var thread_start_offset: usize = undefined;
+ var context_start_offset: usize = undefined;
+ var tls_start_offset: usize = undefined;
+ const mmap_len = blk: {
+ var l: usize = mem.page_size;
+ // Allocate a guard page right after the end of the stack region
+ guard_end_offset = l;
+ // The stack itself, which grows downwards.
+ l = mem.alignForward(l + default_stack_size, mem.page_size);
+ stack_end_offset = l;
+ // Above the stack, so that it can be in the same mmap call, put the Thread object.
+ l = mem.alignForward(l, @alignOf(Thread));
+ thread_start_offset = l;
+ l += @sizeOf(Thread);
+ // Next, the Context object.
+ if (@sizeOf(Context) != 0) {
+ l = mem.alignForward(l, @alignOf(Context));
+ context_start_offset = l;
+ l += @sizeOf(Context);
+ }
+ // Finally, the Thread Local Storage, if any.
+ l = mem.alignForward(l, os.linux.tls.tls_image.alloc_align);
+ tls_start_offset = l;
+ l += os.linux.tls.tls_image.alloc_size;
+ // Round the size to the page size.
+ break :blk mem.alignForward(l, mem.page_size);
+ };
+
+ const mmap_slice = mem: {
+ // Map the whole stack with no rw permissions to avoid
+ // committing the whole region right away
+ const mmap_slice = os.mmap(
+ null,
+ mmap_len,
+ os.PROT_NONE,
+ os.MAP_PRIVATE | os.MAP_ANONYMOUS,
+ -1,
+ 0,
+ ) catch |err| switch (err) {
+ error.MemoryMappingNotSupported => unreachable,
+ error.AccessDenied => unreachable,
+ error.PermissionDenied => unreachable,
+ else => |e| return e,
+ };
+ errdefer os.munmap(mmap_slice);
+
+ // Map everything but the guard page as rw
+ os.mprotect(
+ mmap_slice[guard_end_offset..],
+ os.PROT_READ | os.PROT_WRITE,
+ ) catch |err| switch (err) {
+ error.AccessDenied => unreachable,
+ else => |e| return e,
+ };
+
+ break :mem mmap_slice;
+ };
+
+ const mmap_addr = @ptrToInt(mmap_slice.ptr);
+
+ const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(*Thread, mmap_addr + thread_start_offset));
+ thread_ptr.data.memory = mmap_slice;
+
+ var arg: usize = undefined;
+ if (@sizeOf(Context) != 0) {
+ arg = mmap_addr + context_start_offset;
+ const context_ptr = @alignCast(@alignOf(Context), @intToPtr(*Context, arg));
+ context_ptr.* = context;
+ }
+
+ if (std.Target.current.os.tag == .linux) {
+ const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES |
+ os.CLONE_SIGHAND | os.CLONE_THREAD | os.CLONE_SYSVSEM |
+ os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID |
+ os.CLONE_DETACHED | os.CLONE_SETTLS;
+ // This structure is only needed when targeting i386
+ var user_desc: if (std.Target.current.cpu.arch == .i386) os.linux.user_desc else void = undefined;
+
+ const tls_area = mmap_slice[tls_start_offset..];
+ const tp_value = os.linux.tls.prepareTLS(tls_area);
+
+ const newtls = blk: {
+ if (std.Target.current.cpu.arch == .i386) {
+ user_desc = os.linux.user_desc{
+ .entry_number = os.linux.tls.tls_image.gdt_entry_number,
+ .base_addr = tp_value,
+ .limit = 0xfffff,
+ .seg_32bit = 1,
+ .contents = 0, // Data
+ .read_exec_only = 0,
+ .limit_in_pages = 1,
+ .seg_not_present = 0,
+ .useable = 1,
+ };
+ break :blk @ptrToInt(&user_desc);
+ } else {
+ break :blk tp_value;
+ }
+ };
+
+ const rc = os.linux.clone(
+ MainFuncs.linuxThreadMain,
+ mmap_addr + stack_end_offset,
+ flags,
+ arg,
+ &thread_ptr.data.handle,
+ newtls,
+ &thread_ptr.data.handle,
+ );
+ switch (os.errno(rc)) {
+ 0 => return thread_ptr,
+ os.EAGAIN => return error.ThreadQuotaExceeded,
+ os.EINVAL => unreachable,
+ os.ENOMEM => return error.SystemResources,
+ os.ENOSPC => unreachable,
+ os.EPERM => unreachable,
+ os.EUSERS => unreachable,
+ else => |err| return os.unexpectedErrno(err),
+ }
+ } else {
+ @compileError("Unsupported OS");
+ }
+}
+
+pub const CpuCountError = error{
+ PermissionDenied,
+ SystemResources,
+ Unexpected,
+};
+
+pub fn cpuCount() CpuCountError!usize {
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ const cpu_set = try os.sched_getaffinity(0);
+ return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast
+ },
+ .windows => {
+ return os.windows.peb().NumberOfProcessors;
+ },
+ .openbsd => {
+ var count: c_int = undefined;
+ var count_size: usize = @sizeOf(c_int);
+ const mib = [_]c_int{ os.CTL_HW, os.HW_NCPUONLINE };
+ os.sysctl(&mib, &count, &count_size, null, 0) catch |err| switch (err) {
+ error.NameTooLong, error.UnknownName => unreachable,
+ else => |e| return e,
+ };
+ return @intCast(usize, count);
+ },
+ .haiku => {
+ var count: u32 = undefined;
+ var system_info: os.system_info = undefined;
+ const rc = os.system.get_system_info(&system_info);
+ count = system_info.cpu_count;
+ return @intCast(usize, count);
+ },
+ else => {
+ var count: c_int = undefined;
+ var count_len: usize = @sizeOf(c_int);
+ const name = if (comptime std.Target.current.isDarwin()) "hw.logicalcpu" else "hw.ncpu";
+ os.sysctlbynameZ(name, &count, &count_len, null, 0) catch |err| switch (err) {
+ error.NameTooLong, error.UnknownName => unreachable,
+ else => |e| return e,
+ };
+ return @intCast(usize, count);
+ },
+ }
+}
+
+pub fn getCurrentThreadId() u64 {
+ switch (std.Target.current.os.tag) {
+ .linux => {
+ // Use the syscall directly as musl doesn't provide a wrapper.
+ return @bitCast(u32, os.linux.gettid());
+ },
+ .windows => {
+ return os.windows.kernel32.GetCurrentThreadId();
+ },
+ .macos, .ios, .watchos, .tvos => {
+ var thread_id: u64 = undefined;
+ // Pass thread=null to get the current thread ID.
+ assert(c.pthread_threadid_np(null, &thread_id) == 0);
+ return thread_id;
+ },
+ .netbsd => {
+ return @bitCast(u32, c._lwp_self());
+ },
+ .freebsd => {
+ return @bitCast(u32, c.pthread_getthreadid_np());
+ },
+ .openbsd => {
+ return @bitCast(u32, c.getthrid());
+ },
+ .haiku => {
+ return @bitCast(u32, c.find_thread(null));
+ },
+ else => {
+ @compileError("getCurrentThreadId not implemented for this platform");
+ },
+ }
+}
+
+test {
+ if (!builtin.single_threaded) {
+ std.testing.refAllDecls(@This());
+ }
+}