diff options
| author | Andrew Kelley <andrew@ziglang.org> | 2024-03-18 22:39:59 -0700 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2024-03-19 11:45:09 -0700 |
| commit | cd62005f19ff966d2c42de4daeb9a1e4b644bf76 (patch) | |
| tree | 4bb316708afaf79c971808df792d8fe86274789b /lib/std/Thread | |
| parent | 7057bffc14602add697eb566b83934b7ad3fd81c (diff) | |
| download | zig-cd62005f19ff966d2c42de4daeb9a1e4b644bf76.tar.gz zig-cd62005f19ff966d2c42de4daeb9a1e4b644bf76.zip | |
extract std.posix from std.os
closes #5019
Diffstat (limited to 'lib/std/Thread')
| -rw-r--r-- | lib/std/Thread/Futex.zig | 135 | ||||
| -rw-r--r-- | lib/std/Thread/Mutex.zig | 21 |
2 files changed, 83 insertions, 73 deletions
diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig index 764d3f13e1..2a81a4a535 100644 --- a/lib/std/Thread/Futex.zig +++ b/lib/std/Thread/Futex.zig @@ -1,13 +1,20 @@ -//! Futex is a mechanism used to block (`wait`) and unblock (`wake`) threads using a 32bit memory address as hints. -//! Blocking a thread is acknowledged only if the 32bit memory address is equal to a given value. -//! This check helps avoid block/unblock deadlocks which occur if a `wake()` happens before a `wait()`. -//! Using Futex, other Thread synchronization primitives can be built which efficiently wait for cross-thread events or signals. +//! A mechanism used to block (`wait`) and unblock (`wake`) threads using a +//! 32bit memory address as hints. +//! +//! Blocking a thread is acknowledged only if the 32bit memory address is equal +//! to a given value. This check helps avoid block/unblock deadlocks which +//! occur if a `wake()` happens before a `wait()`. +//! +//! Using Futex, other Thread synchronization primitives can be built which +//! efficiently wait for cross-thread events or signals. const std = @import("../std.zig"); const builtin = @import("builtin"); const Futex = @This(); +const windows = std.os.windows; +const linux = std.os.linux; +const c = std.c; -const os = std.os; const assert = std.debug.assert; const testing = std.testing; const atomic = std.atomic; @@ -124,18 +131,18 @@ const SingleThreadedImpl = struct { // as it's generally already a linked target and is autoloaded into all processes anyway. const WindowsImpl = struct { fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { - var timeout_value: os.windows.LARGE_INTEGER = undefined; - var timeout_ptr: ?*const os.windows.LARGE_INTEGER = null; + var timeout_value: windows.LARGE_INTEGER = undefined; + var timeout_ptr: ?*const windows.LARGE_INTEGER = null; // NTDLL functions work with time in units of 100 nanoseconds. // Positive values are absolute deadlines while negative values are relative durations. if (timeout) |delay| { - timeout_value = @as(os.windows.LARGE_INTEGER, @intCast(delay / 100)); + timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100)); timeout_value = -timeout_value; timeout_ptr = &timeout_value; } - const rc = os.windows.ntdll.RtlWaitOnAddress( + const rc = windows.ntdll.RtlWaitOnAddress( ptr, &expect, @sizeOf(@TypeOf(expect)), @@ -157,8 +164,8 @@ const WindowsImpl = struct { assert(max_waiters != 0); switch (max_waiters) { - 1 => os.windows.ntdll.RtlWakeAddressSingle(address), - else => os.windows.ntdll.RtlWakeAddressAll(address), + 1 => windows.ntdll.RtlWakeAddressSingle(address), + else => windows.ntdll.RtlWakeAddressAll(address), } } }; @@ -189,10 +196,10 @@ const DarwinImpl = struct { var timeout_overflowed = false; const addr: *const anyopaque = ptr; - const flags = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO; + const flags = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO; const status = blk: { if (supports_ulock_wait2) { - break :blk os.darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0); + break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0); } const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: { @@ -200,11 +207,11 @@ const DarwinImpl = struct { break :overflow std.math.maxInt(u32); }; - break :blk os.darwin.__ulock_wait(flags, addr, expect, timeout_us); + break :blk c.__ulock_wait(flags, addr, expect, timeout_us); }; if (status >= 0) return; - switch (@as(std.os.E, @enumFromInt(-status))) { + switch (@as(c.E, @enumFromInt(-status))) { // Wait was interrupted by the OS or other spurious signalling. .INTR => {}, // Address of the futex was paged out. This is unlikely, but possible in theory, and @@ -221,17 +228,17 @@ const DarwinImpl = struct { } fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { - var flags: u32 = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO; + var flags: u32 = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO; if (max_waiters > 1) { - flags |= os.darwin.ULF_WAKE_ALL; + flags |= c.ULF_WAKE_ALL; } while (true) { const addr: *const anyopaque = ptr; - const status = os.darwin.__ulock_wake(flags, addr, 0); + const status = c.__ulock_wake(flags, addr, 0); if (status >= 0) return; - switch (@as(std.os.E, @enumFromInt(-status))) { + switch (@as(c.E, @enumFromInt(-status))) { .INTR => continue, // spurious wake() .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t .NOENT => return, // nothing was woken up @@ -245,20 +252,20 @@ const DarwinImpl = struct { // https://man7.org/linux/man-pages/man2/futex.2.html const LinuxImpl = struct { fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { - var ts: os.timespec = undefined; + var ts: linux.timespec = undefined; if (timeout) |timeout_ns| { ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); ts.tv_nsec = @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s)); } - const rc = os.linux.futex_wait( + const rc = linux.futex_wait( @as(*const i32, @ptrCast(&ptr.raw)), - os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAIT, + linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT, @as(i32, @bitCast(expect)), if (timeout != null) &ts else null, ); - switch (os.linux.getErrno(rc)) { + switch (linux.E.init(rc)) { .SUCCESS => {}, // notified by `wake()` .INTR => {}, // spurious wakeup .AGAIN => {}, // ptr.* != expect @@ -273,13 +280,13 @@ const LinuxImpl = struct { } fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { - const rc = os.linux.futex_wake( + const rc = linux.futex_wake( @as(*const i32, @ptrCast(&ptr.raw)), - os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAKE, + linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE, std.math.cast(i32, max_waiters) orelse std.math.maxInt(i32), ); - switch (os.linux.getErrno(rc)) { + switch (linux.E.init(rc)) { .SUCCESS => {}, // successful wake up .INVAL => {}, // invalid futex_wait() on ptr done elsewhere .FAULT => {}, // pointer became invalid while doing the wake @@ -292,28 +299,28 @@ const LinuxImpl = struct { const FreebsdImpl = struct { fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { var tm_size: usize = 0; - var tm: os.freebsd._umtx_time = undefined; - var tm_ptr: ?*const os.freebsd._umtx_time = null; + var tm: c._umtx_time = undefined; + var tm_ptr: ?*const c._umtx_time = null; if (timeout) |timeout_ns| { tm_ptr = &tm; tm_size = @sizeOf(@TypeOf(tm)); tm._flags = 0; // use relative time not UMTX_ABSTIME - tm._clockid = os.CLOCK.MONOTONIC; + tm._clockid = c.CLOCK.MONOTONIC; tm._timeout.tv_sec = @as(@TypeOf(tm._timeout.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); tm._timeout.tv_nsec = @as(@TypeOf(tm._timeout.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s)); } - const rc = os.freebsd._umtx_op( + const rc = c._umtx_op( @intFromPtr(&ptr.raw), - @intFromEnum(os.freebsd.UMTX_OP.WAIT_UINT_PRIVATE), + @intFromEnum(c.UMTX_OP.WAIT_UINT_PRIVATE), @as(c_ulong, expect), tm_size, @intFromPtr(tm_ptr), ); - switch (os.errno(rc)) { + switch (std.posix.errno(rc)) { .SUCCESS => {}, .FAULT => unreachable, // one of the args points to invalid memory .INVAL => unreachable, // arguments should be correct @@ -327,15 +334,15 @@ const FreebsdImpl = struct { } fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { - const rc = os.freebsd._umtx_op( + const rc = c._umtx_op( @intFromPtr(&ptr.raw), - @intFromEnum(os.freebsd.UMTX_OP.WAKE_PRIVATE), + @intFromEnum(c.UMTX_OP.WAKE_PRIVATE), @as(c_ulong, max_waiters), 0, // there is no timeout struct 0, // there is no timeout struct pointer ); - switch (os.errno(rc)) { + switch (std.posix.errno(rc)) { .SUCCESS => {}, .FAULT => {}, // it's ok if the ptr doesn't point to valid memory .INVAL => unreachable, // arguments should be correct @@ -347,21 +354,21 @@ const FreebsdImpl = struct { // https://man.openbsd.org/futex.2 const OpenbsdImpl = struct { fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void { - var ts: os.timespec = undefined; + var ts: c.timespec = undefined; if (timeout) |timeout_ns| { ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); ts.tv_nsec = @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s)); } - const rc = os.openbsd.futex( + const rc = c.futex( @as(*const volatile u32, @ptrCast(&ptr.raw)), - os.openbsd.FUTEX_WAIT | os.openbsd.FUTEX_PRIVATE_FLAG, + c.FUTEX_WAIT | c.FUTEX_PRIVATE_FLAG, @as(c_int, @bitCast(expect)), if (timeout != null) &ts else null, null, // FUTEX_WAIT takes no requeue address ); - switch (os.errno(rc)) { + switch (std.posix.errno(rc)) { .SUCCESS => {}, // woken up by wake .NOSYS => unreachable, // the futex operation shouldn't be invalid .FAULT => unreachable, // ptr was invalid @@ -378,9 +385,9 @@ const OpenbsdImpl = struct { } fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void { - const rc = os.openbsd.futex( + const rc = c.futex( @as(*const volatile u32, @ptrCast(&ptr.raw)), - os.openbsd.FUTEX_WAKE | os.openbsd.FUTEX_PRIVATE_FLAG, + c.FUTEX_WAKE | c.FUTEX_PRIVATE_FLAG, std.math.cast(c_int, max_waiters) orelse std.math.maxInt(c_int), null, // FUTEX_WAKE takes no timeout ptr null, // FUTEX_WAKE takes no requeue address @@ -415,9 +422,9 @@ const DragonflyImpl = struct { const value = @as(c_int, @bitCast(expect)); const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw)); - const rc = os.dragonfly.umtx_sleep(addr, value, timeout_us); + const rc = c.umtx_sleep(addr, value, timeout_us); - switch (os.errno(rc)) { + switch (std.posix.errno(rc)) { .SUCCESS => {}, .BUSY => {}, // ptr != expect .AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh @@ -444,7 +451,7 @@ const DragonflyImpl = struct { // > umtx_wakeup() will generally return 0 unless the address is bad. // We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore) const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw)); - _ = os.dragonfly.umtx_wakeup(addr, to_wake); + _ = c.umtx_wakeup(addr, to_wake); } }; @@ -496,8 +503,8 @@ const WasmImpl = struct { /// https://go.dev/src/runtime/sema.go const PosixImpl = struct { const Event = struct { - cond: std.c.pthread_cond_t, - mutex: std.c.pthread_mutex_t, + cond: c.pthread_cond_t, + mutex: c.pthread_mutex_t, state: enum { empty, waiting, notified }, fn init(self: *Event) void { @@ -509,18 +516,18 @@ const PosixImpl = struct { fn deinit(self: *Event) void { // Some platforms reportedly give EINVAL for statically initialized pthread types. - const rc = std.c.pthread_cond_destroy(&self.cond); + const rc = c.pthread_cond_destroy(&self.cond); assert(rc == .SUCCESS or rc == .INVAL); - const rm = std.c.pthread_mutex_destroy(&self.mutex); + const rm = c.pthread_mutex_destroy(&self.mutex); assert(rm == .SUCCESS or rm == .INVAL); self.* = undefined; } fn wait(self: *Event, timeout: ?u64) error{Timeout}!void { - assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); + assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); // Early return if the event was already set. if (self.state == .notified) { @@ -530,9 +537,9 @@ const PosixImpl = struct { // Compute the absolute timeout if one was specified. // POSIX requires that REALTIME is used by default for the pthread timedwait functions. // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere. - var ts: os.timespec = undefined; + var ts: c.timespec = undefined; if (timeout) |timeout_ns| { - os.clock_gettime(os.CLOCK.REALTIME, &ts) catch unreachable; + c.clock_gettime(c.CLOCK.REALTIME, &ts) catch unreachable; ts.tv_sec +|= @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s)); ts.tv_nsec += @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s)); @@ -549,8 +556,8 @@ const PosixImpl = struct { while (true) { // Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout. const rc = blk: { - if (timeout == null) break :blk std.c.pthread_cond_wait(&self.cond, &self.mutex); - break :blk std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts); + if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex); + break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts); }; // After waking up, check if the event was set. @@ -574,8 +581,8 @@ const PosixImpl = struct { } fn set(self: *Event) void { - assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); + assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS); // Make sure that multiple calls to set() were not done on the same Event. const old_state = self.state; @@ -586,7 +593,7 @@ const PosixImpl = struct { // the condition variable once it observes the new state, potentially causing a UAF if done unlocked. self.state = .notified; if (old_state == .waiting) { - assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS); + assert(c.pthread_cond_signal(&self.cond) == .SUCCESS); } } }; @@ -732,7 +739,7 @@ const PosixImpl = struct { }; const Bucket = struct { - mutex: std.c.pthread_mutex_t align(atomic.cache_line) = .{}, + mutex: c.pthread_mutex_t align(atomic.cache_line) = .{}, pending: atomic.Value(usize) = atomic.Value(usize).init(0), treap: Treap = .{}, @@ -798,8 +805,8 @@ const PosixImpl = struct { var waiter: Waiter = undefined; { - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); cancelled = ptr.load(.monotonic) != expect; if (cancelled) { @@ -821,8 +828,8 @@ const PosixImpl = struct { // If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF. defer if (!cancelled) waiter.event.wait(null) catch unreachable; - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); cancelled = WaitQueue.tryRemove(&bucket.treap, address, &waiter); if (cancelled) { @@ -871,8 +878,8 @@ const PosixImpl = struct { } }; - assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); - defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); + assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS); + defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS); // Another pending check again to avoid the WaitQueue lookup if not necessary. if (bucket.pending.load(.monotonic) > 0) { diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig index 67472ffd9c..b6d3d6fb84 100644 --- a/lib/std/Thread/Mutex.zig +++ b/lib/std/Thread/Mutex.zig @@ -23,7 +23,6 @@ const std = @import("../std.zig"); const builtin = @import("builtin"); const Mutex = @This(); -const os = std.os; const assert = std.debug.assert; const testing = std.testing; const Thread = std.Thread; @@ -117,36 +116,40 @@ const SingleThreadedImpl = struct { // SRWLOCK on windows is almost always faster than Futex solution. // It also implements an efficient Condition with requeue support for us. const WindowsImpl = struct { - srwlock: os.windows.SRWLOCK = .{}, + srwlock: windows.SRWLOCK = .{}, fn tryLock(self: *@This()) bool { - return os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != os.windows.FALSE; + return windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE; } fn lock(self: *@This()) void { - os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); + windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); } fn unlock(self: *@This()) void { - os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock); + windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock); } + + const windows = std.os.windows; }; // os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions. const DarwinImpl = struct { - oul: os.darwin.os_unfair_lock = .{}, + oul: c.os_unfair_lock = .{}, fn tryLock(self: *@This()) bool { - return os.darwin.os_unfair_lock_trylock(&self.oul); + return c.os_unfair_lock_trylock(&self.oul); } fn lock(self: *@This()) void { - os.darwin.os_unfair_lock_lock(&self.oul); + c.os_unfair_lock_lock(&self.oul); } fn unlock(self: *@This()) void { - os.darwin.os_unfair_lock_unlock(&self.oul); + c.os_unfair_lock_unlock(&self.oul); } + + const c = std.c; }; const FutexImpl = struct { |
