aboutsummaryrefslogtreecommitdiff
path: root/lib/std/Thread
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2024-03-18 22:39:59 -0700
committerAndrew Kelley <andrew@ziglang.org>2024-03-19 11:45:09 -0700
commitcd62005f19ff966d2c42de4daeb9a1e4b644bf76 (patch)
tree4bb316708afaf79c971808df792d8fe86274789b /lib/std/Thread
parent7057bffc14602add697eb566b83934b7ad3fd81c (diff)
downloadzig-cd62005f19ff966d2c42de4daeb9a1e4b644bf76.tar.gz
zig-cd62005f19ff966d2c42de4daeb9a1e4b644bf76.zip
extract std.posix from std.os
closes #5019
Diffstat (limited to 'lib/std/Thread')
-rw-r--r--lib/std/Thread/Futex.zig135
-rw-r--r--lib/std/Thread/Mutex.zig21
2 files changed, 83 insertions, 73 deletions
diff --git a/lib/std/Thread/Futex.zig b/lib/std/Thread/Futex.zig
index 764d3f13e1..2a81a4a535 100644
--- a/lib/std/Thread/Futex.zig
+++ b/lib/std/Thread/Futex.zig
@@ -1,13 +1,20 @@
-//! Futex is a mechanism used to block (`wait`) and unblock (`wake`) threads using a 32bit memory address as hints.
-//! Blocking a thread is acknowledged only if the 32bit memory address is equal to a given value.
-//! This check helps avoid block/unblock deadlocks which occur if a `wake()` happens before a `wait()`.
-//! Using Futex, other Thread synchronization primitives can be built which efficiently wait for cross-thread events or signals.
+//! A mechanism used to block (`wait`) and unblock (`wake`) threads using a
+//! 32bit memory address as hints.
+//!
+//! Blocking a thread is acknowledged only if the 32bit memory address is equal
+//! to a given value. This check helps avoid block/unblock deadlocks which
+//! occur if a `wake()` happens before a `wait()`.
+//!
+//! Using Futex, other Thread synchronization primitives can be built which
+//! efficiently wait for cross-thread events or signals.
const std = @import("../std.zig");
const builtin = @import("builtin");
const Futex = @This();
+const windows = std.os.windows;
+const linux = std.os.linux;
+const c = std.c;
-const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const atomic = std.atomic;
@@ -124,18 +131,18 @@ const SingleThreadedImpl = struct {
// as it's generally already a linked target and is autoloaded into all processes anyway.
const WindowsImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- var timeout_value: os.windows.LARGE_INTEGER = undefined;
- var timeout_ptr: ?*const os.windows.LARGE_INTEGER = null;
+ var timeout_value: windows.LARGE_INTEGER = undefined;
+ var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
// NTDLL functions work with time in units of 100 nanoseconds.
// Positive values are absolute deadlines while negative values are relative durations.
if (timeout) |delay| {
- timeout_value = @as(os.windows.LARGE_INTEGER, @intCast(delay / 100));
+ timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100));
timeout_value = -timeout_value;
timeout_ptr = &timeout_value;
}
- const rc = os.windows.ntdll.RtlWaitOnAddress(
+ const rc = windows.ntdll.RtlWaitOnAddress(
ptr,
&expect,
@sizeOf(@TypeOf(expect)),
@@ -157,8 +164,8 @@ const WindowsImpl = struct {
assert(max_waiters != 0);
switch (max_waiters) {
- 1 => os.windows.ntdll.RtlWakeAddressSingle(address),
- else => os.windows.ntdll.RtlWakeAddressAll(address),
+ 1 => windows.ntdll.RtlWakeAddressSingle(address),
+ else => windows.ntdll.RtlWakeAddressAll(address),
}
}
};
@@ -189,10 +196,10 @@ const DarwinImpl = struct {
var timeout_overflowed = false;
const addr: *const anyopaque = ptr;
- const flags = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO;
+ const flags = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO;
const status = blk: {
if (supports_ulock_wait2) {
- break :blk os.darwin.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
+ break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
}
const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: {
@@ -200,11 +207,11 @@ const DarwinImpl = struct {
break :overflow std.math.maxInt(u32);
};
- break :blk os.darwin.__ulock_wait(flags, addr, expect, timeout_us);
+ break :blk c.__ulock_wait(flags, addr, expect, timeout_us);
};
if (status >= 0) return;
- switch (@as(std.os.E, @enumFromInt(-status))) {
+ switch (@as(c.E, @enumFromInt(-status))) {
// Wait was interrupted by the OS or other spurious signalling.
.INTR => {},
// Address of the futex was paged out. This is unlikely, but possible in theory, and
@@ -221,17 +228,17 @@ const DarwinImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- var flags: u32 = os.darwin.UL_COMPARE_AND_WAIT | os.darwin.ULF_NO_ERRNO;
+ var flags: u32 = c.UL_COMPARE_AND_WAIT | c.ULF_NO_ERRNO;
if (max_waiters > 1) {
- flags |= os.darwin.ULF_WAKE_ALL;
+ flags |= c.ULF_WAKE_ALL;
}
while (true) {
const addr: *const anyopaque = ptr;
- const status = os.darwin.__ulock_wake(flags, addr, 0);
+ const status = c.__ulock_wake(flags, addr, 0);
if (status >= 0) return;
- switch (@as(std.os.E, @enumFromInt(-status))) {
+ switch (@as(c.E, @enumFromInt(-status))) {
.INTR => continue, // spurious wake()
.FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
.NOENT => return, // nothing was woken up
@@ -245,20 +252,20 @@ const DarwinImpl = struct {
// https://man7.org/linux/man-pages/man2/futex.2.html
const LinuxImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- var ts: os.timespec = undefined;
+ var ts: linux.timespec = undefined;
if (timeout) |timeout_ns| {
ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
ts.tv_nsec = @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
}
- const rc = os.linux.futex_wait(
+ const rc = linux.futex_wait(
@as(*const i32, @ptrCast(&ptr.raw)),
- os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAIT,
+ linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAIT,
@as(i32, @bitCast(expect)),
if (timeout != null) &ts else null,
);
- switch (os.linux.getErrno(rc)) {
+ switch (linux.E.init(rc)) {
.SUCCESS => {}, // notified by `wake()`
.INTR => {}, // spurious wakeup
.AGAIN => {}, // ptr.* != expect
@@ -273,13 +280,13 @@ const LinuxImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const rc = os.linux.futex_wake(
+ const rc = linux.futex_wake(
@as(*const i32, @ptrCast(&ptr.raw)),
- os.linux.FUTEX.PRIVATE_FLAG | os.linux.FUTEX.WAKE,
+ linux.FUTEX.PRIVATE_FLAG | linux.FUTEX.WAKE,
std.math.cast(i32, max_waiters) orelse std.math.maxInt(i32),
);
- switch (os.linux.getErrno(rc)) {
+ switch (linux.E.init(rc)) {
.SUCCESS => {}, // successful wake up
.INVAL => {}, // invalid futex_wait() on ptr done elsewhere
.FAULT => {}, // pointer became invalid while doing the wake
@@ -292,28 +299,28 @@ const LinuxImpl = struct {
const FreebsdImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
var tm_size: usize = 0;
- var tm: os.freebsd._umtx_time = undefined;
- var tm_ptr: ?*const os.freebsd._umtx_time = null;
+ var tm: c._umtx_time = undefined;
+ var tm_ptr: ?*const c._umtx_time = null;
if (timeout) |timeout_ns| {
tm_ptr = &tm;
tm_size = @sizeOf(@TypeOf(tm));
tm._flags = 0; // use relative time not UMTX_ABSTIME
- tm._clockid = os.CLOCK.MONOTONIC;
+ tm._clockid = c.CLOCK.MONOTONIC;
tm._timeout.tv_sec = @as(@TypeOf(tm._timeout.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
tm._timeout.tv_nsec = @as(@TypeOf(tm._timeout.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
}
- const rc = os.freebsd._umtx_op(
+ const rc = c._umtx_op(
@intFromPtr(&ptr.raw),
- @intFromEnum(os.freebsd.UMTX_OP.WAIT_UINT_PRIVATE),
+ @intFromEnum(c.UMTX_OP.WAIT_UINT_PRIVATE),
@as(c_ulong, expect),
tm_size,
@intFromPtr(tm_ptr),
);
- switch (os.errno(rc)) {
+ switch (std.posix.errno(rc)) {
.SUCCESS => {},
.FAULT => unreachable, // one of the args points to invalid memory
.INVAL => unreachable, // arguments should be correct
@@ -327,15 +334,15 @@ const FreebsdImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const rc = os.freebsd._umtx_op(
+ const rc = c._umtx_op(
@intFromPtr(&ptr.raw),
- @intFromEnum(os.freebsd.UMTX_OP.WAKE_PRIVATE),
+ @intFromEnum(c.UMTX_OP.WAKE_PRIVATE),
@as(c_ulong, max_waiters),
0, // there is no timeout struct
0, // there is no timeout struct pointer
);
- switch (os.errno(rc)) {
+ switch (std.posix.errno(rc)) {
.SUCCESS => {},
.FAULT => {}, // it's ok if the ptr doesn't point to valid memory
.INVAL => unreachable, // arguments should be correct
@@ -347,21 +354,21 @@ const FreebsdImpl = struct {
// https://man.openbsd.org/futex.2
const OpenbsdImpl = struct {
fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
- var ts: os.timespec = undefined;
+ var ts: c.timespec = undefined;
if (timeout) |timeout_ns| {
ts.tv_sec = @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
ts.tv_nsec = @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
}
- const rc = os.openbsd.futex(
+ const rc = c.futex(
@as(*const volatile u32, @ptrCast(&ptr.raw)),
- os.openbsd.FUTEX_WAIT | os.openbsd.FUTEX_PRIVATE_FLAG,
+ c.FUTEX_WAIT | c.FUTEX_PRIVATE_FLAG,
@as(c_int, @bitCast(expect)),
if (timeout != null) &ts else null,
null, // FUTEX_WAIT takes no requeue address
);
- switch (os.errno(rc)) {
+ switch (std.posix.errno(rc)) {
.SUCCESS => {}, // woken up by wake
.NOSYS => unreachable, // the futex operation shouldn't be invalid
.FAULT => unreachable, // ptr was invalid
@@ -378,9 +385,9 @@ const OpenbsdImpl = struct {
}
fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
- const rc = os.openbsd.futex(
+ const rc = c.futex(
@as(*const volatile u32, @ptrCast(&ptr.raw)),
- os.openbsd.FUTEX_WAKE | os.openbsd.FUTEX_PRIVATE_FLAG,
+ c.FUTEX_WAKE | c.FUTEX_PRIVATE_FLAG,
std.math.cast(c_int, max_waiters) orelse std.math.maxInt(c_int),
null, // FUTEX_WAKE takes no timeout ptr
null, // FUTEX_WAKE takes no requeue address
@@ -415,9 +422,9 @@ const DragonflyImpl = struct {
const value = @as(c_int, @bitCast(expect));
const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
- const rc = os.dragonfly.umtx_sleep(addr, value, timeout_us);
+ const rc = c.umtx_sleep(addr, value, timeout_us);
- switch (os.errno(rc)) {
+ switch (std.posix.errno(rc)) {
.SUCCESS => {},
.BUSY => {}, // ptr != expect
.AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh
@@ -444,7 +451,7 @@ const DragonflyImpl = struct {
// > umtx_wakeup() will generally return 0 unless the address is bad.
// We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore)
const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
- _ = os.dragonfly.umtx_wakeup(addr, to_wake);
+ _ = c.umtx_wakeup(addr, to_wake);
}
};
@@ -496,8 +503,8 @@ const WasmImpl = struct {
/// https://go.dev/src/runtime/sema.go
const PosixImpl = struct {
const Event = struct {
- cond: std.c.pthread_cond_t,
- mutex: std.c.pthread_mutex_t,
+ cond: c.pthread_cond_t,
+ mutex: c.pthread_mutex_t,
state: enum { empty, waiting, notified },
fn init(self: *Event) void {
@@ -509,18 +516,18 @@ const PosixImpl = struct {
fn deinit(self: *Event) void {
// Some platforms reportedly give EINVAL for statically initialized pthread types.
- const rc = std.c.pthread_cond_destroy(&self.cond);
+ const rc = c.pthread_cond_destroy(&self.cond);
assert(rc == .SUCCESS or rc == .INVAL);
- const rm = std.c.pthread_mutex_destroy(&self.mutex);
+ const rm = c.pthread_mutex_destroy(&self.mutex);
assert(rm == .SUCCESS or rm == .INVAL);
self.* = undefined;
}
fn wait(self: *Event, timeout: ?u64) error{Timeout}!void {
- assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
+ assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
+ defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
// Early return if the event was already set.
if (self.state == .notified) {
@@ -530,9 +537,9 @@ const PosixImpl = struct {
// Compute the absolute timeout if one was specified.
// POSIX requires that REALTIME is used by default for the pthread timedwait functions.
// This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere.
- var ts: os.timespec = undefined;
+ var ts: c.timespec = undefined;
if (timeout) |timeout_ns| {
- os.clock_gettime(os.CLOCK.REALTIME, &ts) catch unreachable;
+ c.clock_gettime(c.CLOCK.REALTIME, &ts) catch unreachable;
ts.tv_sec +|= @as(@TypeOf(ts.tv_sec), @intCast(timeout_ns / std.time.ns_per_s));
ts.tv_nsec += @as(@TypeOf(ts.tv_nsec), @intCast(timeout_ns % std.time.ns_per_s));
@@ -549,8 +556,8 @@ const PosixImpl = struct {
while (true) {
// Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout.
const rc = blk: {
- if (timeout == null) break :blk std.c.pthread_cond_wait(&self.cond, &self.mutex);
- break :blk std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
+ if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex);
+ break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
};
// After waking up, check if the event was set.
@@ -574,8 +581,8 @@ const PosixImpl = struct {
}
fn set(self: *Event) void {
- assert(std.c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
+ assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
+ defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
// Make sure that multiple calls to set() were not done on the same Event.
const old_state = self.state;
@@ -586,7 +593,7 @@ const PosixImpl = struct {
// the condition variable once it observes the new state, potentially causing a UAF if done unlocked.
self.state = .notified;
if (old_state == .waiting) {
- assert(std.c.pthread_cond_signal(&self.cond) == .SUCCESS);
+ assert(c.pthread_cond_signal(&self.cond) == .SUCCESS);
}
}
};
@@ -732,7 +739,7 @@ const PosixImpl = struct {
};
const Bucket = struct {
- mutex: std.c.pthread_mutex_t align(atomic.cache_line) = .{},
+ mutex: c.pthread_mutex_t align(atomic.cache_line) = .{},
pending: atomic.Value(usize) = atomic.Value(usize).init(0),
treap: Treap = .{},
@@ -798,8 +805,8 @@ const PosixImpl = struct {
var waiter: Waiter = undefined;
{
- assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+ assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
+ defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
cancelled = ptr.load(.monotonic) != expect;
if (cancelled) {
@@ -821,8 +828,8 @@ const PosixImpl = struct {
// If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF.
defer if (!cancelled) waiter.event.wait(null) catch unreachable;
- assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+ assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
+ defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
cancelled = WaitQueue.tryRemove(&bucket.treap, address, &waiter);
if (cancelled) {
@@ -871,8 +878,8 @@ const PosixImpl = struct {
}
};
- assert(std.c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
- defer assert(std.c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
+ assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
+ defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);
// Another pending check again to avoid the WaitQueue lookup if not necessary.
if (bucket.pending.load(.monotonic) > 0) {
diff --git a/lib/std/Thread/Mutex.zig b/lib/std/Thread/Mutex.zig
index 67472ffd9c..b6d3d6fb84 100644
--- a/lib/std/Thread/Mutex.zig
+++ b/lib/std/Thread/Mutex.zig
@@ -23,7 +23,6 @@ const std = @import("../std.zig");
const builtin = @import("builtin");
const Mutex = @This();
-const os = std.os;
const assert = std.debug.assert;
const testing = std.testing;
const Thread = std.Thread;
@@ -117,36 +116,40 @@ const SingleThreadedImpl = struct {
// SRWLOCK on windows is almost always faster than Futex solution.
// It also implements an efficient Condition with requeue support for us.
const WindowsImpl = struct {
- srwlock: os.windows.SRWLOCK = .{},
+ srwlock: windows.SRWLOCK = .{},
fn tryLock(self: *@This()) bool {
- return os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != os.windows.FALSE;
+ return windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE;
}
fn lock(self: *@This()) void {
- os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock);
+ windows.kernel32.AcquireSRWLockExclusive(&self.srwlock);
}
fn unlock(self: *@This()) void {
- os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock);
+ windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock);
}
+
+ const windows = std.os.windows;
};
// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions.
const DarwinImpl = struct {
- oul: os.darwin.os_unfair_lock = .{},
+ oul: c.os_unfair_lock = .{},
fn tryLock(self: *@This()) bool {
- return os.darwin.os_unfair_lock_trylock(&self.oul);
+ return c.os_unfair_lock_trylock(&self.oul);
}
fn lock(self: *@This()) void {
- os.darwin.os_unfair_lock_lock(&self.oul);
+ c.os_unfair_lock_lock(&self.oul);
}
fn unlock(self: *@This()) void {
- os.darwin.os_unfair_lock_unlock(&self.oul);
+ c.os_unfair_lock_unlock(&self.oul);
}
+
+ const c = std.c;
};
const FutexImpl = struct {