diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-10-03 14:55:12 -0400 |
|---|---|---|
| committer | Andrew Kelley <superjoe30@gmail.com> | 2018-10-03 14:55:12 -0400 |
| commit | 3f13a59cbc4235a5abcc2ca35bbc3f172336fd61 (patch) | |
| tree | abfac8c81ff70fee75d81c07aae4ac4080218e00 /std/mutex.zig | |
| parent | 66cb75d1148fffdd161e7829b9e27aa52f0f1616 (diff) | |
| download | zig-3f13a59cbc4235a5abcc2ca35bbc3f172336fd61.tar.gz zig-3f13a59cbc4235a5abcc2ca35bbc3f172336fd61.zip | |
better mutex implementation
based on Ulrich Drepper's "Futexes are tricky" paper, Mutex, Take 3
also includes tests
Diffstat (limited to 'std/mutex.zig')
| -rw-r--r-- | std/mutex.zig | 81 |
1 files changed, 61 insertions, 20 deletions
diff --git a/std/mutex.zig b/std/mutex.zig index 9dc0c23d6d..e35bd81bc4 100644 --- a/std/mutex.zig +++ b/std/mutex.zig @@ -8,6 +8,8 @@ const linux = std.os.linux; /// Lock may be held only once. If the same thread /// tries to acquire the same mutex twice, it deadlocks. +/// The Linux implementation is based on mutex3 from +/// https://www.akkadia.org/drepper/futex.pdf pub const Mutex = struct { /// 0: unlocked /// 1: locked, no waiters @@ -25,12 +27,10 @@ pub const Mutex = struct { pub fn release(self: Held) void { if (builtin.os == builtin.Os.linux) { - // Always unlock. If the previous state was Locked-No-Waiters, then we're done. - // Otherwise, wake a waiter up. - const prev = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release); - if (prev != 1) { - assert(prev == 2); - const rc = linux.futex_wake(&self.mutex.linux_lock, linux.FUTEX_WAKE, 1); + const c = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Sub, 1, AtomicOrder.Release); + if (c != 1) { + _ = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release); + const rc = linux.futex_wake(&self.mutex.linux_lock, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1); switch (linux.getErrno(rc)) { 0 => {}, linux.EINVAL => unreachable, @@ -52,21 +52,18 @@ pub const Mutex = struct { pub fn acquire(self: *Mutex) Held { if (builtin.os == builtin.Os.linux) { - // First try to go from Unlocked to Locked-No-Waiters. If this succeeds, no syscalls are needed. - // Otherwise, we need to be in the Locked-With-Waiters state. If we are already in that state, - // proceed to futex_wait. Otherwise, try to go from Locked-No-Waiters to Locked-With-Waiters. - // If that succeeds, proceed to futex_wait. Otherwise start the whole loop over again. - while (@cmpxchgWeak(i32, &self.linux_lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic)) |l| { - if (l == 2 or - @cmpxchgWeak(i32, &self.linux_lock, 1, 2, AtomicOrder.Acquire, AtomicOrder.Monotonic) == null) - { - const rc = linux.futex_wait(&self.linux_lock, linux.FUTEX_WAIT, 2, null); - switch (linux.getErrno(rc)) { - 0, linux.EINTR, linux.EAGAIN => continue, - linux.EINVAL => unreachable, - else => unreachable, - } + var c = @cmpxchgWeak(i32, &self.linux_lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic) orelse + return Held{ .mutex = self }; + if (c != 2) + c = @atomicRmw(i32, &self.linux_lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire); + while (c != 0) { + const rc = linux.futex_wait(&self.linux_lock, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, 2, null); + switch (linux.getErrno(rc)) { + 0, linux.EINTR, linux.EAGAIN => {}, + linux.EINVAL => unreachable, + else => unreachable, } + c = @atomicRmw(i32, &self.linux_lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire); } } else { _ = self.spin_lock.acquire(); @@ -74,3 +71,47 @@ pub const Mutex = struct { return Held{ .mutex = self }; } }; + +const Context = struct { + mutex: *Mutex, + data: i128, + + const incr_count = 10000; +}; + +test "std.Mutex" { + var direct_allocator = std.heap.DirectAllocator.init(); + defer direct_allocator.deinit(); + + var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 300 * 1024); + defer direct_allocator.allocator.free(plenty_of_memory); + + var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory); + var a = &fixed_buffer_allocator.allocator; + + var mutex = Mutex.init(); + var context = Context{ + .mutex = &mutex, + .data = 0, + }; + + const thread_count = 10; + var threads: [thread_count]*std.os.Thread = undefined; + for (threads) |*t| { + t.* = try std.os.spawnThread(&context, worker); + } + for (threads) |t| + t.wait(); + + std.debug.assertOrPanic(context.data == thread_count * Context.incr_count); +} + +fn worker(ctx: *Context) void { + var i: usize = 0; + while (i != Context.incr_count) : (i += 1) { + const held = ctx.mutex.acquire(); + defer held.release(); + + ctx.data += 1; + } +} |
