From e3ae2cfb5243e7255bf4dbcc8a9b7e77a31e9d45 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Wed, 1 Aug 2018 16:26:37 -0400 Subject: add std.event.RwLock and a few more std changes * add std.event.RwLock and std.event.RwLocked * std.debug.warn does its printing locked * add std.Mutex, however it's currently implemented as a spinlock * rename std.event.Group.cancelAll to std.event.Group.deinit and change the docs and assumptions. * add std.HashMap.clone --- CMakeLists.txt | 3 + std/debug/index.zig | 3 + std/event.zig | 4 + std/event/channel.zig | 4 + std/event/fs.zig | 4 +- std/event/group.zig | 28 +++-- std/event/lock.zig | 1 + std/event/rwlock.zig | 292 +++++++++++++++++++++++++++++++++++++++++++++++++ std/event/rwlocked.zig | 58 ++++++++++ std/event/tcp.zig | 4 +- std/hash_map.zig | 10 ++ std/index.zig | 2 + std/mutex.zig | 27 +++++ 13 files changed, 422 insertions(+), 18 deletions(-) create mode 100644 std/event/rwlock.zig create mode 100644 std/event/rwlocked.zig create mode 100644 std/mutex.zig diff --git a/CMakeLists.txt b/CMakeLists.txt index 0cf4a4029c..d7487ce905 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -466,6 +466,8 @@ set(ZIG_STD_FILES "event/lock.zig" "event/locked.zig" "event/loop.zig" + "event/rwlock.zig" + "event/rwlocked.zig" "event/tcp.zig" "fmt/errol/enum3.zig" "fmt/errol/index.zig" @@ -554,6 +556,7 @@ set(ZIG_STD_FILES "math/tanh.zig" "math/trunc.zig" "mem.zig" + "mutex.zig" "net.zig" "os/child_process.zig" "os/darwin.zig" diff --git a/std/debug/index.zig b/std/debug/index.zig index c32c3d352c..f06da85f54 100644 --- a/std/debug/index.zig +++ b/std/debug/index.zig @@ -23,7 +23,10 @@ pub const runtime_safety = switch (builtin.mode) { var stderr_file: os.File = undefined; var stderr_file_out_stream: io.FileOutStream = undefined; var stderr_stream: ?*io.OutStream(io.FileOutStream.Error) = null; +var stderr_mutex = std.Mutex.init(); pub fn warn(comptime fmt: []const u8, args: ...) 
void { + const held = stderr_mutex.acquire(); + defer held.release(); const stderr = getStderrStream() catch return; stderr.print(fmt, args) catch return; } diff --git a/std/event.zig b/std/event.zig index 9b692ecd44..193ccbf3e6 100644 --- a/std/event.zig +++ b/std/event.zig @@ -3,6 +3,8 @@ pub const Future = @import("event/future.zig").Future; pub const Group = @import("event/group.zig").Group; pub const Lock = @import("event/lock.zig").Lock; pub const Locked = @import("event/locked.zig").Locked; +pub const RwLock = @import("event/rwlock.zig").Lock; +pub const RwLocked = @import("event/rwlocked.zig").RwLocked; pub const Loop = @import("event/loop.zig").Loop; pub const fs = @import("event/fs.zig"); pub const tcp = @import("event/tcp.zig"); @@ -14,6 +16,8 @@ test "import event tests" { _ = @import("event/group.zig"); _ = @import("event/lock.zig"); _ = @import("event/locked.zig"); + _ = @import("event/rwlock.zig"); + _ = @import("event/rwlocked.zig"); _ = @import("event/loop.zig"); _ = @import("event/tcp.zig"); } diff --git a/std/event/channel.zig b/std/event/channel.zig index 03a036042b..61e470fa4e 100644 --- a/std/event/channel.zig +++ b/std/event/channel.zig @@ -116,6 +116,10 @@ pub fn Channel(comptime T: type) type { return result; } + fn getOrNull(self: *SelfChannel) ?T { + TODO(); + } + fn dispatch(self: *SelfChannel) void { // set the "need dispatch" flag _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); diff --git a/std/event/fs.zig b/std/event/fs.zig index a4361cbaf2..d002651ac9 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -253,7 +253,9 @@ pub async fn openReadWrite( } /// This abstraction helps to close file handles in defer expressions -/// without suspending. Start a CloseOperation before opening a file. +/// without the possibility of failure and without the use of suspend points. +/// Start a `CloseOperation` before opening a file, so that you can defer +/// `CloseOperation.deinit`. pub const CloseOperation = struct { loop: *event.Loop, have_fd: bool, diff --git a/std/event/group.zig b/std/event/group.zig index 26c098399e..6e6389b2ff 100644 --- a/std/event/group.zig +++ b/std/event/group.zig @@ -29,6 +29,17 @@ pub fn Group(comptime ReturnType: type) type { }; } + /// Cancel all the outstanding promises. Can be called even if wait was already called. + pub fn deinit(self: *Self) void { + while (self.coro_stack.pop()) |node| { + cancel node.data; + } + while (self.alloc_stack.pop()) |node| { + cancel node.data; + self.lock.loop.allocator.destroy(node); + } + } + /// Add a promise to the group. Thread-safe. pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) { const node = try self.lock.loop.allocator.create(Stack.Node{ @@ -88,7 +99,7 @@ pub fn Group(comptime ReturnType: type) type { await node.data; } else { (await node.data) catch |err| { - self.cancelAll(); + self.deinit(); return err; }; } @@ -100,25 +111,12 @@ pub fn Group(comptime ReturnType: type) type { await handle; } else { (await handle) catch |err| { - self.cancelAll(); + self.deinit(); return err; }; } } } - - /// Cancel all the outstanding promises. May only be called if wait was never called. - /// TODO These should be `cancelasync` not `cancel`. 
- /// See https://github.com/ziglang/zig/issues/1261 - pub fn cancelAll(self: *Self) void { - while (self.coro_stack.pop()) |node| { - cancel node.data; - } - while (self.alloc_stack.pop()) |node| { - cancel node.data; - self.lock.loop.allocator.destroy(node); - } - } }; } diff --git a/std/event/lock.zig b/std/event/lock.zig index 2013b5595f..0632960f80 100644 --- a/std/event/lock.zig +++ b/std/event/lock.zig @@ -9,6 +9,7 @@ const Loop = std.event.Loop; /// Thread-safe async/await lock. /// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and /// are resumed when the lock is released, in order. +/// Allows only one actor to hold the lock. pub const Lock = struct { loop: *Loop, shared_bit: u8, // TODO make this a bool diff --git a/std/event/rwlock.zig b/std/event/rwlock.zig new file mode 100644 index 0000000000..07b7340fca --- /dev/null +++ b/std/event/rwlock.zig @@ -0,0 +1,292 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const mem = std.mem; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const Loop = std.event.Loop; + +/// Thread-safe async/await lock. +/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. +/// Many readers can hold the lock at the same time; however locking for writing is exclusive. +pub const RwLock = struct { + loop: *Loop, + shared_state: u8, // TODO make this an enum + writer_queue: Queue, + reader_queue: Queue, + writer_queue_empty_bit: u8, // TODO make this a bool + reader_queue_empty_bit: u8, // TODO make this a bool + reader_lock_count: usize, + + const State = struct { + const Unlocked = 0; + const WriteLock = 1; + const ReadLock = 2; + }; + + const Queue = std.atomic.Queue(promise); + + pub const HeldRead = struct { + lock: *RwLock, + + pub fn release(self: HeldRead) void { + // If other readers still hold the lock, we're done. + if (@atomicRmw(usize, &self.lock.reader_lock_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst) != 1) { + return; + } + + _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + // Didn't unlock. Someone else's problem. + return; + } + + self.lock.commonPostUnlock(); + } + }; + + pub const HeldWrite = struct { + lock: *RwLock, + + pub fn release(self: HeldWrite) void { + // See if we can leave it locked for writing, and pass the lock to the next writer + // in the queue to grab the lock. + if (self.lock.writer_queue.get()) |node| { + self.lock.loop.onNextTick(node); + return; + } + + // We need to release the write lock. Check if any readers are waiting to grab the lock. + if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) { + // Switch to a read lock. 
+ _ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.ReadLock, AtomicOrder.SeqCst); + while (self.lock.reader_queue.get()) |node| { + self.lock.loop.onNextTick(node); + } + return; + } + + _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.lock.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst); + + self.lock.commonPostUnlock(); + } + }; + + pub fn init(loop: *Loop) RwLock { + return RwLock{ + .loop = loop, + .shared_state = State.Unlocked, + .writer_queue = Queue.init(), + .writer_queue_empty_bit = 1, + .reader_queue = Queue.init(), + .reader_queue_empty_bit = 1, + .reader_lock_count = 0, + }; + } + + /// Must be called when not locked. Not thread safe. + /// All calls to acquire() and release() must complete before calling deinit(). + pub fn deinit(self: *RwLock) void { + assert(self.shared_state == State.Unlocked); + while (self.writer_queue.get()) |node| cancel node.data; + while (self.reader_queue.get()) |node| cancel node.data; + } + + pub async fn acquireRead(self: *RwLock) HeldRead { + _ = @atomicRmw(usize, &self.reader_lock_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + + suspend |handle| { + // TODO explicitly put this memory in the coroutine frame #1194 + var my_tick_node = Loop.NextTickNode{ + .data = handle, + .next = undefined, + }; + + self.reader_queue.put(&my_tick_node); + + // At this point, we are in the reader_queue, so we might have already been resumed and this coroutine + // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame. + + // We set this bit so that later we can rely on the fact, that if reader_queue_empty_bit is 1, + // some actor will attempt to grab the lock. + _ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // Here we don't care if we are the one to do the locking or if it was already locked for reading. + const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst)) |old_state| old_state == State.ReadLock else true; + if (have_read_lock) { + // Give out all the read locks. + if (self.reader_queue.get()) |first_node| { + while (self.reader_queue.get()) |node| { + self.loop.onNextTick(node); + } + resume first_node.data; + } + } + } + return HeldRead{ .lock = self }; + } + + pub async fn acquireWrite(self: *RwLock) HeldWrite { + suspend |handle| { + // TODO explicitly put this memory in the coroutine frame #1194 + var my_tick_node = Loop.NextTickNode{ + .data = handle, + .next = undefined, + }; + + self.writer_queue.put(&my_tick_node); + + // At this point, we are in the writer_queue, so we might have already been resumed and this coroutine + // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame. + + // We set this bit so that later we can rely on the fact, that if writer_queue_empty_bit is 1, + // some actor will attempt to grab the lock. + _ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); + + // Here we must be the one to acquire the write lock. It cannot already be locked. + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) == null) { + // We now have a write lock. + if (self.writer_queue.get()) |node| { + // Whether this node is us or someone else, we tail resume it. 
+ resume node.data; + } + } + } + return HeldWrite{ .lock = self }; + } + + fn commonPostUnlock(self: *RwLock) void { + while (true) { + // There might be a writer_queue item or a reader_queue item + // If we check and both are empty, we can be done, because the other actors will try to + // obtain the lock. + // But if there's a writer_queue item or a reader_queue item, + // we are the actor which must loop and attempt to grab the lock again. + if (@atomicLoad(u8, &self.writer_queue_empty_bit, AtomicOrder.SeqCst) == 0) { + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + // We did not obtain the lock. Great, the queues are someone else's problem. + return; + } + // If there's an item in the writer queue, give them the lock, and we're done. + if (self.writer_queue.get()) |node| { + self.loop.onNextTick(node); + return; + } + // Release the lock again. + _ = @atomicRmw(u8, &self.writer_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + _ = @atomicRmw(u8, &self.shared_state, AtomicRmwOp.Xchg, State.Unlocked, AtomicOrder.SeqCst); + continue; + } + + if (@atomicLoad(u8, &self.reader_queue_empty_bit, AtomicOrder.SeqCst) == 0) { + if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + // We did not obtain the lock. Great, the queues are someone else's problem. + return; + } + // If there are any items in the reader queue, give out all the reader locks, and we're done. + if (self.reader_queue.get()) |first_node| { + self.loop.onNextTick(first_node); + while (self.reader_queue.get()) |node| { + self.loop.onNextTick(node); + } + return; + } + // Release the lock again. + _ = @atomicRmw(u8, &self.reader_queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, AtomicOrder.SeqCst, AtomicOrder.SeqCst) != null) { + // Didn't unlock. Someone else's problem. 
+ return; + } + continue; + } + return; + } + } +}; + +test "std.event.RwLock" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + var lock = RwLock.init(&loop); + defer lock.deinit(); + + const handle = try async testLock(&loop, &lock); + defer cancel handle; + loop.run(); + + const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len; + assert(mem.eql(i32, shared_test_data, expected_result)); +} + +async fn testLock(loop: *Loop, lock: *RwLock) void { + // TODO explicitly put next tick node memory in the coroutine frame #1194 + suspend |p| { + resume p; + } + + var read_nodes: [100]Loop.NextTickNode = undefined; + for (read_nodes) |*read_node| { + read_node.data = async readRunner(lock) catch @panic("out of memory"); + loop.onNextTick(read_node); + } + + var write_nodes: [shared_it_count]Loop.NextTickNode = undefined; + for (write_nodes) |*write_node| { + write_node.data = async writeRunner(lock) catch @panic("out of memory"); + loop.onNextTick(write_node); + } + + for (write_nodes) |*write_node| { + await @ptrCast(promise->void, write_node.data); + } + for (read_nodes) |*read_node| { + await @ptrCast(promise->void, read_node.data); + } +} + +const shared_it_count = 10; +var shared_test_data = [1]i32{0} ** 10; +var shared_test_index: usize = 0; +var shared_count: usize = 0; + +async fn writeRunner(lock: *RwLock) void { + suspend; // resumed by onNextTick + + var i: usize = 0; + while (i < shared_test_data.len) : (i += 1) { + std.os.time.sleep(0, 100000); + const lock_promise = async lock.acquireWrite() catch @panic("out of memory"); + const handle = await lock_promise; + defer handle.release(); + + shared_count += 1; + while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) { + shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1; + } + shared_test_index = 0; + } +} + +async fn readRunner(lock: *RwLock) void { + suspend; // resumed by onNextTick + std.os.time.sleep(0, 1); + + var i: usize = 0; + while (i < shared_test_data.len) : (i += 1) { + const lock_promise = async lock.acquireRead() catch @panic("out of memory"); + const handle = await lock_promise; + defer handle.release(); + + assert(shared_test_index == 0); + assert(shared_test_data[i] == @intCast(i32, shared_count)); + } +} diff --git a/std/event/rwlocked.zig b/std/event/rwlocked.zig new file mode 100644 index 0000000000..ef7e83d20c --- /dev/null +++ b/std/event/rwlocked.zig @@ -0,0 +1,58 @@ +const std = @import("../index.zig"); +const RwLock = std.event.RwLock; +const Loop = std.event.Loop; + +/// Thread-safe async/await RW lock that protects one piece of data. +/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and +/// are resumed when the lock is released, in order. 
+pub fn RwLocked(comptime T: type) type { + return struct { + lock: RwLock, + locked_data: T, + + const Self = this; + + pub const HeldReadLock = struct { + value: *const T, + held: RwLock.HeldRead, + + pub fn release(self: HeldReadLock) void { + self.held.release(); + } + }; + + pub const HeldWriteLock = struct { + value: *T, + held: RwLock.HeldWrite, + + pub fn release(self: HeldWriteLock) void { + self.held.release(); + } + }; + + pub fn init(loop: *Loop, data: T) Self { + return Self{ + .lock = RwLock.init(loop), + .locked_data = data, + }; + } + + pub fn deinit(self: *Self) void { + self.lock.deinit(); + } + + pub async fn acquireRead(self: *Self) HeldReadLock { + return HeldReadLock{ + .held = await (async self.lock.acquireRead() catch unreachable), + .value = &self.locked_data, + }; + } + + pub async fn acquireWrite(self: *Self) HeldWriteLock { + return HeldWriteLock{ + .held = await (async self.lock.acquireWrite() catch unreachable), + .value = &self.locked_data, + }; + } + }; +} diff --git a/std/event/tcp.zig b/std/event/tcp.zig index 42018b03e5..6757d8cc09 100644 --- a/std/event/tcp.zig +++ b/std/event/tcp.zig @@ -61,7 +61,7 @@ pub const Server = struct { /// Stop listening pub fn close(self: *Server) void { - self.loop.removeFd(self.sockfd.?); + self.loop.linuxRemoveFd(self.sockfd.?); std.os.close(self.sockfd.?); } @@ -116,7 +116,7 @@ pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File errdefer std.os.close(sockfd); try std.os.posixConnectAsync(sockfd, &address.os_addr); - try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT); + try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT | posix.EPOLLET); try std.os.posixGetSockOptConnectError(sockfd); return std.os.File.openHandle(sockfd); diff --git a/std/hash_map.zig b/std/hash_map.zig index cebd5272c0..16c305223f 100644 --- a/std/hash_map.zig +++ b/std/hash_map.zig @@ -163,6 +163,16 @@ pub fn HashMap(comptime K: type, comptime V: type, comptime hash: fn (key: K) u3 }; } + pub fn clone(self: Self) !Self { + var other = Self.init(self.allocator); + try other.initCapacity(self.entries.len); + var it = self.iterator(); + while (it.next()) |entry| { + try other.put(entry.key, entry.value); + } + return other; + } + fn initCapacity(hm: *Self, capacity: usize) !void { hm.entries = try hm.allocator.alloc(Entry, capacity); hm.size = 0; diff --git a/std/index.zig b/std/index.zig index 2f4cfb7553..23599c8c96 100644 --- a/std/index.zig +++ b/std/index.zig @@ -9,6 +9,7 @@ pub const LinkedList = @import("linked_list.zig").LinkedList; pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList; pub const SegmentedList = @import("segmented_list.zig").SegmentedList; pub const DynLib = @import("dynamic_library.zig").DynLib; +pub const Mutex = @import("mutex.zig").Mutex; pub const atomic = @import("atomic/index.zig"); pub const base64 = @import("base64.zig"); @@ -48,6 +49,7 @@ test "std" { _ = @import("hash_map.zig"); _ = @import("linked_list.zig"); _ = @import("segmented_list.zig"); + _ = @import("mutex.zig"); _ = @import("base64.zig"); _ = @import("build.zig"); diff --git a/std/mutex.zig b/std/mutex.zig new file mode 100644 index 0000000000..6aee87d1d7 --- /dev/null +++ b/std/mutex.zig @@ -0,0 +1,27 @@ +const std = @import("index.zig"); +const builtin = @import("builtin"); +const AtomicOrder = builtin.AtomicOrder; +const AtomicRmwOp = builtin.AtomicRmwOp; +const assert = std.debug.assert; + +/// TODO use syscalls instead of a spinlock +pub const Mutex = 
struct { + lock: u8, // TODO use a bool + + pub const Held = struct { + mutex: *Mutex, + + pub fn release(self: Held) void { + assert(@atomicRmw(u8, &self.mutex.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1); + } + }; + + pub fn init() Mutex { + return Mutex{ .lock = 0 }; + } + + pub fn acquire(self: *Mutex) Held { + while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {} + return Held{ .mutex = self }; + } +}; -- cgit v1.2.3
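
Usage sketches follow; none of this code is part of the patch, and names such as `shared_value`, `writer`, and `reader` are made up for illustration.

std.event.RwLock allows many readers to hold the lock at once while writers are exclusive. A minimal sketch modeled on the test in std/event/rwlock.zig, assuming a lock created with RwLock.init(&loop) and coroutines driven by that loop:

const std = @import("std");
const RwLock = std.event.RwLock;

var shared_value: i32 = 0;

async fn writer(lock: *RwLock) void {
    // acquireWrite suspends until the lock can be held exclusively.
    const held = await (async lock.acquireWrite() catch @panic("out of memory"));
    defer held.release();
    shared_value += 1;
}

async fn reader(lock: *RwLock) void {
    // Any number of readers can hold the lock at the same time.
    const held = await (async lock.acquireRead() catch @panic("out of memory"));
    defer held.release();
    std.debug.warn("shared_value: {}\n", shared_value);
}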
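
std.event.RwLocked pairs an RwLock with the value it guards, so holders reach the data only through the held lock. A sketch assuming a counter created elsewhere with RwLocked(u32).init(&loop, 0):

const std = @import("std");
const RwLocked = std.event.RwLocked;

async fn bumpCounter(counter: *RwLocked(u32)) void {
    const held = await (async counter.acquireWrite() catch @panic("out of memory"));
    defer held.release();
    // held.value is a *u32 pointing at the protected data.
    held.value.* += 1;
}

async fn readCounter(counter: *RwLocked(u32)) u32 {
    const held = await (async counter.acquireRead() catch @panic("out of memory"));
    defer held.release();
    // Read holders get a *const u32.
    return held.value.*;
}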
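
std.Mutex is currently a spinlock; std.debug.warn now takes it so that concurrent warn calls do not interleave their output. The same acquire/release pattern, sketched with a hypothetical shared counter:

const std = @import("std");

var counter_mutex = std.Mutex.init();
var counter: usize = 0;

fn bump() void {
    // acquire spins until the lock is free; release must always be paired with it.
    const held = counter_mutex.acquire();
    defer held.release();
    counter += 1;
}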
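
Group.cancelAll is now Group.deinit and may be called even after wait, which makes it usable in a defer. A sketch of the intended pattern, assuming the usual Group(void).init(loop) constructor and a hypothetical worker coroutine:

const std = @import("std");
const Loop = std.event.Loop;
const Group = std.event.Group;

async fn worker(loop: *Loop) void {
    // Suspend and immediately resume ourselves; stands in for real work.
    suspend |p| {
        resume p;
    }
}

async fn runBoth(loop: *Loop) void {
    var group = Group(void).init(loop);
    // With the new semantics this is safe even though wait() runs below;
    // deinit cancels only whatever is still outstanding.
    defer group.deinit();

    group.add(async worker(loop) catch @panic("out of memory")) catch @panic("out of memory");
    group.add(async worker(loop) catch @panic("out of memory")) catch @panic("out of memory");

    await (async group.wait() catch @panic("out of memory"));
}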
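
HashMap.clone copies every entry into a new map backed by the same allocator. A sketch with i32 keys, the hash/eql functions such a map needs, and the usual init/deinit pairing; demoClone and its helpers are hypothetical:

const std = @import("std");

fn hash_i32(x: i32) u32 {
    return @bitCast(u32, x);
}

fn eql_i32(a: i32, b: i32) bool {
    return a == b;
}

fn demoClone(allocator: *std.mem.Allocator) !void {
    var map = std.HashMap(i32, i32, hash_i32, eql_i32).init(allocator);
    defer map.deinit();
    _ = try map.put(1, 11);
    _ = try map.put(2, 22);

    // The clone owns its own entry array and must be deinitialized separately.
    var copy = try map.clone();
    defer copy.deinit();
}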