diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2021-01-14 20:41:37 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2021-01-14 20:41:37 -0700 |
| commit | a9667b5a859a589056f23df2b74b91fede0bbbfa (patch) | |
| tree | 0efb150c8b3357b61f2dc11b0018a1038fe6d354 /lib/std | |
| parent | 2b0e3ee228e01473cf880f719db9bde5b8f34d25 (diff) | |
| download | zig-a9667b5a859a589056f23df2b74b91fede0bbbfa.tar.gz zig-a9667b5a859a589056f23df2b74b91fede0bbbfa.zip | |
organize std lib concurrency primitives and add RwLock
* move concurrency primitives that always operate on kernel threads to
the std.Thread namespace
* remove std.SpinLock. Nobody should use this in a non-freestanding
environment; the other primitives are always preferable. In
freestanding, it will be necessary to put custom spin logic in there,
so there are no use cases for a std lib version.
* move some std lib files to the top level fields convention
* add std.Thread.spinLoopHint
* add std.Thread.Condition
* add std.Thread.Semaphore
* new implementation of std.Thread.Mutex for Windows and non-pthreads Linux
* add std.Thread.RwLock
Implementations provided by @kprotty
Diffstat (limited to 'lib/std')
34 files changed, 1746 insertions, 1261 deletions
diff --git a/lib/std/Progress.zig b/lib/std/Progress.zig index ca9fb8ea1f..b75ad106a4 100644 --- a/lib/std/Progress.zig +++ b/lib/std/Progress.zig @@ -50,7 +50,7 @@ done: bool = true, /// Protects the `refresh` function, as well as `node.recently_updated_child`. /// Without this, callsites would call `Node.end` and then free `Node` memory /// while it was still being accessed by the `refresh` function. -update_lock: std.Mutex = .{}, +update_lock: std.Thread.Mutex = .{}, /// Keeps track of how many columns in the terminal have been output, so that /// we can move the cursor back later. diff --git a/lib/std/SpinLock.zig b/lib/std/SpinLock.zig deleted file mode 100644 index a16cfa930b..0000000000 --- a/lib/std/SpinLock.zig +++ /dev/null @@ -1,86 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -//! A mutually exclusive lock that grinds the CPU rather than interacting with -//! the operating system. It does however yield to the OS scheduler while -//! spinning, when targeting an OS that supports it. -//! This struct can be initialized directly and statically initialized. The -//! default state is unlocked. - -state: State = State.Unlocked, - -const std = @import("std.zig"); -const builtin = @import("builtin"); -const SpinLock = @This(); - -const State = enum(u8) { - Unlocked, - Locked, -}; - -pub const Held = struct { - spinlock: *SpinLock, - - pub fn release(self: Held) void { - @atomicStore(State, &self.spinlock.state, .Unlocked, .Release); - } -}; - -pub fn tryAcquire(self: *SpinLock) ?Held { - return switch (@atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire)) { - .Unlocked => Held{ .spinlock = self }, - .Locked => null, - }; -} - -pub fn acquire(self: *SpinLock) Held { - while (true) { - return self.tryAcquire() orelse { - yield(); - continue; - }; - } -} - -pub fn yield() void { - // On native windows, SwitchToThread is too expensive, - // and yielding for 380-410 iterations was found to be - // a nice sweet spot. Posix systems on the other hand, - // especially linux, perform better by yielding the thread. - switch (builtin.os.tag) { - .windows => loopHint(400), - else => std.os.sched_yield() catch loopHint(1), - } -} - -/// Hint to the cpu that execution is spinning -/// for the given amount of iterations. -pub fn loopHint(iterations: usize) void { - var i = iterations; - while (i != 0) : (i -= 1) { - switch (builtin.arch) { - // these instructions use a memory clobber as they - // flush the pipeline of any speculated reads/writes. - .i386, .x86_64 => asm volatile ("pause" - : - : - : "memory" - ), - .arm, .aarch64 => asm volatile ("yield" - : - : - : "memory" - ), - else => std.os.sched_yield() catch {}, - } - } -} - -test "basic usage" { - var lock: SpinLock = .{}; - - const held = lock.acquire(); - defer held.release(); -} diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig new file mode 100644 index 0000000000..f878f43539 --- /dev/null +++ b/lib/std/Thread.zig @@ -0,0 +1,558 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! This struct represents a kernel thread, and acts as a namespace for concurrency +//! primitives that operate on kernel threads. For concurrency primitives that support +//! both evented I/O and async I/O, see the respective names in the top level std namespace. + +data: Data, + +pub const AutoResetEvent = @import("Thread/AutoResetEvent.zig"); +pub const ResetEvent = @import("Thread/ResetEvent.zig"); +pub const StaticResetEvent = @import("Thread/StaticResetEvent.zig"); +pub const Mutex = @import("Thread/Mutex.zig"); +pub const Semaphore = @import("Thread/Semaphore.zig"); +pub const Condition = @import("Thread/Condition.zig"); + +pub const use_pthreads = std.Target.current.os.tag != .windows and builtin.link_libc; + +const Thread = @This(); +const std = @import("std.zig"); +const builtin = std.builtin; +const os = std.os; +const mem = std.mem; +const windows = std.os.windows; +const c = std.c; +const assert = std.debug.assert; + +const bad_startfn_ret = "expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"; + +/// Represents a kernel thread handle. +/// May be an integer or a pointer depending on the platform. +/// On Linux and POSIX, this is the same as Id. +pub const Handle = if (use_pthreads) + c.pthread_t +else switch (std.Target.current.os.tag) { + .linux => i32, + .windows => windows.HANDLE, + else => void, +}; + +/// Represents a unique ID per thread. +/// May be an integer or pointer depending on the platform. +/// On Linux and POSIX, this is the same as Handle. +pub const Id = switch (std.Target.current.os.tag) { + .windows => windows.DWORD, + else => Handle, +}; + +pub const Data = if (use_pthreads) + struct { + handle: Thread.Handle, + memory: []u8, + } +else switch (std.Target.current.os.tag) { + .linux => struct { + handle: Thread.Handle, + memory: []align(mem.page_size) u8, + }, + .windows => struct { + handle: Thread.Handle, + alloc_start: *c_void, + heap_handle: windows.HANDLE, + }, + else => struct {}, +}; + +/// Signals the processor that it is inside a busy-wait spin-loop ("spin lock"). +pub fn spinLoopHint() void { + switch (std.Target.current.cpu.arch) { + .i386, .x86_64 => asm volatile ("pause" + : + : + : "memory" + ), + .arm, .aarch64 => asm volatile ("yield" + : + : + : "memory" + ), + else => {}, + } +} + +/// Returns the ID of the calling thread. +/// Makes a syscall every time the function is called. +/// On Linux and POSIX, this Id is the same as a Handle. +pub fn getCurrentId() Id { + if (use_pthreads) { + return c.pthread_self(); + } else + return switch (std.Target.current.os.tag) { + .linux => os.linux.gettid(), + .windows => windows.kernel32.GetCurrentThreadId(), + else => @compileError("Unsupported OS"), + }; +} + +/// Returns the handle of this thread. +/// On Linux and POSIX, this is the same as Id. +/// On Linux, it is possible that the thread spawned with `spawn` +/// finishes executing entirely before the clone syscall completes. In this +/// case, this function will return 0 rather than the no-longer-existing thread's +/// pid. +pub fn handle(self: Thread) Handle { + return self.data.handle; +} + +pub fn wait(self: *Thread) void { + if (use_pthreads) { + const err = c.pthread_join(self.data.handle, null); + switch (err) { + 0 => {}, + os.EINVAL => unreachable, + os.ESRCH => unreachable, + os.EDEADLK => unreachable, + else => unreachable, + } + std.heap.c_allocator.free(self.data.memory); + std.heap.c_allocator.destroy(self); + } else switch (std.Target.current.os.tag) { + .linux => { + while (true) { + const pid_value = @atomicLoad(i32, &self.data.handle, .SeqCst); + if (pid_value == 0) break; + const rc = os.linux.futex_wait(&self.data.handle, os.linux.FUTEX_WAIT, pid_value, null); + switch (os.linux.getErrno(rc)) { + 0 => continue, + os.EINTR => continue, + os.EAGAIN => continue, + else => unreachable, + } + } + os.munmap(self.data.memory); + }, + .windows => { + windows.WaitForSingleObjectEx(self.data.handle, windows.INFINITE, false) catch unreachable; + windows.CloseHandle(self.data.handle); + windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start); + }, + else => @compileError("Unsupported OS"), + } +} + +pub const SpawnError = error{ + /// A system-imposed limit on the number of threads was encountered. + /// There are a number of limits that may trigger this error: + /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)), + /// which limits the number of processes and threads for a real + /// user ID, was reached; + /// * the kernel's system-wide limit on the number of processes and + /// threads, /proc/sys/kernel/threads-max, was reached (see + /// proc(5)); + /// * the maximum number of PIDs, /proc/sys/kernel/pid_max, was + /// reached (see proc(5)); or + /// * the PID limit (pids.max) imposed by the cgroup "process num‐ + /// ber" (PIDs) controller was reached. + ThreadQuotaExceeded, + + /// The kernel cannot allocate sufficient memory to allocate a task structure + /// for the child, or to copy those parts of the caller's context that need to + /// be copied. + SystemResources, + + /// Not enough userland memory to spawn the thread. + OutOfMemory, + + /// `mlockall` is enabled, and the memory needed to spawn the thread + /// would exceed the limit. + LockedMemoryLimitExceeded, + + Unexpected, +}; + +/// caller must call wait on the returned thread +/// fn startFn(@TypeOf(context)) T +/// where T is u8, noreturn, void, or !void +/// caller must call wait on the returned thread +pub fn spawn(context: anytype, comptime startFn: anytype) SpawnError!*Thread { + if (builtin.single_threaded) @compileError("cannot spawn thread when building in single-threaded mode"); + // TODO compile-time call graph analysis to determine stack upper bound + // https://github.com/ziglang/zig/issues/157 + const default_stack_size = 16 * 1024 * 1024; + + const Context = @TypeOf(context); + comptime assert(@typeInfo(@TypeOf(startFn)).Fn.args[0].arg_type.? == Context); + + if (std.Target.current.os.tag == .windows) { + const WinThread = struct { + const OuterContext = struct { + thread: Thread, + inner: Context, + }; + fn threadMain(raw_arg: windows.LPVOID) callconv(.C) windows.DWORD { + const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*; + + switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { + .NoReturn => { + startFn(arg); + }, + .Void => { + startFn(arg); + return 0; + }, + .Int => |info| { + if (info.bits != 8) { + @compileError(bad_startfn_ret); + } + return startFn(arg); + }, + .ErrorUnion => |info| { + if (info.payload != void) { + @compileError(bad_startfn_ret); + } + startFn(arg) catch |err| { + std.debug.warn("error: {s}\n", .{@errorName(err)}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; + return 0; + }, + else => @compileError(bad_startfn_ret), + } + } + }; + + const heap_handle = windows.kernel32.GetProcessHeap() orelse return error.OutOfMemory; + const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext); + const bytes_ptr = windows.kernel32.HeapAlloc(heap_handle, 0, byte_count) orelse return error.OutOfMemory; + errdefer assert(windows.kernel32.HeapFree(heap_handle, 0, bytes_ptr) != 0); + const bytes = @ptrCast([*]u8, bytes_ptr)[0..byte_count]; + const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable; + outer_context.* = WinThread.OuterContext{ + .thread = Thread{ + .data = Thread.Data{ + .heap_handle = heap_handle, + .alloc_start = bytes_ptr, + .handle = undefined, + }, + }, + .inner = context, + }; + + const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(*c_void, &outer_context.inner); + outer_context.thread.data.handle = windows.kernel32.CreateThread(null, default_stack_size, WinThread.threadMain, parameter, 0, null) orelse { + switch (windows.kernel32.GetLastError()) { + else => |err| return windows.unexpectedError(err), + } + }; + return &outer_context.thread; + } + + const MainFuncs = struct { + fn linuxThreadMain(ctx_addr: usize) callconv(.C) u8 { + const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*; + + switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { + .NoReturn => { + startFn(arg); + }, + .Void => { + startFn(arg); + return 0; + }, + .Int => |info| { + if (info.bits != 8) { + @compileError(bad_startfn_ret); + } + return startFn(arg); + }, + .ErrorUnion => |info| { + if (info.payload != void) { + @compileError(bad_startfn_ret); + } + startFn(arg) catch |err| { + std.debug.warn("error: {s}\n", .{@errorName(err)}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; + return 0; + }, + else => @compileError(bad_startfn_ret), + } + } + fn posixThreadMain(ctx: ?*c_void) callconv(.C) ?*c_void { + const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), ctx)).*; + + switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { + .NoReturn => { + startFn(arg); + }, + .Void => { + startFn(arg); + return null; + }, + .Int => |info| { + if (info.bits != 8) { + @compileError(bad_startfn_ret); + } + // pthreads don't support exit status, ignore value + _ = startFn(arg); + return null; + }, + .ErrorUnion => |info| { + if (info.payload != void) { + @compileError(bad_startfn_ret); + } + startFn(arg) catch |err| { + std.debug.warn("error: {s}\n", .{@errorName(err)}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; + return null; + }, + else => @compileError(bad_startfn_ret), + } + } + }; + + if (Thread.use_pthreads) { + var attr: c.pthread_attr_t = undefined; + if (c.pthread_attr_init(&attr) != 0) return error.SystemResources; + defer assert(c.pthread_attr_destroy(&attr) == 0); + + const thread_obj = try std.heap.c_allocator.create(Thread); + errdefer std.heap.c_allocator.destroy(thread_obj); + if (@sizeOf(Context) > 0) { + thread_obj.data.memory = try std.heap.c_allocator.allocAdvanced( + u8, + @alignOf(Context), + @sizeOf(Context), + .at_least, + ); + errdefer std.heap.c_allocator.free(thread_obj.data.memory); + mem.copy(u8, thread_obj.data.memory, mem.asBytes(&context)); + } else { + thread_obj.data.memory = @as([*]u8, undefined)[0..0]; + } + + // Use the same set of parameters used by the libc-less impl. + assert(c.pthread_attr_setstacksize(&attr, default_stack_size) == 0); + assert(c.pthread_attr_setguardsize(&attr, mem.page_size) == 0); + + const err = c.pthread_create( + &thread_obj.data.handle, + &attr, + MainFuncs.posixThreadMain, + thread_obj.data.memory.ptr, + ); + switch (err) { + 0 => return thread_obj, + os.EAGAIN => return error.SystemResources, + os.EPERM => unreachable, + os.EINVAL => unreachable, + else => return os.unexpectedErrno(@intCast(usize, err)), + } + + return thread_obj; + } + + var guard_end_offset: usize = undefined; + var stack_end_offset: usize = undefined; + var thread_start_offset: usize = undefined; + var context_start_offset: usize = undefined; + var tls_start_offset: usize = undefined; + const mmap_len = blk: { + var l: usize = mem.page_size; + // Allocate a guard page right after the end of the stack region + guard_end_offset = l; + // The stack itself, which grows downwards. + l = mem.alignForward(l + default_stack_size, mem.page_size); + stack_end_offset = l; + // Above the stack, so that it can be in the same mmap call, put the Thread object. + l = mem.alignForward(l, @alignOf(Thread)); + thread_start_offset = l; + l += @sizeOf(Thread); + // Next, the Context object. + if (@sizeOf(Context) != 0) { + l = mem.alignForward(l, @alignOf(Context)); + context_start_offset = l; + l += @sizeOf(Context); + } + // Finally, the Thread Local Storage, if any. + l = mem.alignForward(l, os.linux.tls.tls_image.alloc_align); + tls_start_offset = l; + l += os.linux.tls.tls_image.alloc_size; + // Round the size to the page size. + break :blk mem.alignForward(l, mem.page_size); + }; + + const mmap_slice = mem: { + // Map the whole stack with no rw permissions to avoid + // committing the whole region right away + const mmap_slice = os.mmap( + null, + mmap_len, + os.PROT_NONE, + os.MAP_PRIVATE | os.MAP_ANONYMOUS, + -1, + 0, + ) catch |err| switch (err) { + error.MemoryMappingNotSupported => unreachable, + error.AccessDenied => unreachable, + error.PermissionDenied => unreachable, + else => |e| return e, + }; + errdefer os.munmap(mmap_slice); + + // Map everything but the guard page as rw + os.mprotect( + mmap_slice[guard_end_offset..], + os.PROT_READ | os.PROT_WRITE, + ) catch |err| switch (err) { + error.AccessDenied => unreachable, + else => |e| return e, + }; + + break :mem mmap_slice; + }; + + const mmap_addr = @ptrToInt(mmap_slice.ptr); + + const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(*Thread, mmap_addr + thread_start_offset)); + thread_ptr.data.memory = mmap_slice; + + var arg: usize = undefined; + if (@sizeOf(Context) != 0) { + arg = mmap_addr + context_start_offset; + const context_ptr = @alignCast(@alignOf(Context), @intToPtr(*Context, arg)); + context_ptr.* = context; + } + + if (std.Target.current.os.tag == .linux) { + const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | + os.CLONE_SIGHAND | os.CLONE_THREAD | os.CLONE_SYSVSEM | + os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | + os.CLONE_DETACHED | os.CLONE_SETTLS; + // This structure is only needed when targeting i386 + var user_desc: if (std.Target.current.cpu.arch == .i386) os.linux.user_desc else void = undefined; + + const tls_area = mmap_slice[tls_start_offset..]; + const tp_value = os.linux.tls.prepareTLS(tls_area); + + const newtls = blk: { + if (std.Target.current.cpu.arch == .i386) { + user_desc = os.linux.user_desc{ + .entry_number = os.linux.tls.tls_image.gdt_entry_number, + .base_addr = tp_value, + .limit = 0xfffff, + .seg_32bit = 1, + .contents = 0, // Data + .read_exec_only = 0, + .limit_in_pages = 1, + .seg_not_present = 0, + .useable = 1, + }; + break :blk @ptrToInt(&user_desc); + } else { + break :blk tp_value; + } + }; + + const rc = os.linux.clone( + MainFuncs.linuxThreadMain, + mmap_addr + stack_end_offset, + flags, + arg, + &thread_ptr.data.handle, + newtls, + &thread_ptr.data.handle, + ); + switch (os.errno(rc)) { + 0 => return thread_ptr, + os.EAGAIN => return error.ThreadQuotaExceeded, + os.EINVAL => unreachable, + os.ENOMEM => return error.SystemResources, + os.ENOSPC => unreachable, + os.EPERM => unreachable, + os.EUSERS => unreachable, + else => |err| return os.unexpectedErrno(err), + } + } else { + @compileError("Unsupported OS"); + } +} + +pub const CpuCountError = error{ + PermissionDenied, + SystemResources, + Unexpected, +}; + +pub fn cpuCount() CpuCountError!usize { + if (std.Target.current.os.tag == .linux) { + const cpu_set = try os.sched_getaffinity(0); + return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast + } + if (std.Target.current.os.tag == .windows) { + return os.windows.peb().NumberOfProcessors; + } + if (std.Target.current.os.tag == .openbsd) { + var count: c_int = undefined; + var count_size: usize = @sizeOf(c_int); + const mib = [_]c_int{ os.CTL_HW, os.HW_NCPUONLINE }; + os.sysctl(&mib, &count, &count_size, null, 0) catch |err| switch (err) { + error.NameTooLong, error.UnknownName => unreachable, + else => |e| return e, + }; + return @intCast(usize, count); + } + var count: c_int = undefined; + var count_len: usize = @sizeOf(c_int); + const name = if (comptime std.Target.current.isDarwin()) "hw.logicalcpu" else "hw.ncpu"; + os.sysctlbynameZ(name, &count, &count_len, null, 0) catch |err| switch (err) { + error.NameTooLong, error.UnknownName => unreachable, + else => |e| return e, + }; + return @intCast(usize, count); +} + +pub fn getCurrentThreadId() u64 { + switch (std.Target.current.os.tag) { + .linux => { + // Use the syscall directly as musl doesn't provide a wrapper. + return @bitCast(u32, os.linux.gettid()); + }, + .windows => { + return os.windows.kernel32.GetCurrentThreadId(); + }, + .macos, .ios, .watchos, .tvos => { + var thread_id: u64 = undefined; + // Pass thread=null to get the current thread ID. + assert(c.pthread_threadid_np(null, &thread_id) == 0); + return thread_id; + }, + .netbsd => { + return @bitCast(u32, c._lwp_self()); + }, + .freebsd => { + return @bitCast(u32, c.pthread_getthreadid_np()); + }, + .openbsd => { + return @bitCast(u32, c.getthrid()); + }, + else => { + @compileError("getCurrentThreadId not implemented for this platform"); + }, + } +} + +test "" { + std.testing.refAllDecls(@This()); +} diff --git a/lib/std/Thread/AutoResetEvent.zig b/lib/std/Thread/AutoResetEvent.zig new file mode 100644 index 0000000000..8b8b5658bf --- /dev/null +++ b/lib/std/Thread/AutoResetEvent.zig @@ -0,0 +1,228 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`. +//! Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like). +//! +//! AutoResetEvent has 3 possible states: +//! - UNSET: the AutoResetEvent is currently unset +//! - SET: the AutoResetEvent was notified before a wait() was called +//! - <StaticResetEvent pointer>: there is an active waiter waiting for a notification. +//! +//! When attempting to wait: +//! if the event is unset, it registers a ResetEvent pointer to be notified when the event is set +//! if the event is already set, then it consumes the notification and resets the event. +//! +//! When attempting to notify: +//! if the event is unset, then we set the event +//! if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent +//! +//! This ensures that the event is automatically reset after a wait() has been issued +//! and avoids the race condition when using StaticResetEvent in the following scenario: +//! thread 1 | thread 2 +//! StaticResetEvent.wait() | +//! | StaticResetEvent.set() +//! | StaticResetEvent.set() +//! StaticResetEvent.reset() | +//! StaticResetEvent.wait() | (missed the second .set() notification above) + +state: usize = UNSET, + +const std = @import("../std.zig"); +const builtin = @import("builtin"); +const testing = std.testing; +const assert = std.debug.assert; +const StaticResetEvent = std.Thread.StaticResetEvent; +const AutoResetEvent = @This(); + +const UNSET = 0; +const SET = 1; + +/// the minimum alignment for the `*StaticResetEvent` created by wait*() +const event_align = std.math.max(@alignOf(StaticResetEvent), 2); + +pub fn wait(self: *AutoResetEvent) void { + self.waitFor(null) catch unreachable; +} + +pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { + return self.waitFor(timeout); +} + +fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { + // lazily initialized StaticResetEvent + var reset_event: StaticResetEvent align(event_align) = undefined; + var has_reset_event = false; + + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // consume a notification if there is any + if (state == SET) { + @atomicStore(usize, &self.state, UNSET, .SeqCst); + return; + } + + // check if theres currently a pending ResetEvent pointer already registered + if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent + } + + // lazily initialize the ResetEvent if it hasn't been already + if (!has_reset_event) { + has_reset_event = true; + reset_event = .{}; + } + + // Since the AutoResetEvent currently isnt set, + // try to register our ResetEvent on it to wait + // for a set() call from another thread. + if (@cmpxchgWeak( + usize, + &self.state, + UNSET, + @ptrToInt(&reset_event), + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + // if no timeout was specified, then just wait forever + const timeout_ns = timeout orelse { + reset_event.wait(); + return; + }; + + // wait with a timeout and return if signalled via set() + switch (reset_event.timedWait(timeout_ns)) { + .event_set => return, + .timed_out => {}, + } + + // If we timed out, we need to transition the AutoResetEvent back to UNSET. + // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. + state = @cmpxchgStrong( + usize, + &self.state, + @ptrToInt(&reset_event), + UNSET, + .SeqCst, + .SeqCst, + ) orelse return error.TimedOut; + + // We didn't manage to unregister ourselves from the state. + if (state == SET) { + unreachable; // AutoResetEvent notified without waking up the waiting thread + } else if (state != UNSET) { + unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out + } + + // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. + // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. + // We don't return error.TimedOut here as it technically notified us while we were "timing out". + reset_event.wait(); + return; + } +} + +pub fn set(self: *AutoResetEvent) void { + var state = @atomicLoad(usize, &self.state, .SeqCst); + while (true) { + // If the AutoResetEvent is already set, there is nothing else left to do + if (state == SET) { + return; + } + + // If the AutoResetEvent isn't set, + // then try to leave a notification for the wait() thread that we set() it. + if (state == UNSET) { + state = @cmpxchgWeak( + usize, + &self.state, + UNSET, + SET, + .SeqCst, + .SeqCst, + ) orelse return; + continue; + } + + // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. + // Try to acquire ownership of it so that we can wake it up. + // This also resets the AutoResetEvent so that there is no race condition as defined above. + if (@cmpxchgWeak( + usize, + &self.state, + state, + UNSET, + .SeqCst, + .SeqCst, + )) |new_state| { + state = new_state; + continue; + } + + const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state); + reset_event.set(); + return; + } +} + +test "basic usage" { + // test local code paths + { + var event = AutoResetEvent{}; + testing.expectError(error.TimedOut, event.timedWait(1)); + event.set(); + event.wait(); + } + + // test cross-thread signaling + if (builtin.single_threaded) + return; + + const Context = struct { + value: u128 = 0, + in: AutoResetEvent = AutoResetEvent{}, + out: AutoResetEvent = AutoResetEvent{}, + + const Self = @This(); + + fn sender(self: *Self) void { + testing.expect(self.value == 0); + self.value = 1; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 2); + self.value = 3; + self.out.set(); + + self.in.wait(); + testing.expect(self.value == 4); + } + + fn receiver(self: *Self) void { + self.out.wait(); + testing.expect(self.value == 1); + self.value = 2; + self.in.set(); + + self.out.wait(); + testing.expect(self.value == 3); + self.value = 4; + self.in.set(); + } + }; + + var context = Context{}; + const send_thread = try std.Thread.spawn(&context, Context.sender); + const recv_thread = try std.Thread.spawn(&context, Context.receiver); + + send_thread.wait(); + recv_thread.wait(); +} diff --git a/lib/std/Thread/Condition.zig b/lib/std/Thread/Condition.zig new file mode 100644 index 0000000000..2379d264d1 --- /dev/null +++ b/lib/std/Thread/Condition.zig @@ -0,0 +1,182 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A condition provides a way for a kernel thread to block until it is signaled +//! to wake up. Spurious wakeups are possible. +//! This API supports static initialization and does not require deinitialization. + +impl: Impl, + +const std = @import("../std.zig"); +const Condition = @This(); +const windows = std.os.windows; +const linux = std.os.linux; +const Mutex = std.Thread.Mutex; +const assert = std.debug.assert; + +const Impl = if (std.builtin.single_threaded) + SingleThreadedCondition +else if (std.Target.current.os.tag == .windows) + WindowsCondition +else if (std.Thread.use_pthreads) + PthreadCondition +else + AtomicCondition; + +pub const SingleThreadedCondition = struct { + pub fn wait(cond: *SingleThreadedCondition, mutex: *Mutex) void { + unreachable; // deadlock detected + } + + pub fn signal(cond: *SingleThreadedCondition) void {} + + pub fn broadcast(cond: *SingleThreadedCondition) void {} +}; + +pub const WindowsCondition = struct { + cond: windows.CONDITION_VARIABLE = windows.CONDITION_VARIABLE_INIT, + + pub fn wait(cond: *WindowsCondition, mutex: *Mutex) void { + const rc = windows.SleepConditionVariableSRW( + &cond.cond, + &mutex.srwlock, + windows.INFINITE, + @as(windows.ULONG, 0), + ); + assert(rc != windows.FALSE); + } + + pub fn signal(cond: *WindowsCondition) void { + windows.WakeConditionVariable(&cond.cond); + } + + pub fn broadcast(cond: *WindowsCondition) void { + windows.WakeAllConditionVariable(&cond.cond); + } +}; + +pub const PthreadCondition = struct { + cond: std.c.pthread_cond_t = .{}, + + pub fn wait(cond: *PthreadCondition, mutex: *Mutex) void { + const rc = std.c.pthread_cond_wait(&cond.cond, &mutex.mutex); + assert(rc == 0); + } + + pub fn signal(cond: *PthreadCondition) void { + const rc = std.c.pthread_cond_signal(&cond.cond); + assert(rc == 0); + } + + pub fn broadcast(cond: *PthreadCondition) void { + const rc = std.c.pthread_cond_broadcast(&cond.cond); + assert(rc == 0); + } +}; + +pub const AtomicCondition = struct { + pending: bool = false, + queue_mutex: Mutex = .{}, + queue_list: QueueList = .{}, + + pub const QueueList = std.SinglyLinkedList(QueueItem); + + pub const QueueItem = struct { + futex: i32 = 0, + + fn wait(cond: *@This()) void { + while (@atomicLoad(i32, &cond.futex, .Acquire) == 0) { + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wait( + &cond.futex, + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT, + 0, + null, + ))) { + 0 => {}, + std.os.EINTR => {}, + std.os.EAGAIN => {}, + else => unreachable, + } + }, + else => spinLoopHint(), + } + } + } + + fn notify(cond: *@This()) void { + @atomicStore(i32, &cond.futex, 1, .Release); + + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wake( + &cond.futex, + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE, + 1, + ))) { + 0 => {}, + std.os.EFAULT => {}, + else => unreachable, + } + }, + else => {}, + } + } + }; + + pub fn wait(cond: *AtomicCondition, mutex: *Mutex) void { + var waiter = QueueList.Node{ .data = .{} }; + + { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + cond.queue_list.prepend(&waiter); + @atomicStore(bool, &cond.pending, true, .SeqCst); + } + + mutex.unlock(); + waiter.data.wait(); + mutex.lock(); + } + + pub fn signal(cond: *AtomicCondition) void { + if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) + return; + + const maybe_waiter = blk: { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + const maybe_waiter = cond.queue_list.popFirst(); + @atomicStore(bool, &cond.pending, cond.queue_list.first != null, .SeqCst); + break :blk maybe_waiter; + }; + + if (maybe_waiter) |waiter| + waiter.data.notify(); + } + + pub fn broadcast(cond: *AtomicCondition) void { + if (@atomicLoad(bool, &cond.pending, .SeqCst) == false) + return; + + @atomicStore(bool, &cond.pending, false, .SeqCst); + + var waiters = blk: { + const held = cond.queue_mutex.acquire(); + defer held.release(); + + const waiters = cond.queue_list; + cond.queue_list = .{}; + break :blk waiters; + }; + + while (waiters.popFirst()) |waiter| + waiter.data.notify(); + } +}; diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig new file mode 100644 index 0000000000..128eb0be80 --- /dev/null +++ b/lib/std/Thread/Mutex.zig @@ -0,0 +1,303 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! Lock may be held only once. If the same thread tries to acquire +//! the same mutex twice, it deadlocks. This type supports static +//! initialization and is at most `@sizeOf(usize)` in size. When an +//! application is built in single threaded release mode, all the +//! functions are no-ops. In single threaded debug mode, there is +//! deadlock detection. +//! +//! Example usage: +//! var m = Mutex{}; +//! +//! const lock = m.acquire(); +//! defer lock.release(); +//! ... critical code +//! +//! Non-blocking: +//! if (m.tryAcquire) |lock| { +//! defer lock.release(); +//! // ... critical section +//! } else { +//! // ... lock not acquired +//! } + +impl: Impl = .{}, + +const Mutex = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const os = std.os; +const assert = std.debug.assert; +const windows = os.windows; +const linux = os.linux; +const testing = std.testing; +const StaticResetEvent = std.thread.StaticResetEvent; + +pub const Held = struct { + impl: *Impl, + + pub fn release(held: Held) void { + held.impl.release(); + } +}; + +/// Try to acquire the mutex without blocking. Returns null if +/// the mutex is unavailable. Otherwise returns Held. Call +/// release on Held. +pub fn tryAcquire(m: *Mutex) ?Held { + if (m.impl.tryAcquire()) { + return Held{ .impl = &m.impl }; + } else { + return null; + } +} + +/// Acquire the mutex. Deadlocks if the mutex is already +/// held by the calling thread. +pub fn acquire(m: *Mutex) Held { + m.impl.acquire(); + return .{ .impl = &m.impl }; +} + +const Impl = if (builtin.single_threaded) + Dummy +else if (builtin.os.tag == .windows) + WindowsMutex +else if (std.Thread.use_pthreads) + PthreadMutex +else + AtomicMutex; + +pub const AtomicMutex = struct { + state: State = .unlocked, + + const State = enum(i32) { + unlocked, + locked, + waiting, + }; + + pub fn tryAcquire(self: *AtomicMutex) bool { + return @cmpxchgStrong( + State, + &self.state, + .unlocked, + .locked, + .Acquire, + .Monotonic, + ) == null; + } + + pub fn acquire(self: *AtomicMutex) void { + switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) { + .unlocked => {}, + else => |s| self.lockSlow(s), + } + } + + fn lockSlow(self: *AtomicMutex, current_state: State) void { + @setCold(true); + var new_state = current_state; + + var spin: u8 = 0; + while (spin < 100) : (spin += 1) { + const state = @cmpxchgWeak( + State, + &self.state, + .unlocked, + new_state, + .Acquire, + .Monotonic, + ) orelse return; + + switch (state) { + .unlocked => {}, + .locked => {}, + .waiting => break, + } + + var iter = std.math.min(32, spin + 1); + while (iter > 0) : (iter -= 1) + std.Thread.spinLoopHint(); + } + + new_state = .waiting; + while (true) { + switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) { + .unlocked => return, + else => {}, + } + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wait( + @ptrCast(*const i32, &self.state), + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAIT, + @enumToInt(new_state), + null, + ))) { + 0 => {}, + std.os.EINTR => {}, + std.os.EAGAIN => {}, + else => unreachable, + } + }, + else => std.Thread.spinLoopHint(), + } + } + } + + pub fn release(self: *AtomicMutex) void { + switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) { + .unlocked => unreachable, + .locked => {}, + .waiting => self.unlockSlow(), + } + } + + fn unlockSlow(self: *AtomicMutex) void { + @setCold(true); + + switch (std.Target.current.os.tag) { + .linux => { + switch (linux.getErrno(linux.futex_wake( + @ptrCast(*const i32, &self.state), + linux.FUTEX_PRIVATE_FLAG | linux.FUTEX_WAKE, + 1, + ))) { + 0 => {}, + std.os.EFAULT => {}, + else => unreachable, + } + }, + else => {}, + } + } +}; + +pub const PthreadMutex = struct { + pthread_mutex: std.c.pthread_mutex_t = .{}, + + /// Try to acquire the mutex without blocking. Returns null if + /// the mutex is unavailable. Otherwise returns Held. Call + /// release on Held. + pub fn tryAcquire(self: *PthreadMutex) bool { + return std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0; + } + + /// Acquire the mutex. Will deadlock if the mutex is already + /// held by the calling thread. + pub fn acquire(self: *PthreadMutex) void { + switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) { + 0 => return, + std.c.EINVAL => unreachable, + std.c.EBUSY => unreachable, + std.c.EAGAIN => unreachable, + std.c.EDEADLK => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } + + pub fn release(self: *PthreadMutex) void { + switch (std.c.pthread_mutex_unlock(&self.pthread_mutex)) { + 0 => return, + std.c.EINVAL => unreachable, + std.c.EAGAIN => unreachable, + std.c.EPERM => unreachable, + else => unreachable, + } + } +}; + +/// This has the sematics as `Mutex`, however it does not actually do any +/// synchronization. Operations are safety-checked no-ops. +pub const Dummy = struct { + lock: @TypeOf(lock_init) = lock_init, + + const lock_init = if (std.debug.runtime_safety) false else {}; + + /// Try to acquire the mutex without blocking. Returns null if + /// the mutex is unavailable. Otherwise returns Held. Call + /// release on Held. + pub fn tryAcquire(self: *Dummy) bool { + if (std.debug.runtime_safety) { + if (self.lock) return false; + self.lock = true; + } + return true; + } + + /// Acquire the mutex. Will deadlock if the mutex is already + /// held by the calling thread. + pub fn acquire(self: *Dummy) void { + return self.tryAcquire() orelse @panic("deadlock detected"); + } + + pub fn release(self: *Dummy) void { + if (std.debug.runtime_safety) { + self.mutex.lock = false; + } + } +}; + +const WindowsMutex = struct { + srwlock: windows.SRWLOCK = windows.SRWLOCK_INIT, + + pub fn tryAcquire(self: *WindowsMutex) bool { + return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE; + } + + pub fn acquire(self: *WindowsMutex) void { + AcquireSRWLockExclusive(&self.srwlock); + } + + pub fn release(self: *WindowsMutex) void { + ReleaseSRWLockExclusive(&self.srwlock); + } +}; + +const TestContext = struct { + mutex: *Mutex, + data: i128, + + const incr_count = 10000; +}; + +test "basic usage" { + var mutex = Mutex{}; + + var context = TestContext{ + .mutex = &mutex, + .data = 0, + }; + + if (builtin.single_threaded) { + worker(&context); + testing.expect(context.data == TestContext.incr_count); + } else { + const thread_count = 10; + var threads: [thread_count]*std.Thread = undefined; + for (threads) |*t| { + t.* = try std.Thread.spawn(&context, worker); + } + for (threads) |t| + t.wait(); + + testing.expect(context.data == thread_count * TestContext.incr_count); + } +} + +fn worker(ctx: *TestContext) void { + var i: usize = 0; + while (i != TestContext.incr_count) : (i += 1) { + const held = ctx.mutex.acquire(); + defer held.release(); + + ctx.data += 1; + } +} diff --git a/lib/std/ResetEvent.zig b/lib/std/Thread/ResetEvent.zig index 4443fdcdfb..622f9be98e 100644 --- a/lib/std/ResetEvent.zig +++ b/lib/std/Thread/ResetEvent.zig @@ -9,11 +9,11 @@ //! This API requires being initialized at runtime, and initialization //! can fail. Once initialized, the core operations cannot fail. //! If you need an abstraction that cannot fail to be initialized, see -//! `std.StaticResetEvent`. However if you can handle initialization failure, +//! `std.Thread.StaticResetEvent`. However if you can handle initialization failure, //! it is preferred to use `ResetEvent`. const ResetEvent = @This(); -const std = @import("std.zig"); +const std = @import("../std.zig"); const builtin = std.builtin; const testing = std.testing; const assert = std.debug.assert; @@ -24,13 +24,13 @@ const time = std.time; impl: Impl, pub const Impl = if (builtin.single_threaded) - std.StaticResetEvent.DebugEvent + std.Thread.StaticResetEvent.DebugEvent else if (std.Target.current.isDarwin()) DarwinEvent else if (std.Thread.use_pthreads) PosixEvent else - std.StaticResetEvent.AtomicEvent; + std.Thread.StaticResetEvent.AtomicEvent; pub const InitError = error{SystemResources}; diff --git a/lib/std/Thread/RwLock.zig b/lib/std/Thread/RwLock.zig new file mode 100644 index 0000000000..1d606a9cf1 --- /dev/null +++ b/lib/std/Thread/RwLock.zig @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A lock that supports one writer or many readers. +//! This API is for kernel threads, not evented I/O. +//! This API requires being initialized at runtime, and initialization +//! can fail. Once initialized, the core operations cannot fail. + +impl: Impl, + +const RwLock = @This(); +const std = @import("../std.zig"); +const builtin = std.builtin; +const assert = std.debug.assert; +const Mutex = std.Thread.Mutex; +const Semaphore = std.Semaphore; +const CondVar = std.CondVar; + +pub const Impl = if (builtin.single_threaded) + SingleThreadedRwLock +else if (std.Thread.use_pthreads) + PthreadRwLock +else + DefaultRwLock; + +pub fn init(rwl: *RwLock) void { + return rwl.impl.init(); +} + +pub fn deinit(rwl: *RwLock) void { + return rwl.impl.deinit(); +} + +/// Attempts to obtain exclusive lock ownership. +/// Returns `true` if the lock is obtained, `false` otherwise. +pub fn tryLock(rwl: *RwLock) bool { + return rwl.impl.tryLock(); +} + +/// Blocks until exclusive lock ownership is acquired. +pub fn lock(rwl: *RwLock) void { + return rwl.impl.lock(); +} + +/// Releases a held exclusive lock. +/// Asserts the lock is held exclusively. +pub fn unlock(rwl: *RwLock) void { + return rwl.impl.unlock(); +} + +/// Attempts to obtain shared lock ownership. +/// Returns `true` if the lock is obtained, `false` otherwise. +pub fn tryLockShared(rwl: *RwLock) bool { + return rwl.impl.tryLockShared(); +} + +/// Blocks until shared lock ownership is acquired. +pub fn lockShared(rwl: *RwLock) void { + return rwl.impl.lockShared(); +} + +/// Releases a held shared lock. +pub fn unlockShared(rwl: *RwLock) void { + return rwl.impl.unlockShared(); +} + +/// Single-threaded applications use this for deadlock checks in +/// debug mode, and no-ops in release modes. +pub const SingleThreadedRwLock = struct { + state: enum { unlocked, locked_exclusive, locked_shared }, + shared_count: usize, + + pub fn init(rwl: *SingleThreadedRwLock) void { + rwl.* = .{ + .state = .unlocked, + .shared_count = 0, + }; + } + + pub fn deinit(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .unlocked); + assert(rwl.shared_count == 0); + } + + /// Attempts to obtain exclusive lock ownership. + /// Returns `true` if the lock is obtained, `false` otherwise. + pub fn tryLock(rwl: *SingleThreadedRwLock) bool { + switch (rwl.state) { + .unlocked => { + assert(rwl.shared_count == 0); + rwl.state = .locked_exclusive; + return true; + }, + .locked_exclusive, .locked_shared => return false, + } + } + + /// Blocks until exclusive lock ownership is acquired. + pub fn lock(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .unlocked); // deadlock detected + assert(rwl.shared_count == 0); // corrupted state detected + rwl.state = .locked_exclusive; + } + + /// Releases a held exclusive lock. + /// Asserts the lock is held exclusively. + pub fn unlock(rwl: *SingleThreadedRwLock) void { + assert(rwl.state == .locked_exclusive); + assert(rwl.shared_count == 0); // corrupted state detected + rwl.state = .unlocked; + } + + /// Attempts to obtain shared lock ownership. + /// Returns `true` if the lock is obtained, `false` otherwise. + pub fn tryLockShared(rwl: *SingleThreadedRwLock) bool { + switch (rwl.state) { + .unlocked => { + rwl.state = .locked_shared; + assert(rwl.shared_count == 0); + rwl.shared_count = 1; + return true; + }, + .locked_exclusive, .locked_shared => return false, + } + } + + /// Blocks until shared lock ownership is acquired. + pub fn lockShared(rwl: *SingleThreadedRwLock) void { + switch (rwl.state) { + .unlocked => { + rwl.state = .locked_shared; + assert(rwl.shared_count == 0); + rwl.shared_count = 1; + }, + .locked_shared => { + rwl.shared_count += 1; + }, + .locked_exclusive => unreachable, // deadlock detected + } + } + + /// Releases a held shared lock. + pub fn unlockShared(rwl: *SingleThreadedRwLock) void { + switch (rwl.state) { + .unlocked => unreachable, // too many calls to `unlockShared` + .locked_exclusive => unreachable, // exclusively held lock + .locked_shared => { + rwl.shared_count -= 1; + if (rwl.shared_count == 0) { + rwl.state = .unlocked; + } + }, + } + } +}; + +pub const PthreadRwLock = struct { + rwlock: pthread_rwlock_t, + + pub fn init(rwl: *PthreadRwLock) void { + rwl.* = .{ .rwlock = .{} }; + } + + pub fn deinit(rwl: *PthreadRwLock) void { + const safe_rc = switch (std.builtin.os.tag) { + .dragonfly, .netbsd => std.os.EAGAIN, + else => 0, + }; + + const rc = std.c.pthread_rwlock_destroy(&rwl.rwlock); + assert(rc == 0 or rc == safe_rc); + + rwl.* = undefined; + } + + pub fn tryLock(rwl: *PthreadRwLock) bool { + return pthread_rwlock_trywrlock(&rwl.rwlock) == 0; + } + + pub fn lock(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_wrlock(&rwl.rwlock); + assert(rc == 0); + } + + pub fn unlock(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_unlock(&rwl.rwlock); + assert(rc == 0); + } + + pub fn tryLockShared(rwl: *PthreadRwLock) bool { + return pthread_rwlock_tryrdlock(&rwl.rwlock) == 0; + } + + pub fn lockShared(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_rdlock(&rwl.rwlock); + assert(rc == 0); + } + + pub fn unlockShared(rwl: *PthreadRwLock) void { + const rc = pthread_rwlock_unlock(&rwl.rwlock); + assert(rc == 0); + } +}; + +pub const DefaultRwLock = struct { + state: usize, + mutex: Mutex, + semaphore: Semaphore, + + const IS_WRITING: usize = 1; + const WRITER: usize = 1 << 1; + const READER: usize = 1 << (1 + std.meta.bitCount(Count)); + const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, WRITER); + const READER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, READER); + const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2)); + + pub fn init(rwl: *DefaultRwLock) void { + rwl.* = .{ + .state = 0, + .mutex = Mutex.init(), + .semaphore = Semaphore.init(0), + }; + } + + pub fn deinit(rwl: *DefaultRwLock) void { + rwl.semaphore.deinit(); + rwl.mutex.deinit(); + rwl.* = undefined; + } + + pub fn tryLock(rwl: *DefaultRwLock) bool { + if (rwl.mutex.tryLock()) { + const state = @atomicLoad(usize, &rwl.state, .SeqCst); + if (state & READER_MASK == 0) { + _ = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst); + return true; + } + + rwl.mutex.unlock(); + } + + return false; + } + + pub fn lock(rwl: *DefaultRwLock) void { + _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst); + rwl.mutex.lock(); + + const state = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst); + if (state & READER_MASK != 0) + rwl.semaphore.wait(); + } + + pub fn unlock(rwl: *DefaultRwLock) void { + _ = @atomicRmw(usize, &rwl.state, .And, ~IS_WRITING, .SeqCst); + rwl.mutex.unlock(); + } + + pub fn tryLockShared(rwl: *DefaultRwLock) bool { + const state = @atomicLoad(usize, &rwl.state, .SeqCst); + if (state & (IS_WRITING | WRITER_MASK) == 0) { + _ = @cmpxchgStrong( + usize, + &rwl.state, + state, + state + READER, + .SeqCst, + .SeqCst, + ) orelse return true; + } + + if (rwl.mutex.tryLock()) { + _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst); + rwl.mutex.unlock(); + return true; + } + + return false; + } + + pub fn lockShared(rwl: *DefaultRwLock) void { + var state = @atomicLoad(usize, &rwl.state, .SeqCst); + while (state & (IS_WRITING | WRITER_MASK) == 0) { + state = @cmpxchgWeak( + usize, + &rwl.state, + state, + state + READER, + .SeqCst, + .SeqCst, + ) orelse return; + } + + rwl.mutex.lock(); + _ = @atomicRmw(usize, &rwl.state, .Add, READER, .SeqCst); + rwl.mutex.unlock(); + } + + pub fn unlockShared(rwl: *DefaultRwLock) void { + const state = @atomicRmw(usize, &rwl.state, .Sub, READER, .SeqCst); + + if ((state & READER_MASK == READER) and (state & IS_WRITING != 0)) + rwl.semaphore.post(); + } +}; diff --git a/lib/std/Thread/Semaphore.zig b/lib/std/Thread/Semaphore.zig new file mode 100644 index 0000000000..77a278b355 --- /dev/null +++ b/lib/std/Thread/Semaphore.zig @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2015-2021 Zig Contributors +// This file is part of [zig](https://ziglang.org/), which is MIT licensed. +// The MIT license requires this copyright notice to be included in all copies +// and substantial portions of the software. + +//! A semaphore is an unsigned integer that blocks the kernel thread if +//! the number would become negative. +//! This API supports static initialization and does not require deinitialization. + +mutex: Mutex = .{}, +cond: Condition = .{}, +//! It is OK to initialize this field to any value. +permits: usize = 0, + +const RwLock = @This(); +const std = @import("../std.zig"); +const Mutex = std.Thread.Mutex; +const Condition = std.Thread.Condition; + +pub fn wait(sem: *Semaphore) void { + const held = sem.mutex.acquire(); + defer held.release(); + + while (sem.permits == 0) + sem.cond.wait(&sem.mutex); + + sem.permits -= 1; + if (sem.permits > 0) + sem.cond.signal(); +} + +pub fn post(sem: *Semaphore) void { + const held = sem.mutex.acquire(); + defer held.release(); + + sem.permits += 1; + sem.cond.signal(); +} diff --git a/lib/std/StaticResetEvent.zig b/lib/std/Thread/StaticResetEvent.zig index 4e551a565e..414583e477 100644 --- a/lib/std/StaticResetEvent.zig +++ b/lib/std/Thread/StaticResetEvent.zig @@ -10,10 +10,10 @@ //! and it requires no deinitialization. The downside is that it may not //! integrate as cleanly into other synchronization APIs, or, in a worst case, //! may be forced to fall back on spin locking. As a rule of thumb, prefer -//! to use `std.ResetEvent` when possible, and use `StaticResetEvent` when +//! to use `std.Thread.ResetEvent` when possible, and use `StaticResetEvent` when //! the logic needs stronger API guarantees. -const std = @import("std.zig"); +const std = @import("../std.zig"); const StaticResetEvent = @This(); const SpinLock = std.SpinLock; const assert = std.debug.assert; @@ -53,7 +53,7 @@ pub fn reset(ev: *StaticResetEvent) void { return ev.impl.reset(); } -pub const TimedWaitResult = std.ResetEvent.TimedWaitResult; +pub const TimedWaitResult = std.Thread.ResetEvent.TimedWaitResult; /// Wait for the event to be set by blocking the current thread. /// A timeout in nanoseconds can be provided as a hint for how @@ -78,13 +78,13 @@ pub const DebugEvent = struct { }; /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. + /// `std.Thread.ResetEvent`. pub fn init(ev: *DebugEvent) void { ev.* = .{}; } /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. + /// `std.Thread.ResetEvent`. pub fn deinit(ev: *DebugEvent) void { ev.* = undefined; } @@ -125,13 +125,13 @@ pub const AtomicEvent = struct { const WAIT = 1 << 1; /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. + /// `std.Thread.ResetEvent`. pub fn init(ev: *AtomicEvent) void { ev.* = .{}; } /// This function is provided so that this type can be re-used inside - /// `std.ResetEvent`. + /// `std.Thread.ResetEvent`. pub fn deinit(ev: *AtomicEvent) void { ev.* = undefined; } diff --git a/lib/std/atomic/queue.zig b/lib/std/atomic/queue.zig index fa3711cd9f..f5f63944ab 100644 --- a/lib/std/atomic/queue.zig +++ b/lib/std/atomic/queue.zig @@ -16,7 +16,7 @@ pub fn Queue(comptime T: type) type { return struct { head: ?*Node, tail: ?*Node, - mutex: std.Mutex, + mutex: std.Thread.Mutex, pub const Self = @This(); pub const Node = std.TailQueue(T).Node; @@ -27,7 +27,7 @@ pub fn Queue(comptime T: type) type { return Self{ .head = null, .tail = null, - .mutex = std.Mutex{}, + .mutex = std.Thread.Mutex{}, }; } diff --git a/lib/std/auto_reset_event.zig b/lib/std/auto_reset_event.zig deleted file mode 100644 index 39cd184a68..0000000000 --- a/lib/std/auto_reset_event.zig +++ /dev/null @@ -1,226 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = @import("builtin"); -const testing = std.testing; -const assert = std.debug.assert; -const StaticResetEvent = std.StaticResetEvent; - -/// Similar to `StaticResetEvent` but on `set()` it also (atomically) does `reset()`. -/// Unlike StaticResetEvent, `wait()` can only be called by one thread (MPSC-like). -pub const AutoResetEvent = struct { - /// AutoResetEvent has 3 possible states: - /// - UNSET: the AutoResetEvent is currently unset - /// - SET: the AutoResetEvent was notified before a wait() was called - /// - <StaticResetEvent pointer>: there is an active waiter waiting for a notification. - /// - /// When attempting to wait: - /// if the event is unset, it registers a ResetEvent pointer to be notified when the event is set - /// if the event is already set, then it consumes the notification and resets the event. - /// - /// When attempting to notify: - /// if the event is unset, then we set the event - /// if theres a waiting ResetEvent, then we unset the event and notify the ResetEvent - /// - /// This ensures that the event is automatically reset after a wait() has been issued - /// and avoids the race condition when using StaticResetEvent in the following scenario: - /// thread 1 | thread 2 - /// StaticResetEvent.wait() | - /// | StaticResetEvent.set() - /// | StaticResetEvent.set() - /// StaticResetEvent.reset() | - /// StaticResetEvent.wait() | (missed the second .set() notification above) - state: usize = UNSET, - - const UNSET = 0; - const SET = 1; - - /// the minimum alignment for the `*StaticResetEvent` created by wait*() - const event_align = std.math.max(@alignOf(StaticResetEvent), 2); - - pub fn wait(self: *AutoResetEvent) void { - self.waitFor(null) catch unreachable; - } - - pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void { - return self.waitFor(timeout); - } - - fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void { - // lazily initialized StaticResetEvent - var reset_event: StaticResetEvent align(event_align) = undefined; - var has_reset_event = false; - - var state = @atomicLoad(usize, &self.state, .SeqCst); - while (true) { - // consume a notification if there is any - if (state == SET) { - @atomicStore(usize, &self.state, UNSET, .SeqCst); - return; - } - - // check if theres currently a pending ResetEvent pointer already registered - if (state != UNSET) { - unreachable; // multiple waiting threads on the same AutoResetEvent - } - - // lazily initialize the ResetEvent if it hasn't been already - if (!has_reset_event) { - has_reset_event = true; - reset_event = .{}; - } - - // Since the AutoResetEvent currently isnt set, - // try to register our ResetEvent on it to wait - // for a set() call from another thread. - if (@cmpxchgWeak( - usize, - &self.state, - UNSET, - @ptrToInt(&reset_event), - .SeqCst, - .SeqCst, - )) |new_state| { - state = new_state; - continue; - } - - // if no timeout was specified, then just wait forever - const timeout_ns = timeout orelse { - reset_event.wait(); - return; - }; - - // wait with a timeout and return if signalled via set() - switch (reset_event.timedWait(timeout_ns)) { - .event_set => return, - .timed_out => {}, - } - - // If we timed out, we need to transition the AutoResetEvent back to UNSET. - // If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent. - state = @cmpxchgStrong( - usize, - &self.state, - @ptrToInt(&reset_event), - UNSET, - .SeqCst, - .SeqCst, - ) orelse return error.TimedOut; - - // We didn't manage to unregister ourselves from the state. - if (state == SET) { - unreachable; // AutoResetEvent notified without waking up the waiting thread - } else if (state != UNSET) { - unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out - } - - // This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up. - // We need to wait for it to wake up our ResetEvent before we can return and invalidate it. - // We don't return error.TimedOut here as it technically notified us while we were "timing out". - reset_event.wait(); - return; - } - } - - pub fn set(self: *AutoResetEvent) void { - var state = @atomicLoad(usize, &self.state, .SeqCst); - while (true) { - // If the AutoResetEvent is already set, there is nothing else left to do - if (state == SET) { - return; - } - - // If the AutoResetEvent isn't set, - // then try to leave a notification for the wait() thread that we set() it. - if (state == UNSET) { - state = @cmpxchgWeak( - usize, - &self.state, - UNSET, - SET, - .SeqCst, - .SeqCst, - ) orelse return; - continue; - } - - // There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting. - // Try to acquire ownership of it so that we can wake it up. - // This also resets the AutoResetEvent so that there is no race condition as defined above. - if (@cmpxchgWeak( - usize, - &self.state, - state, - UNSET, - .SeqCst, - .SeqCst, - )) |new_state| { - state = new_state; - continue; - } - - const reset_event = @intToPtr(*align(event_align) StaticResetEvent, state); - reset_event.set(); - return; - } - } -}; - -test "std.AutoResetEvent" { - // test local code paths - { - var event = AutoResetEvent{}; - testing.expectError(error.TimedOut, event.timedWait(1)); - event.set(); - event.wait(); - } - - // test cross-thread signaling - if (builtin.single_threaded) - return; - - const Context = struct { - value: u128 = 0, - in: AutoResetEvent = AutoResetEvent{}, - out: AutoResetEvent = AutoResetEvent{}, - - const Self = @This(); - - fn sender(self: *Self) void { - testing.expect(self.value == 0); - self.value = 1; - self.out.set(); - - self.in.wait(); - testing.expect(self.value == 2); - self.value = 3; - self.out.set(); - - self.in.wait(); - testing.expect(self.value == 4); - } - - fn receiver(self: *Self) void { - self.out.wait(); - testing.expect(self.value == 1); - self.value = 2; - self.in.set(); - - self.out.wait(); - testing.expect(self.value == 3); - self.value = 4; - self.in.set(); - } - }; - - var context = Context{}; - const send_thread = try std.Thread.spawn(&context, Context.sender); - const recv_thread = try std.Thread.spawn(&context, Context.receiver); - - send_thread.wait(); - recv_thread.wait(); -} diff --git a/lib/std/c.zig b/lib/std/c.zig index b7a412339e..1e86bfbd8c 100644 --- a/lib/std/c.zig +++ b/lib/std/c.zig @@ -338,6 +338,13 @@ pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int; pub extern "c" fn pthread_cond_broadcast(cond: *pthread_cond_t) c_int; pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int; +pub extern "c" fn pthread_rwlock_destroy(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_rdlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_wrlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_tryrdlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_trywrlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; +pub extern "c" fn pthread_rwlock_unlock(rwl: *pthread_rwlock_t) callconv(.C) c_int; + pub const pthread_t = *opaque {}; pub const FILE = opaque {}; diff --git a/lib/std/c/darwin.zig b/lib/std/c/darwin.zig index 7527752527..b5c3fbf977 100644 --- a/lib/std/c/darwin.zig +++ b/lib/std/c/darwin.zig @@ -177,6 +177,10 @@ pub const pthread_cond_t = extern struct { __sig: c_long = 0x3CB0B1BB, __opaque: [__PTHREAD_COND_SIZE__]u8 = [_]u8{0} ** __PTHREAD_COND_SIZE__, }; +pub const pthread_rwlock_t = extern struct { + __sig: c_long = 0x2DA8B3B4, + __opaque: [192]u8 = [_]u8{0} ** 192, +}; pub const sem_t = c_int; const __PTHREAD_MUTEX_SIZE__ = if (@sizeOf(usize) == 8) 56 else 40; const __PTHREAD_COND_SIZE__ = if (@sizeOf(usize) == 8) 40 else 24; @@ -192,7 +196,7 @@ pub extern "c" fn pthread_threadid_np(thread: ?pthread_t, thread_id: *u64) c_int pub extern "c" fn arc4random_buf(buf: [*]u8, len: usize) void; // Grand Central Dispatch is exposed by libSystem. -pub const dispatch_semaphore_t = *opaque{}; +pub const dispatch_semaphore_t = *opaque {}; pub const dispatch_time_t = u64; pub const DISPATCH_TIME_NOW = @as(dispatch_time_t, 0); pub const DISPATCH_TIME_FOREVER = ~@as(dispatch_time_t, 0); diff --git a/lib/std/c/emscripten.zig b/lib/std/c/emscripten.zig index 1652975eb9..526eb9e99c 100644 --- a/lib/std/c/emscripten.zig +++ b/lib/std/c/emscripten.zig @@ -9,5 +9,8 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T, }; +pub const pthread_rwlock_t = extern struct { + size: [32]u8 align(4) = [_]u8{0} ** 32, +}; const __SIZEOF_PTHREAD_COND_T = 48; const __SIZEOF_PTHREAD_MUTEX_T = 28; diff --git a/lib/std/c/freebsd.zig b/lib/std/c/freebsd.zig index a8d11b95b2..a6c84c66fa 100644 --- a/lib/std/c/freebsd.zig +++ b/lib/std/c/freebsd.zig @@ -43,6 +43,9 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { inner: ?*c_void = null, }; +pub const pthread_rwlock_t = extern struct { + ptr: ?*c_void = null, +}; pub const pthread_attr_t = extern struct { __size: [56]u8, diff --git a/lib/std/c/fuchsia.zig b/lib/std/c/fuchsia.zig index bc53dc81a6..fc34f49d22 100644 --- a/lib/std/c/fuchsia.zig +++ b/lib/std/c/fuchsia.zig @@ -9,5 +9,8 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T, }; +pub const pthread_rwlock_t = extern struct { + size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, +}; const __SIZEOF_PTHREAD_COND_T = 48; const __SIZEOF_PTHREAD_MUTEX_T = 40; diff --git a/lib/std/c/haiku.zig b/lib/std/c/haiku.zig index 438012c3b3..0f695a9446 100644 --- a/lib/std/c/haiku.zig +++ b/lib/std/c/haiku.zig @@ -17,3 +17,12 @@ pub const pthread_cond_t = extern struct { waiter_count: i32 = 0, lock: i32 = 0, }; +pub const pthread_rwlock_t = extern struct { + flags: u32 = 0, + owner: i32 = -1, + lock_sem: i32 = 0, + lock_count: i32 = 0, + reader_count: i32 = 0, + writer_count: i32 = 0, + waiters: [2]?*c_void = [_]?*c_void{ null, null }, +}; diff --git a/lib/std/c/hermit.zig b/lib/std/c/hermit.zig index fa351bc0db..a159395ab3 100644 --- a/lib/std/c/hermit.zig +++ b/lib/std/c/hermit.zig @@ -9,3 +9,6 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { inner: usize = ~@as(usize, 0), }; +pub const pthread_rwlock_t = extern struct { + ptr: usize = std.math.maxInt(usize), +}; diff --git a/lib/std/c/linux.zig b/lib/std/c/linux.zig index db464d7a6d..fbfabdd568 100644 --- a/lib/std/c/linux.zig +++ b/lib/std/c/linux.zig @@ -123,6 +123,32 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { size: [__SIZEOF_PTHREAD_COND_T]u8 align(@alignOf(usize)) = [_]u8{0} ** __SIZEOF_PTHREAD_COND_T, }; +pub const pthread_rwlock_t = switch (std.builtin.abi) { + .android => switch (@sizeOf(usize)) { + 4 => extern struct { + lock: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, + cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, + numLocks: c_int = 0, + writerThreadId: c_int = 0, + pendingReaders: c_int = 0, + pendingWriters: c_int = 0, + attr: i32 = 0, + __reserved: [12]u8 = [_]u8{0} ** 2, + }, + 8 => extern struct { + numLocks: c_int = 0, + writerThreadId: c_int = 0, + pendingReaders: c_int = 0, + pendingWriters: c_int = 0, + attr: i32 = 0, + __reserved: [36]u8 = [_]u8{0} ** 36, + }, + else => unreachable, + }, + else => extern struct { + size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, + }, +}; pub const sem_t = extern struct { __size: [__SIZEOF_SEM_T]u8 align(@alignOf(usize)), }; diff --git a/lib/std/c/netbsd.zig b/lib/std/c/netbsd.zig index 258ee6a5c6..46a38706f4 100644 --- a/lib/std/c/netbsd.zig +++ b/lib/std/c/netbsd.zig @@ -54,6 +54,22 @@ pub const pthread_cond_t = extern struct { ptc_private: ?*c_void = null, }; +pub const pthread_rwlock_t = extern struct { + ptr_magic: c_uint = 0x99990009, + ptr_interlock: switch (std.builtin.arch) { + .aarch64, .sparc, .x86_64, .i386 => u8, + .arm, .powerpc => c_int, + else => unreachable, + } = 0, + ptr_rblocked_first: ?*u8 = null, + ptr_rblocked_last: ?*u8 = null, + ptr_wblocked_first: ?*u8 = null, + ptr_wblocked_last: ?*u8 = null, + ptr_nreaders: c_uint = 0, + ptr_owner: std.c.pthread_t = null, + ptr_private: ?*c_void = null, +}; + const pthread_spin_t = switch (builtin.arch) { .aarch64, .aarch64_be, .aarch64_32 => u8, .mips, .mipsel, .mips64, .mips64el => u32, diff --git a/lib/std/c/openbsd.zig b/lib/std/c/openbsd.zig index dd89c837ff..99debf57e7 100644 --- a/lib/std/c/openbsd.zig +++ b/lib/std/c/openbsd.zig @@ -27,6 +27,9 @@ pub const pthread_mutex_t = extern struct { pub const pthread_cond_t = extern struct { inner: ?*c_void = null, }; +pub const pthread_rwlock_t = extern struct { + ptr: ?*c_void = null, +}; pub const pthread_spinlock_t = extern struct { inner: ?*c_void = null, }; diff --git a/lib/std/debug.zig b/lib/std/debug.zig index 15d3baa1d0..e4ef25724b 100644 --- a/lib/std/debug.zig +++ b/lib/std/debug.zig @@ -50,7 +50,7 @@ pub const LineInfo = struct { } }; -var stderr_mutex = std.Mutex{}; +var stderr_mutex = std.Thread.Mutex{}; /// Deprecated. Use `std.log` functions for logging or `std.debug.print` for /// "printf debugging". @@ -65,7 +65,7 @@ pub fn print(comptime fmt: []const u8, args: anytype) void { nosuspend stderr.print(fmt, args) catch return; } -pub fn getStderrMutex() *std.Mutex { +pub fn getStderrMutex() *std.Thread.Mutex { return &stderr_mutex; } @@ -235,7 +235,7 @@ pub fn panic(comptime format: []const u8, args: anytype) noreturn { var panicking: u8 = 0; // Locked to avoid interleaving panic messages from multiple threads. -var panic_mutex = std.Mutex{}; +var panic_mutex = std.Thread.Mutex{}; /// Counts how many times the panic handler is invoked by this thread. /// This is used to catch and handle panics triggered by the panic handler. @@ -280,7 +280,7 @@ pub fn panicExtra(trace: ?*const builtin.StackTrace, first_trace_addr: ?usize, c // and call abort() // Sleep forever without hammering the CPU - var event: std.StaticResetEvent = .{}; + var event: std.Thread.StaticResetEvent = .{}; event.wait(); unreachable; } diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig index 17d79c753b..d48c6c1520 100644 --- a/lib/std/event/lock.zig +++ b/lib/std/event/lock.zig @@ -16,7 +16,7 @@ const Loop = std.event.Loop; /// Allows only one actor to hold the lock. /// TODO: make this API also work in blocking I/O mode. pub const Lock = struct { - mutex: std.Mutex = std.Mutex{}, + mutex: std.Thread.Mutex = std.Thread.Mutex{}, head: usize = UNLOCKED, const UNLOCKED = 0; diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index 8101d27a55..492b7c1758 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -29,7 +29,7 @@ pub const Loop = struct { fs_thread: *Thread, fs_queue: std.atomic.Queue(Request), fs_end_request: Request.Node, - fs_thread_wakeup: std.ResetEvent, + fs_thread_wakeup: std.Thread.ResetEvent, /// For resources that have the same lifetime as the `Loop`. /// This is only used by `Loop` for the thread pool and associated resources. @@ -785,7 +785,7 @@ pub const Loop = struct { timer: std.time.Timer, waiters: Waiters, thread: *std.Thread, - event: std.AutoResetEvent, + event: std.Thread.AutoResetEvent, is_running: bool, /// Initialize the delay queue by spawning the timer thread @@ -796,7 +796,7 @@ pub const Loop = struct { .waiters = DelayQueue.Waiters{ .entries = std.atomic.Queue(anyframe).init(), }, - .event = std.AutoResetEvent{}, + .event = std.Thread.AutoResetEvent{}, .is_running = true, // Must be last so that it can read the other state, such as `is_running`. .thread = try std.Thread.spawn(self, DelayQueue.run), diff --git a/lib/std/event/wait_group.zig b/lib/std/event/wait_group.zig index d123f7df27..0b83c18c74 100644 --- a/lib/std/event/wait_group.zig +++ b/lib/std/event/wait_group.zig @@ -30,7 +30,7 @@ pub fn WaitGroupGeneric(comptime counter_size: u16) type { return struct { counter: CounterType = 0, max_counter: CounterType = std.math.maxInt(CounterType), - mutex: std.Mutex = .{}, + mutex: std.Thread.Mutex = .{}, waiters: ?*Waiter = null, const Waiter = struct { next: ?*Waiter, diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index 9afcc50f3c..e9f28a0b60 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -750,7 +750,7 @@ test "open file with exclusive lock twice, make sure it waits" { errdefer file.close(); const S = struct { - const C = struct { dir: *fs.Dir, evt: *std.ResetEvent }; + const C = struct { dir: *fs.Dir, evt: *std.Thread.ResetEvent }; fn checkFn(ctx: C) !void { const file1 = try ctx.dir.createFile(filename, .{ .lock = .Exclusive }); defer file1.close(); @@ -758,7 +758,7 @@ test "open file with exclusive lock twice, make sure it waits" { } }; - var evt: std.ResetEvent = undefined; + var evt: std.Thread.ResetEvent = undefined; try evt.init(); defer evt.deinit(); diff --git a/lib/std/heap/general_purpose_allocator.zig b/lib/std/heap/general_purpose_allocator.zig index 415b206b6a..05f05c1da3 100644 --- a/lib/std/heap/general_purpose_allocator.zig +++ b/lib/std/heap/general_purpose_allocator.zig @@ -149,12 +149,12 @@ pub const Config = struct { thread_safe: bool = !std.builtin.single_threaded, /// What type of mutex you'd like to use, for thread safety. - /// when specfied, the mutex type must have the same shape as `std.Mutex` and + /// when specfied, the mutex type must have the same shape as `std.Thread.Mutex` and /// `std.mutex.Dummy`, and have no required fields. Specifying this field causes /// the `thread_safe` field to be ignored. /// /// when null (default): - /// * the mutex type defaults to `std.Mutex` when thread_safe is enabled. + /// * the mutex type defaults to `std.Thread.Mutex` when thread_safe is enabled. /// * the mutex type defaults to `std.mutex.Dummy` otherwise. MutexType: ?type = null, @@ -187,7 +187,7 @@ pub fn GeneralPurposeAllocator(comptime config: Config) type { const mutex_init = if (config.MutexType) |T| T{} else if (config.thread_safe) - std.Mutex{} + std.Thread.Mutex{} else std.mutex.Dummy{}; @@ -869,9 +869,9 @@ test "realloc large object to small object" { } test "overrideable mutexes" { - var gpa = GeneralPurposeAllocator(.{ .MutexType = std.Mutex }){ + var gpa = GeneralPurposeAllocator(.{ .MutexType = std.Thread.Mutex }){ .backing_allocator = std.testing.allocator, - .mutex = std.Mutex{}, + .mutex = std.Thread.Mutex{}, }; defer std.testing.expect(!gpa.deinit()); const allocator = &gpa.allocator; diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig deleted file mode 100644 index 50bbb40bf0..0000000000 --- a/lib/std/mutex.zig +++ /dev/null @@ -1,379 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = @import("builtin"); -const os = std.os; -const assert = std.debug.assert; -const windows = os.windows; -const testing = std.testing; -const SpinLock = std.SpinLock; -const StaticResetEvent = std.StaticResetEvent; - -/// Lock may be held only once. If the same thread tries to acquire -/// the same mutex twice, it deadlocks. This type supports static -/// initialization and is at most `@sizeOf(usize)` in size. When an -/// application is built in single threaded release mode, all the -/// functions are no-ops. In single threaded debug mode, there is -/// deadlock detection. -/// -/// Example usage: -/// var m = Mutex{}; -/// -/// const lock = m.acquire(); -/// defer lock.release(); -/// ... critical code -/// -/// Non-blocking: -/// if (m.tryAcquire) |lock| { -/// defer lock.release(); -/// // ... critical section -/// } else { -/// // ... lock not acquired -/// } -pub const Mutex = if (builtin.single_threaded) - Dummy -else if (builtin.os.tag == .windows) - WindowsMutex -else if (std.Thread.use_pthreads) - PthreadMutex -else if (builtin.link_libc or builtin.os.tag == .linux) - // stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs - struct { - state: usize = 0, - - /// number of times to spin trying to acquire the lock. - /// https://webkit.org/blog/6161/locking-in-webkit/ - const SPIN_COUNT = 40; - - const MUTEX_LOCK: usize = 1 << 0; - const QUEUE_LOCK: usize = 1 << 1; - const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK); - - const Node = struct { - next: ?*Node, - event: StaticResetEvent, - }; - - pub fn tryAcquire(self: *Mutex) ?Held { - if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic) != null) - return null; - return Held{ .mutex = self }; - } - - pub fn acquire(self: *Mutex) Held { - return self.tryAcquire() orelse { - self.acquireSlow(); - return Held{ .mutex = self }; - }; - } - - fn acquireSlow(self: *Mutex) void { - // inlining the fast path and hiding *Slow() - // calls behind a @setCold(true) appears to - // improve performance in release builds. - @setCold(true); - while (true) { - - // try and spin for a bit to acquire the mutex if theres currently no queue - var spin_count: u32 = SPIN_COUNT; - var state = @atomicLoad(usize, &self.state, .Monotonic); - while (spin_count != 0) : (spin_count -= 1) { - if (state & MUTEX_LOCK == 0) { - _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return; - } else if (state & QUEUE_MASK == 0) { - break; - } - SpinLock.yield(); - state = @atomicLoad(usize, &self.state, .Monotonic); - } - - // create the StaticResetEvent node on the stack - // (faster than threadlocal on platforms like OSX) - var node: Node = .{ - .next = undefined, - .event = .{}, - }; - - // we've spun too long, try and add our node to the LIFO queue. - // if the mutex becomes available in the process, try and grab it instead. - while (true) { - if (state & MUTEX_LOCK == 0) { - _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return; - } else { - node.next = @intToPtr(?*Node, state & QUEUE_MASK); - const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK); - _ = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse { - node.event.wait(); - break; - }; - } - SpinLock.yield(); - state = @atomicLoad(usize, &self.state, .Monotonic); - } - } - } - - /// Returned when the lock is acquired. Call release to - /// release. - pub const Held = struct { - mutex: *Mutex, - - /// Release the held lock. - pub fn release(self: Held) void { - // first, remove the lock bit so another possibly parallel acquire() can succeed. - // use .Sub since it can be usually compiled down more efficiency - // (`lock sub` on x86) vs .And ~MUTEX_LOCK (`lock cmpxchg` loop on x86) - const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release); - - // if the LIFO queue isnt locked and it has a node, try and wake up the node. - if ((state & QUEUE_LOCK) == 0 and (state & QUEUE_MASK) != 0) - self.mutex.releaseSlow(); - } - }; - - fn releaseSlow(self: *Mutex) void { - @setCold(true); - - // try and lock the LFIO queue to pop a node off, - // stopping altogether if its already locked or the queue is empty - var state = @atomicLoad(usize, &self.state, .Monotonic); - while (true) : (SpinLock.loopHint(1)) { - if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0) - return; - state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break; - } - - // acquired the QUEUE_LOCK, try and pop a node to wake it. - // if the mutex is locked, then unset QUEUE_LOCK and let - // the thread who holds the mutex do the wake-up on unlock() - while (true) : (SpinLock.loopHint(1)) { - if ((state & MUTEX_LOCK) != 0) { - state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Acquire) orelse return; - } else { - const node = @intToPtr(*Node, state & QUEUE_MASK); - const new_state = @ptrToInt(node.next); - state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Acquire) orelse { - node.event.set(); - return; - }; - } - } - } - } - - // for platforms without a known OS blocking - // primitive, default to SpinLock for correctness -else - SpinLock; - -pub const PthreadMutex = struct { - pthread_mutex: std.c.pthread_mutex_t = init, - - pub const Held = struct { - mutex: *PthreadMutex, - - pub fn release(self: Held) void { - switch (std.c.pthread_mutex_unlock(&self.mutex.pthread_mutex)) { - 0 => return, - std.c.EINVAL => unreachable, - std.c.EAGAIN => unreachable, - std.c.EPERM => unreachable, - else => unreachable, - } - } - }; - - /// Create a new mutex in unlocked state. - pub const init = std.c.PTHREAD_MUTEX_INITIALIZER; - - /// Try to acquire the mutex without blocking. Returns null if - /// the mutex is unavailable. Otherwise returns Held. Call - /// release on Held. - pub fn tryAcquire(self: *PthreadMutex) ?Held { - if (std.c.pthread_mutex_trylock(&self.pthread_mutex) == 0) { - return Held{ .mutex = self }; - } else { - return null; - } - } - - /// Acquire the mutex. Will deadlock if the mutex is already - /// held by the calling thread. - pub fn acquire(self: *PthreadMutex) Held { - switch (std.c.pthread_mutex_lock(&self.pthread_mutex)) { - 0 => return Held{ .mutex = self }, - std.c.EINVAL => unreachable, - std.c.EBUSY => unreachable, - std.c.EAGAIN => unreachable, - std.c.EDEADLK => unreachable, - std.c.EPERM => unreachable, - else => unreachable, - } - } -}; - -/// This has the sematics as `Mutex`, however it does not actually do any -/// synchronization. Operations are safety-checked no-ops. -pub const Dummy = struct { - lock: @TypeOf(lock_init) = lock_init, - - const lock_init = if (std.debug.runtime_safety) false else {}; - - pub const Held = struct { - mutex: *Dummy, - - pub fn release(self: Held) void { - if (std.debug.runtime_safety) { - self.mutex.lock = false; - } - } - }; - - /// Create a new mutex in unlocked state. - pub const init = Dummy{}; - - /// Try to acquire the mutex without blocking. Returns null if - /// the mutex is unavailable. Otherwise returns Held. Call - /// release on Held. - pub fn tryAcquire(self: *Dummy) ?Held { - if (std.debug.runtime_safety) { - if (self.lock) return null; - self.lock = true; - } - return Held{ .mutex = self }; - } - - /// Acquire the mutex. Will deadlock if the mutex is already - /// held by the calling thread. - pub fn acquire(self: *Dummy) Held { - return self.tryAcquire() orelse @panic("deadlock detected"); - } -}; - -// https://locklessinc.com/articles/keyed_events/ -const WindowsMutex = struct { - state: State = State{ .waiters = 0 }, - - const State = extern union { - locked: u8, - waiters: u32, - }; - - const WAKE = 1 << 8; - const WAIT = 1 << 9; - - pub fn tryAcquire(self: *WindowsMutex) ?Held { - if (@atomicRmw(u8, &self.state.locked, .Xchg, 1, .Acquire) != 0) - return null; - return Held{ .mutex = self }; - } - - pub fn acquire(self: *WindowsMutex) Held { - return self.tryAcquire() orelse self.acquireSlow(); - } - - fn acquireSpinning(self: *WindowsMutex) Held { - @setCold(true); - while (true) : (SpinLock.yield()) { - return self.tryAcquire() orelse continue; - } - } - - fn acquireSlow(self: *WindowsMutex) Held { - // try to use NT keyed events for blocking, falling back to spinlock if unavailable - @setCold(true); - const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return self.acquireSpinning(); - const key = @ptrCast(*const c_void, &self.state.waiters); - - while (true) : (SpinLock.loopHint(1)) { - const waiters = @atomicLoad(u32, &self.state.waiters, .Monotonic); - - // try and take lock if unlocked - if ((waiters & 1) == 0) { - if (@atomicRmw(u8, &self.state.locked, .Xchg, 1, .Acquire) == 0) { - return Held{ .mutex = self }; - } - - // otherwise, try and update the waiting count. - // then unset the WAKE bit so that another unlocker can wake up a thread. - } else if (@cmpxchgWeak(u32, &self.state.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) { - const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - _ = @atomicRmw(u32, &self.state.waiters, .Sub, WAKE, .Monotonic); - } - } - } - - pub const Held = struct { - mutex: *WindowsMutex, - - pub fn release(self: Held) void { - // unlock without a rmw/cmpxchg instruction - @atomicStore(u8, @ptrCast(*u8, &self.mutex.state.locked), 0, .Release); - const handle = StaticResetEvent.Impl.Futex.getEventHandle() orelse return; - const key = @ptrCast(*const c_void, &self.mutex.state.waiters); - - while (true) : (SpinLock.loopHint(1)) { - const waiters = @atomicLoad(u32, &self.mutex.state.waiters, .Monotonic); - - // no one is waiting - if (waiters < WAIT) return; - // someone grabbed the lock and will do the wake instead - if (waiters & 1 != 0) return; - // someone else is currently waking up - if (waiters & WAKE != 0) return; - - // try to decrease the waiter count & set the WAKE bit meaning a thread is waking up - if (@cmpxchgWeak(u32, &self.mutex.state.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null) { - const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null); - assert(rc == .SUCCESS); - return; - } - } - } - }; -}; - -const TestContext = struct { - mutex: *Mutex, - data: i128, - - const incr_count = 10000; -}; - -test "std.Mutex" { - var mutex = Mutex{}; - - var context = TestContext{ - .mutex = &mutex, - .data = 0, - }; - - if (builtin.single_threaded) { - worker(&context); - testing.expect(context.data == TestContext.incr_count); - } else { - const thread_count = 10; - var threads: [thread_count]*std.Thread = undefined; - for (threads) |*t| { - t.* = try std.Thread.spawn(&context, worker); - } - for (threads) |t| - t.wait(); - - testing.expect(context.data == thread_count * TestContext.incr_count); - } -} - -fn worker(ctx: *TestContext) void { - var i: usize = 0; - while (i != TestContext.incr_count) : (i += 1) { - const held = ctx.mutex.acquire(); - defer held.release(); - - ctx.data += 1; - } -} diff --git a/lib/std/once.zig b/lib/std/once.zig index f4ac47f8d8..efa99060d3 100644 --- a/lib/std/once.zig +++ b/lib/std/once.zig @@ -15,7 +15,7 @@ pub fn once(comptime f: fn () void) Once(f) { pub fn Once(comptime f: fn () void) type { return struct { done: bool = false, - mutex: std.Mutex = std.Mutex{}, + mutex: std.Thread.Mutex = std.Thread.Mutex{}, /// Call the function `f`. /// If `call` is invoked multiple times `f` will be executed only the diff --git a/lib/std/os/windows/bits.zig b/lib/std/os/windows/bits.zig index d8a2fb4a4d..8461378da0 100644 --- a/lib/std/os/windows/bits.zig +++ b/lib/std/os/windows/bits.zig @@ -1635,3 +1635,8 @@ pub const OBJECT_NAME_INFORMATION = extern struct { Name: UNICODE_STRING, }; pub const POBJECT_NAME_INFORMATION = *OBJECT_NAME_INFORMATION; + +pub const SRWLOCK = usize; +pub const SRWLOCK_INIT: SRWLOCK = 0; +pub const CONDITION_VARIABLE = usize; +pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0; diff --git a/lib/std/os/windows/kernel32.zig b/lib/std/os/windows/kernel32.zig index 78d2a6501e..ec4a75afa9 100644 --- a/lib/std/os/windows/kernel32.zig +++ b/lib/std/os/windows/kernel32.zig @@ -289,3 +289,16 @@ pub extern "kernel32" fn K32QueryWorkingSet(hProcess: HANDLE, pv: PVOID, cb: DWO pub extern "kernel32" fn K32QueryWorkingSetEx(hProcess: HANDLE, pv: PVOID, cb: DWORD) callconv(WINAPI) BOOL; pub extern "kernel32" fn FlushFileBuffers(hFile: HANDLE) callconv(WINAPI) BOOL; + +pub extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(WINAPI) void; +pub extern "kernel32" fn WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(WINAPI) void; +pub extern "kernel32" fn SleepConditionVariableSRW( + c: *CONDITION_VARIABLE, + s: *SRWLOCK, + t: DWORD, + f: ULONG, +) callconv(WINAPI) BOOL; + +pub extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) callconv(WINAPI) BOOL; +pub extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(WINAPI) void; +pub extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(WINAPI) void; diff --git a/lib/std/std.zig b/lib/std/std.zig index d736899a45..d085d4fc41 100644 --- a/lib/std/std.zig +++ b/lib/std/std.zig @@ -13,7 +13,6 @@ pub const AutoArrayHashMap = array_hash_map.AutoArrayHashMap; pub const AutoArrayHashMapUnmanaged = array_hash_map.AutoArrayHashMapUnmanaged; pub const AutoHashMap = hash_map.AutoHashMap; pub const AutoHashMapUnmanaged = hash_map.AutoHashMapUnmanaged; -pub const AutoResetEvent = @import("auto_reset_event.zig").AutoResetEvent; pub const BufMap = @import("buf_map.zig").BufMap; pub const BufSet = @import("buf_set.zig").BufSet; pub const ChildProcess = @import("child_process.zig").ChildProcess; @@ -21,26 +20,21 @@ pub const ComptimeStringMap = @import("comptime_string_map.zig").ComptimeStringM pub const DynLib = @import("dynamic_library.zig").DynLib; pub const HashMap = hash_map.HashMap; pub const HashMapUnmanaged = hash_map.HashMapUnmanaged; -pub const mutex = @import("mutex.zig"); -pub const Mutex = mutex.Mutex; pub const PackedIntArray = @import("packed_int_array.zig").PackedIntArray; pub const PackedIntArrayEndian = @import("packed_int_array.zig").PackedIntArrayEndian; pub const PackedIntSlice = @import("packed_int_array.zig").PackedIntSlice; pub const PackedIntSliceEndian = @import("packed_int_array.zig").PackedIntSliceEndian; pub const PriorityQueue = @import("priority_queue.zig").PriorityQueue; pub const Progress = @import("Progress.zig"); -pub const ResetEvent = @import("ResetEvent.zig"); pub const SemanticVersion = @import("SemanticVersion.zig"); pub const SinglyLinkedList = @import("linked_list.zig").SinglyLinkedList; -pub const SpinLock = @import("SpinLock.zig"); -pub const StaticResetEvent = @import("StaticResetEvent.zig"); pub const StringHashMap = hash_map.StringHashMap; pub const StringHashMapUnmanaged = hash_map.StringHashMapUnmanaged; pub const StringArrayHashMap = array_hash_map.StringArrayHashMap; pub const StringArrayHashMapUnmanaged = array_hash_map.StringArrayHashMapUnmanaged; pub const TailQueue = @import("linked_list.zig").TailQueue; pub const Target = @import("target.zig").Target; -pub const Thread = @import("thread.zig").Thread; +pub const Thread = @import("Thread.zig"); pub const array_hash_map = @import("array_hash_map.zig"); pub const atomic = @import("atomic.zig"); @@ -98,12 +92,7 @@ test "" { // server is hitting OOM. TODO revert this after stage2 arrives. _ = ChildProcess; _ = DynLib; - _ = mutex; - _ = Mutex; _ = Progress; - _ = ResetEvent; - _ = SpinLock; - _ = StaticResetEvent; _ = Target; _ = Thread; diff --git a/lib/std/thread.zig b/lib/std/thread.zig deleted file mode 100644 index 0fee13b057..0000000000 --- a/lib/std/thread.zig +++ /dev/null @@ -1,526 +0,0 @@ -// SPDX-License-Identifier: MIT -// Copyright (c) 2015-2021 Zig Contributors -// This file is part of [zig](https://ziglang.org/), which is MIT licensed. -// The MIT license requires this copyright notice to be included in all copies -// and substantial portions of the software. -const std = @import("std.zig"); -const builtin = std.builtin; -const os = std.os; -const mem = std.mem; -const windows = std.os.windows; -const c = std.c; -const assert = std.debug.assert; - -const bad_startfn_ret = "expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"; - -pub const Thread = struct { - data: Data, - - pub const use_pthreads = std.Target.current.os.tag != .windows and builtin.link_libc; - - /// Represents a kernel thread handle. - /// May be an integer or a pointer depending on the platform. - /// On Linux and POSIX, this is the same as Id. - pub const Handle = if (use_pthreads) - c.pthread_t - else switch (std.Target.current.os.tag) { - .linux => i32, - .windows => windows.HANDLE, - else => void, - }; - - /// Represents a unique ID per thread. - /// May be an integer or pointer depending on the platform. - /// On Linux and POSIX, this is the same as Handle. - pub const Id = switch (std.Target.current.os.tag) { - .windows => windows.DWORD, - else => Handle, - }; - - pub const Data = if (use_pthreads) - struct { - handle: Thread.Handle, - memory: []u8, - } - else switch (std.Target.current.os.tag) { - .linux => struct { - handle: Thread.Handle, - memory: []align(mem.page_size) u8, - }, - .windows => struct { - handle: Thread.Handle, - alloc_start: *c_void, - heap_handle: windows.HANDLE, - }, - else => struct {}, - }; - - /// Returns the ID of the calling thread. - /// Makes a syscall every time the function is called. - /// On Linux and POSIX, this Id is the same as a Handle. - pub fn getCurrentId() Id { - if (use_pthreads) { - return c.pthread_self(); - } else - return switch (std.Target.current.os.tag) { - .linux => os.linux.gettid(), - .windows => windows.kernel32.GetCurrentThreadId(), - else => @compileError("Unsupported OS"), - }; - } - - /// Returns the handle of this thread. - /// On Linux and POSIX, this is the same as Id. - /// On Linux, it is possible that the thread spawned with `spawn` - /// finishes executing entirely before the clone syscall completes. In this - /// case, this function will return 0 rather than the no-longer-existing thread's - /// pid. - pub fn handle(self: Thread) Handle { - return self.data.handle; - } - - pub fn wait(self: *Thread) void { - if (use_pthreads) { - const err = c.pthread_join(self.data.handle, null); - switch (err) { - 0 => {}, - os.EINVAL => unreachable, - os.ESRCH => unreachable, - os.EDEADLK => unreachable, - else => unreachable, - } - std.heap.c_allocator.free(self.data.memory); - std.heap.c_allocator.destroy(self); - } else switch (std.Target.current.os.tag) { - .linux => { - while (true) { - const pid_value = @atomicLoad(i32, &self.data.handle, .SeqCst); - if (pid_value == 0) break; - const rc = os.linux.futex_wait(&self.data.handle, os.linux.FUTEX_WAIT, pid_value, null); - switch (os.linux.getErrno(rc)) { - 0 => continue, - os.EINTR => continue, - os.EAGAIN => continue, - else => unreachable, - } - } - os.munmap(self.data.memory); - }, - .windows => { - windows.WaitForSingleObjectEx(self.data.handle, windows.INFINITE, false) catch unreachable; - windows.CloseHandle(self.data.handle); - windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start); - }, - else => @compileError("Unsupported OS"), - } - } - - pub const SpawnError = error{ - /// A system-imposed limit on the number of threads was encountered. - /// There are a number of limits that may trigger this error: - /// * the RLIMIT_NPROC soft resource limit (set via setrlimit(2)), - /// which limits the number of processes and threads for a real - /// user ID, was reached; - /// * the kernel's system-wide limit on the number of processes and - /// threads, /proc/sys/kernel/threads-max, was reached (see - /// proc(5)); - /// * the maximum number of PIDs, /proc/sys/kernel/pid_max, was - /// reached (see proc(5)); or - /// * the PID limit (pids.max) imposed by the cgroup "process num‐ - /// ber" (PIDs) controller was reached. - ThreadQuotaExceeded, - - /// The kernel cannot allocate sufficient memory to allocate a task structure - /// for the child, or to copy those parts of the caller's context that need to - /// be copied. - SystemResources, - - /// Not enough userland memory to spawn the thread. - OutOfMemory, - - /// `mlockall` is enabled, and the memory needed to spawn the thread - /// would exceed the limit. - LockedMemoryLimitExceeded, - - Unexpected, - }; - - /// caller must call wait on the returned thread - /// fn startFn(@TypeOf(context)) T - /// where T is u8, noreturn, void, or !void - /// caller must call wait on the returned thread - pub fn spawn(context: anytype, comptime startFn: anytype) SpawnError!*Thread { - if (builtin.single_threaded) @compileError("cannot spawn thread when building in single-threaded mode"); - // TODO compile-time call graph analysis to determine stack upper bound - // https://github.com/ziglang/zig/issues/157 - const default_stack_size = 16 * 1024 * 1024; - - const Context = @TypeOf(context); - comptime assert(@typeInfo(@TypeOf(startFn)).Fn.args[0].arg_type.? == Context); - - if (std.Target.current.os.tag == .windows) { - const WinThread = struct { - const OuterContext = struct { - thread: Thread, - inner: Context, - }; - fn threadMain(raw_arg: windows.LPVOID) callconv(.C) windows.DWORD { - const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*; - - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return 0; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - return startFn(arg); - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return 0; - }, - else => @compileError(bad_startfn_ret), - } - } - }; - - const heap_handle = windows.kernel32.GetProcessHeap() orelse return error.OutOfMemory; - const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext); - const bytes_ptr = windows.kernel32.HeapAlloc(heap_handle, 0, byte_count) orelse return error.OutOfMemory; - errdefer assert(windows.kernel32.HeapFree(heap_handle, 0, bytes_ptr) != 0); - const bytes = @ptrCast([*]u8, bytes_ptr)[0..byte_count]; - const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable; - outer_context.* = WinThread.OuterContext{ - .thread = Thread{ - .data = Thread.Data{ - .heap_handle = heap_handle, - .alloc_start = bytes_ptr, - .handle = undefined, - }, - }, - .inner = context, - }; - - const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(*c_void, &outer_context.inner); - outer_context.thread.data.handle = windows.kernel32.CreateThread(null, default_stack_size, WinThread.threadMain, parameter, 0, null) orelse { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } - }; - return &outer_context.thread; - } - - const MainFuncs = struct { - fn linuxThreadMain(ctx_addr: usize) callconv(.C) u8 { - const arg = if (@sizeOf(Context) == 0) {} else @intToPtr(*const Context, ctx_addr).*; - - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return 0; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - return startFn(arg); - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return 0; - }, - else => @compileError(bad_startfn_ret), - } - } - fn posixThreadMain(ctx: ?*c_void) callconv(.C) ?*c_void { - const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), ctx)).*; - - switch (@typeInfo(@typeInfo(@TypeOf(startFn)).Fn.return_type.?)) { - .NoReturn => { - startFn(arg); - }, - .Void => { - startFn(arg); - return null; - }, - .Int => |info| { - if (info.bits != 8) { - @compileError(bad_startfn_ret); - } - // pthreads don't support exit status, ignore value - _ = startFn(arg); - return null; - }, - .ErrorUnion => |info| { - if (info.payload != void) { - @compileError(bad_startfn_ret); - } - startFn(arg) catch |err| { - std.debug.warn("error: {s}\n", .{@errorName(err)}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - return null; - }, - else => @compileError(bad_startfn_ret), - } - } - }; - - if (Thread.use_pthreads) { - var attr: c.pthread_attr_t = undefined; - if (c.pthread_attr_init(&attr) != 0) return error.SystemResources; - defer assert(c.pthread_attr_destroy(&attr) == 0); - - const thread_obj = try std.heap.c_allocator.create(Thread); - errdefer std.heap.c_allocator.destroy(thread_obj); - if (@sizeOf(Context) > 0) { - thread_obj.data.memory = try std.heap.c_allocator.allocAdvanced( - u8, - @alignOf(Context), - @sizeOf(Context), - .at_least, - ); - errdefer std.heap.c_allocator.free(thread_obj.data.memory); - mem.copy(u8, thread_obj.data.memory, mem.asBytes(&context)); - } else { - thread_obj.data.memory = @as([*]u8, undefined)[0..0]; - } - - // Use the same set of parameters used by the libc-less impl. - assert(c.pthread_attr_setstacksize(&attr, default_stack_size) == 0); - assert(c.pthread_attr_setguardsize(&attr, mem.page_size) == 0); - - const err = c.pthread_create( - &thread_obj.data.handle, - &attr, - MainFuncs.posixThreadMain, - thread_obj.data.memory.ptr, - ); - switch (err) { - 0 => return thread_obj, - os.EAGAIN => return error.SystemResources, - os.EPERM => unreachable, - os.EINVAL => unreachable, - else => return os.unexpectedErrno(@intCast(usize, err)), - } - - return thread_obj; - } - - var guard_end_offset: usize = undefined; - var stack_end_offset: usize = undefined; - var thread_start_offset: usize = undefined; - var context_start_offset: usize = undefined; - var tls_start_offset: usize = undefined; - const mmap_len = blk: { - var l: usize = mem.page_size; - // Allocate a guard page right after the end of the stack region - guard_end_offset = l; - // The stack itself, which grows downwards. - l = mem.alignForward(l + default_stack_size, mem.page_size); - stack_end_offset = l; - // Above the stack, so that it can be in the same mmap call, put the Thread object. - l = mem.alignForward(l, @alignOf(Thread)); - thread_start_offset = l; - l += @sizeOf(Thread); - // Next, the Context object. - if (@sizeOf(Context) != 0) { - l = mem.alignForward(l, @alignOf(Context)); - context_start_offset = l; - l += @sizeOf(Context); - } - // Finally, the Thread Local Storage, if any. - l = mem.alignForward(l, os.linux.tls.tls_image.alloc_align); - tls_start_offset = l; - l += os.linux.tls.tls_image.alloc_size; - // Round the size to the page size. - break :blk mem.alignForward(l, mem.page_size); - }; - - const mmap_slice = mem: { - // Map the whole stack with no rw permissions to avoid - // committing the whole region right away - const mmap_slice = os.mmap( - null, - mmap_len, - os.PROT_NONE, - os.MAP_PRIVATE | os.MAP_ANONYMOUS, - -1, - 0, - ) catch |err| switch (err) { - error.MemoryMappingNotSupported => unreachable, - error.AccessDenied => unreachable, - error.PermissionDenied => unreachable, - else => |e| return e, - }; - errdefer os.munmap(mmap_slice); - - // Map everything but the guard page as rw - os.mprotect( - mmap_slice[guard_end_offset..], - os.PROT_READ | os.PROT_WRITE, - ) catch |err| switch (err) { - error.AccessDenied => unreachable, - else => |e| return e, - }; - - break :mem mmap_slice; - }; - - const mmap_addr = @ptrToInt(mmap_slice.ptr); - - const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(*Thread, mmap_addr + thread_start_offset)); - thread_ptr.data.memory = mmap_slice; - - var arg: usize = undefined; - if (@sizeOf(Context) != 0) { - arg = mmap_addr + context_start_offset; - const context_ptr = @alignCast(@alignOf(Context), @intToPtr(*Context, arg)); - context_ptr.* = context; - } - - if (std.Target.current.os.tag == .linux) { - const flags: u32 = os.CLONE_VM | os.CLONE_FS | os.CLONE_FILES | - os.CLONE_SIGHAND | os.CLONE_THREAD | os.CLONE_SYSVSEM | - os.CLONE_PARENT_SETTID | os.CLONE_CHILD_CLEARTID | - os.CLONE_DETACHED | os.CLONE_SETTLS; - // This structure is only needed when targeting i386 - var user_desc: if (std.Target.current.cpu.arch == .i386) os.linux.user_desc else void = undefined; - - const tls_area = mmap_slice[tls_start_offset..]; - const tp_value = os.linux.tls.prepareTLS(tls_area); - - const newtls = blk: { - if (std.Target.current.cpu.arch == .i386) { - user_desc = os.linux.user_desc{ - .entry_number = os.linux.tls.tls_image.gdt_entry_number, - .base_addr = tp_value, - .limit = 0xfffff, - .seg_32bit = 1, - .contents = 0, // Data - .read_exec_only = 0, - .limit_in_pages = 1, - .seg_not_present = 0, - .useable = 1, - }; - break :blk @ptrToInt(&user_desc); - } else { - break :blk tp_value; - } - }; - - const rc = os.linux.clone( - MainFuncs.linuxThreadMain, - mmap_addr + stack_end_offset, - flags, - arg, - &thread_ptr.data.handle, - newtls, - &thread_ptr.data.handle, - ); - switch (os.errno(rc)) { - 0 => return thread_ptr, - os.EAGAIN => return error.ThreadQuotaExceeded, - os.EINVAL => unreachable, - os.ENOMEM => return error.SystemResources, - os.ENOSPC => unreachable, - os.EPERM => unreachable, - os.EUSERS => unreachable, - else => |err| return os.unexpectedErrno(err), - } - } else { - @compileError("Unsupported OS"); - } - } - - pub const CpuCountError = error{ - PermissionDenied, - SystemResources, - Unexpected, - }; - - pub fn cpuCount() CpuCountError!usize { - if (std.Target.current.os.tag == .linux) { - const cpu_set = try os.sched_getaffinity(0); - return @as(usize, os.CPU_COUNT(cpu_set)); // TODO should not need this usize cast - } - if (std.Target.current.os.tag == .windows) { - return os.windows.peb().NumberOfProcessors; - } - if (std.Target.current.os.tag == .openbsd) { - var count: c_int = undefined; - var count_size: usize = @sizeOf(c_int); - const mib = [_]c_int{ os.CTL_HW, os.HW_NCPUONLINE }; - os.sysctl(&mib, &count, &count_size, null, 0) catch |err| switch (err) { - error.NameTooLong, error.UnknownName => unreachable, - else => |e| return e, - }; - return @intCast(usize, count); - } - var count: c_int = undefined; - var count_len: usize = @sizeOf(c_int); - const name = if (comptime std.Target.current.isDarwin()) "hw.logicalcpu" else "hw.ncpu"; - os.sysctlbynameZ(name, &count, &count_len, null, 0) catch |err| switch (err) { - error.NameTooLong, error.UnknownName => unreachable, - else => |e| return e, - }; - return @intCast(usize, count); - } - - pub fn getCurrentThreadId() u64 { - switch (std.Target.current.os.tag) { - .linux => { - // Use the syscall directly as musl doesn't provide a wrapper. - return @bitCast(u32, os.linux.gettid()); - }, - .windows => { - return os.windows.kernel32.GetCurrentThreadId(); - }, - .macos, .ios, .watchos, .tvos => { - var thread_id: u64 = undefined; - // Pass thread=null to get the current thread ID. - assert(c.pthread_threadid_np(null, &thread_id) == 0); - return thread_id; - }, - .netbsd => { - return @bitCast(u32, c._lwp_self()); - }, - .freebsd => { - return @bitCast(u32, c.pthread_getthreadid_np()); - }, - .openbsd => { - return @bitCast(u32, c.getthrid()); - }, - else => { - @compileError("getCurrentThreadId not implemented for this platform"); - }, - } - } -}; |
