Diffstat (limited to 'lib')
| -rw-r--r-- | lib/std/mutex.zig | 266 |
1 file changed, 182 insertions, 84 deletions
diff --git a/lib/std/mutex.zig b/lib/std/mutex.zig
index 71188054f3..4ade4c3ef4 100644
--- a/lib/std/mutex.zig
+++ b/lib/std/mutex.zig
@@ -1,12 +1,13 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
+const os = std.os;
 const testing = std.testing;
+const SpinLock = std.SpinLock;
 const ResetEvent = std.ResetEvent;
 
 /// Lock may be held only once. If the same thread
 /// tries to acquire the same mutex twice, it deadlocks.
-/// This type supports static initialization and is based off of Webkit's WTF Lock (via rust parking_lot)
-/// https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
+/// This type supports static initialization and is at most `@sizeOf(usize)` in size.
 /// When an application is built in single threaded release mode, all the functions are
 /// no-ops. In single threaded debug mode, there is deadlock detection.
 pub const Mutex = if (builtin.single_threaded)
@@ -24,35 +25,114 @@ pub const Mutex = if (builtin.single_threaded)
                 }
             }
         };
+
         pub fn init() Mutex {
             return Mutex{ .lock = lock_init };
         }
-        pub fn deinit(self: *Mutex) void {}
 
-        pub fn acquire(self: *Mutex) Held {
-            if (std.debug.runtime_safety and self.lock) {
-                @panic("deadlock detected");
+        pub fn deinit(self: *Mutex) void {
+            self.* = undefined;
+        }
+
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (std.debug.runtime_safety) {
+                if (self.lock) return null;
+                self.lock = true;
             }
             return Held{ .mutex = self };
         }
+
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse @panic("deadlock detected");
+        }
     }
-else
+else if (builtin.os == .windows)
+    // https://locklessinc.com/articles/keyed_events/
+    extern union {
+        locked: u8,
+        waiters: u32,
+
+        const WAKE = 1 << 8;
+        const WAIT = 1 << 9;
+
+        pub fn init() Mutex {
+            return Mutex{ .waiters = 0 };
+        }
+
+        pub fn deinit(self: *Mutex) void {
+            self.* = undefined;
+        }
+
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) != 0)
+                return null;
+            return Held{ .mutex = self };
+        }
+
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse self.acquireSlow();
+        }
+
+        fn acquireSlow(self: *Mutex) Held {
+            @setCold(true);
+            while (true) : (SpinLock.yield(1)) {
+                const waiters = @atomicLoad(u32, &self.waiters, .Monotonic);
+
+                // try and take the lock if unlocked
+                if ((waiters & 1) == 0) {
+                    if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) == 0)
+                        return Held{ .mutex = self };
+
+                // otherwise, try and update the waiting count.
+                // then unset the WAKE bit so that another unlocker can wake up a thread.
+                } else if (@cmpxchgWeak(u32, &self.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) {
+                    ResetEvent.OsEvent.Futex.wait(@ptrCast(*i32, &self.waiters), undefined, null) catch unreachable;
+                    _ = @atomicRmw(u32, &self.waiters, .Sub, WAKE, .Monotonic);
+                }
+            }
+        }
+
+        pub const Held = struct {
+            mutex: *Mutex,
+
+            pub fn release(self: Held) void {
+                // unlock without an rmw/cmpxchg instruction
+                @atomicStore(u8, @ptrCast(*u8, &self.mutex.locked), 0, .Release);
+
+                while (true) : (SpinLock.yield(1)) {
+                    const waiters = @atomicLoad(u32, &self.mutex.waiters, .Monotonic);
+
+                    // no one is waiting
+                    if (waiters < WAIT) return;
+                    // someone grabbed the lock and will do the wake instead
+                    if (waiters & 1 != 0) return;
+                    // someone else is currently waking up
+                    if (waiters & WAKE != 0) return;
+
+                    // try to decrease the waiter count & set the WAKE bit, meaning a thread is waking up
+                    if (@cmpxchgWeak(u32, &self.mutex.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null)
+                        return ResetEvent.OsEvent.Futex.wake(@ptrCast(*i32, &self.mutex.waiters));
+                }
+            }
+        };
+    }
+else if (builtin.link_libc or builtin.os == .linux)
+    // stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
     struct {
         state: usize,
 
+        /// number of times to spin trying to acquire the lock.
+        /// https://webkit.org/blog/6161/locking-in-webkit/
+        const SPIN_COUNT = 40;
+
        const MUTEX_LOCK: usize = 1 << 0;
         const QUEUE_LOCK: usize = 1 << 1;
         const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
-        const QueueNode = std.atomic.Stack(ResetEvent).Node;
 
-        /// number of iterations to spin yielding the cpu
-        const SPIN_CPU = 4;
-
-        /// number of iterations to spin in the cpu yield loop
-        const SPIN_CPU_COUNT = 30;
-
-        /// number of iterations to spin yielding the thread
-        const SPIN_THREAD = 1;
+        const Node = struct {
+            next: ?*Node,
+            event: ResetEvent,
+        };
 
         pub fn init() Mutex {
             return Mutex{ .state = 0 };
@@ -62,98 +142,116 @@ else
             self.* = undefined;
         }
 
-        pub const Held = struct {
-            mutex: *Mutex,
+        fn yield() void {
+            os.sched_yield() catch SpinLock.yield(30);
+        }
 
-            pub fn release(self: Held) void {
-                // since MUTEX_LOCK is the first bit, we can use (.Sub) instead of (.And, ~MUTEX_LOCK).
-                // this is because .Sub may be implemented more efficiently than the latter
-                // (e.g. `lock xadd` vs `cmpxchg` loop on x86)
-                const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
-                if ((state & QUEUE_MASK) != 0 and (state & QUEUE_LOCK) == 0) {
-                    self.mutex.releaseSlow(state);
-                }
-            }
-        };
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic) != null)
+                return null;
+            return Held{ .mutex = self };
+        }
 
         pub fn acquire(self: *Mutex) Held {
-            // fast path close to SpinLock fast path
-            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |current_state| {
-                self.acquireSlow(current_state);
-            }
-            return Held{ .mutex = self };
+            return self.tryAcquire() orelse {
+                self.acquireSlow();
+                return Held{ .mutex = self };
+            };
         }
 
-        fn acquireSlow(self: *Mutex, current_state: usize) void {
-            var spin: usize = 0;
-            var state = current_state;
+        fn acquireSlow(self: *Mutex) void {
+            // inlining the fast path and hiding *Slow()
+            // calls behind a @setCold(true) appears to
+            // improve performance in release builds.
+            @setCold(true);
             while (true) {
-                // try and acquire the lock if unlocked
-                if ((state & MUTEX_LOCK) == 0) {
-                    state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
-                    continue;
+                // try and spin for a bit to acquire the mutex if there's currently no queue
+                var spin_count: u32 = SPIN_COUNT;
+                var state = @atomicLoad(usize, &self.state, .Monotonic);
+                while (spin_count != 0) : (spin_count -= 1) {
+                    if (state & MUTEX_LOCK == 0) {
+                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
+                    } else if (state & QUEUE_MASK == 0) {
+                        break;
+                    }
+                    yield();
+                    state = @atomicLoad(usize, &self.state, .Monotonic);
                 }
 
-                // spin only if the waiting queue isn't empty and when it hasn't spun too much already
-                if ((state & QUEUE_MASK) == 0 and spin < SPIN_CPU + SPIN_THREAD) {
-                    if (spin < SPIN_CPU) {
-                        std.SpinLock.yield(SPIN_CPU_COUNT);
+                // create the ResetEvent node on the stack
+                // (faster than threadlocal on platforms like OSX)
+                var node: Node = undefined;
+                node.event = ResetEvent.init();
+                defer node.event.deinit();
+
+                // we've spun too long, try and add our node to the LIFO queue.
+                // if the mutex becomes available in the process, try and grab it instead.
+                while (true) {
+                    if (state & MUTEX_LOCK == 0) {
+                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
                     } else {
-                        std.os.sched_yield() catch std.time.sleep(0);
+                        node.next = @intToPtr(?*Node, state & QUEUE_MASK);
+                        const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
+                        _ = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
+                            node.event.wait();
+                            break;
+                        };
                     }
+                    yield();
                     state = @atomicLoad(usize, &self.state, .Monotonic);
-                    continue;
                 }
-
-                // thread should block, try and add this event to the waiting queue
-                var node = QueueNode{
-                    .next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
-                    .data = ResetEvent.init(),
-                };
-                defer node.data.deinit();
-                const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
-                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
-                    // node is in the queue, wait until a `held.release()` wakes us up.
-                    _ = node.data.wait(null) catch unreachable;
-                    spin = 0;
-                    state = @atomicLoad(usize, &self.state, .Monotonic);
-                    continue;
-                };
             }
         }
 
-        fn releaseSlow(self: *Mutex, current_state: usize) void {
-            // grab the QUEUE_LOCK in order to signal a waiting queue node's event.
-            var state = current_state;
-            while (true) {
-                if ((state & QUEUE_LOCK) != 0 or (state & QUEUE_MASK) == 0)
+        pub const Held = struct {
+            mutex: *Mutex,
+
+            pub fn release(self: Held) void {
+                // first, remove the lock bit so another possibly parallel acquire() can succeed.
+                // use .Sub since it can usually be compiled down more efficiently
+                // (`lock sub` on x86) vs .And ~MUTEX_LOCK (`lock cmpxchg` loop on x86)
+                const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
+
+                // if the LIFO queue isn't locked and it has a node, try and wake up the node.
+                if ((state & QUEUE_LOCK) == 0 and (state & QUEUE_MASK) != 0)
+                    self.mutex.releaseSlow();
+            }
+        };
+
+        fn releaseSlow(self: *Mutex) void {
+            @setCold(true);
+
+            // try and lock the LIFO queue to pop a node off,
+            // stopping altogether if it's already locked or the queue is empty
+            var state = @atomicLoad(usize, &self.state, .Monotonic);
+            while (true) : (std.SpinLock.yield(1)) {
+                if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0)
                     return;
                 state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
             }
 
-            while (true) {
-                // barrier needed to observe incoming state changes
-                defer @fence(.Acquire);
-
-                // the mutex is currently locked. try to unset the QUEUE_LOCK and let the locker wake up the next node.
-                // avoids waking up multiple sleeping threads which try to acquire the lock again which increases contention.
+            // acquired the QUEUE_LOCK, try and pop a node to wake it.
+            // if the mutex is locked, then unset QUEUE_LOCK and let
+            // the thread that holds the mutex do the wake-up on unlock()
+            while (true) : (std.SpinLock.yield(1)) {
                 if ((state & MUTEX_LOCK) != 0) {
-                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return;
-                    continue;
+                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Acquire) orelse return;
+                } else {
+                    const node = @intToPtr(*Node, state & QUEUE_MASK);
+                    const new_state = @ptrToInt(node.next);
+                    state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Acquire) orelse {
+                        node.event.set();
+                        return;
+                    };
                 }
-
-                // try to pop the top node on the waiting queue stack to wake it up
-                // while at the same time unsetting the QUEUE_LOCK.
-                const node = @intToPtr(*QueueNode, state & QUEUE_MASK);
-                const new_state = @ptrToInt(node.next) | (state & MUTEX_LOCK);
-                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
-                    _ = node.data.set(false);
-                    return;
-                };
             }
         }
-    };
+    }
+
+// for platforms without a known OS blocking
+// primitive, default to SpinLock for correctness
+else SpinLock;
 
 const TestContext = struct {
     mutex: *Mutex,
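For reference, a minimal usage sketch of the Mutex API this patch converges on (init, tryAcquire, acquire, and Held.release). The counter and helper function names below are illustrative only and not part of the patch; the sketch assumes the usual std.Mutex re-export from the standard library:

    const std = @import("std");

    var counter: usize = 0;
    var counter_mutex = std.Mutex.init();

    // acquire() blocks: it spins briefly and then, on platforms with an OS
    // blocking primitive, parks the thread until release() wakes it.
    fn increment() void {
        const held = counter_mutex.acquire();
        defer held.release();
        counter += 1;
    }

    // tryAcquire() is the new non-blocking entry point: it returns null
    // instead of waiting when the mutex is already held.
    fn incrementIfUncontended() bool {
        const held = counter_mutex.tryAcquire() orelse return false;
        defer held.release();
        counter += 1;
        return true;
    }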
