path: root/lib/std/event/lock.zig
Diffstat (limited to 'lib/std/event/lock.zig')
-rw-r--r--  lib/std/event/lock.zig  186
1 files changed, 186 insertions, 0 deletions
diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig
new file mode 100644
index 0000000000..a0b1fd3e50
--- /dev/null
+++ b/lib/std/event/lock.zig
@@ -0,0 +1,186 @@
+const std = @import("../std.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const testing = std.testing;
+const mem = std.mem;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock.
+/// Functions which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+/// Allows only one actor to hold the lock.
+pub const Lock = struct {
+    loop: *Loop,
+    shared_bit: u8, // TODO make this a bool
+    queue: Queue,
+    queue_empty_bit: u8, // TODO make this a bool
+
+    const Queue = std.atomic.Queue(anyframe);
+
+    pub const Held = struct {
+        lock: *Lock,
+
+        pub fn release(self: Held) void {
+            // Resume the next item from the queue.
+            if (self.lock.queue.get()) |node| {
+                self.lock.loop.onNextTick(node);
+                return;
+            }
+
+            // We need to release the lock.
+            _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst);
+            _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst);
+
+            // There might be a queue item. If we know the queue is empty, we can be done,
+            // because the other actor will try to obtain the lock.
+            // But if there's a queue item, we are the actor which must loop and attempt
+            // to grab the lock again.
+            if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+                return;
+            }
+
+            while (true) {
+                const old_bit = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 1, .SeqCst);
+                if (old_bit != 0) {
+                    // We did not obtain the lock. Great, the queue is someone else's problem.
+                    return;
+                }
+
+                // Resume the next item from the queue.
+                if (self.lock.queue.get()) |node| {
+                    self.lock.loop.onNextTick(node);
+                    return;
+                }
+
+                // Release the lock again.
+                _ = @atomicRmw(u8, &self.lock.queue_empty_bit, .Xchg, 1, .SeqCst);
+                _ = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 0, .SeqCst);
+
+                // Find out if we can be done.
+                if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+                    return;
+                }
+            }
+        }
+    };
+
+    pub fn init(loop: *Loop) Lock {
+        return Lock{
+            .loop = loop,
+            .shared_bit = 0,
+            .queue = Queue.init(),
+            .queue_empty_bit = 1,
+        };
+    }
+
+    pub fn initLocked(loop: *Loop) Lock {
+        return Lock{
+            .loop = loop,
+            .shared_bit = 1,
+            .queue = Queue.init(),
+            .queue_empty_bit = 1,
+        };
+    }
+
+    /// Must be called when not locked. Not thread safe.
+    /// All calls to acquire() and release() must complete before calling deinit().
+    pub fn deinit(self: *Lock) void {
+        assert(self.shared_bit == 0);
+        while (self.queue.get()) |node| resume node.data;
+    }
+
+    pub async fn acquire(self: *Lock) Held {
+        var my_tick_node = Loop.NextTickNode.init(@frame());
+
+        errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
+        suspend {
+            self.queue.put(&my_tick_node);
+
+            // At this point, we are in the queue, so we might have already been resumed.
+
+            // We set this bit so that later we can rely on the fact that if queue_empty_bit is 1,
+            // some actor will attempt to grab the lock.
+            _ = @atomicRmw(u8, &self.queue_empty_bit, .Xchg, 0, .SeqCst);
+
+            const old_bit = @atomicRmw(u8, &self.shared_bit, .Xchg, 1, .SeqCst);
+            if (old_bit == 0) {
+                if (self.queue.get()) |node| {
+                    // Whether this node is us or someone else, we tail resume it.
+                    resume node.data;
+                }
+            }
+        }
+
+        return Held{ .lock = self };
+    }
+};
+
+test "std.event.Lock" {
+    // TODO https://github.com/ziglang/zig/issues/1908
+    if (builtin.single_threaded) return error.SkipZigTest;
+    // TODO https://github.com/ziglang/zig/issues/3251
+    if (std.os.freebsd.is_the_target) return error.SkipZigTest;
+
+    const allocator = std.heap.direct_allocator;
+
+    var loop: Loop = undefined;
+    try loop.initMultiThreaded(allocator);
+    defer loop.deinit();
+
+    var lock = Lock.init(&loop);
+    defer lock.deinit();
+
+    _ = async testLock(&loop, &lock);
+    loop.run();
+
+    testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data);
+}
+
+async fn testLock(loop: *Loop, lock: *Lock) void {
+    var handle1 = async lockRunner(lock);
+    var tick_node1 = Loop.NextTickNode{
+        .prev = undefined,
+        .next = undefined,
+        .data = &handle1,
+    };
+    loop.onNextTick(&tick_node1);
+
+    var handle2 = async lockRunner(lock);
+    var tick_node2 = Loop.NextTickNode{
+        .prev = undefined,
+        .next = undefined,
+        .data = &handle2,
+    };
+    loop.onNextTick(&tick_node2);
+
+    var handle3 = async lockRunner(lock);
+    var tick_node3 = Loop.NextTickNode{
+        .prev = undefined,
+        .next = undefined,
+        .data = &handle3,
+    };
+    loop.onNextTick(&tick_node3);
+
+    await handle1;
+    await handle2;
+    await handle3;
+}
+
+var shared_test_data = [1]i32{0} ** 10;
+var shared_test_index: usize = 0;
+
+async fn lockRunner(lock: *Lock) void {
+    suspend; // resumed by onNextTick
+
+    var i: usize = 0;
+    while (i < shared_test_data.len) : (i += 1) {
+        var lock_frame = async lock.acquire();
+        const handle = await lock_frame;
+        defer handle.release();
+
+        shared_test_index = 0;
+        while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
+            shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
+        }
+    }
+}
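
For reference, a minimal usage sketch of the API introduced by this commit, mirroring the acquire/await/release pattern from the test above. The function name and the counter parameter are hypothetical, not part of the diff:

const std = @import("std");
const Lock = std.event.Lock;

// Hypothetical async worker that serializes access to `counter`.
async fn guardedIncrement(lock: *Lock, counter: *i32) void {
    var lock_frame = async lock.acquire(); // enqueue this frame as a waiter
    const held = await lock_frame; // suspends here until the lock is granted
    defer held.release(); // on scope exit, resume the next waiter, in order
    counter.* += 1; // critical section
}

Because release() hands the lock directly to the next queued frame via onNextTick, waiters are woken in FIFO order rather than re-contending for the shared_bit.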