| author | Andrew Kelley <andrew@ziglang.org> | 2020-03-13 15:17:53 -0400 |
|---|---|---|
| committer | Andrew Kelley <andrew@ziglang.org> | 2020-03-13 15:17:53 -0400 |
| commit | 656ba530d80e67bc7bb9c40e5c2db26a40743a15 | |
| tree | 767f4d57000922cf122ae965dc825f87c62ec64e | /lib/std/event |
| parent | 96c07674fc2293fa040212ab797c05436dc515b1 | |
| parent | 3eff77bfb52accbc16eb831753ff4917fc2b4873 | |
Merge remote-tracking branch 'origin/master' into llvm10
Diffstat (limited to 'lib/std/event')
| -rw-r--r-- | lib/std/event/channel.zig | 23 |
|---|---|---|
| -rw-r--r-- | lib/std/event/group.zig | 4 |
| -rw-r--r-- | lib/std/event/lock.zig | 39 |
| -rw-r--r-- | lib/std/event/loop.zig | 78 |
| -rw-r--r-- | lib/std/event/rwlock.zig | 32 |
5 files changed, 127 insertions, 49 deletions
diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig
index 3c5b48d047..83c77bcac5 100644
--- a/lib/std/event/channel.zig
+++ b/lib/std/event/channel.zig
@@ -14,8 +14,8 @@ pub fn Channel(comptime T: type) type {
         putters: std.atomic.Queue(PutNode),
         get_count: usize,
         put_count: usize,
-        dispatch_lock: u8, // TODO make this a bool
-        need_dispatch: u8, // TODO make this a bool
+        dispatch_lock: bool,
+        need_dispatch: bool,

         // simple fixed size ring buffer
         buffer_nodes: []T,
@@ -62,8 +62,8 @@ pub fn Channel(comptime T: type) type {
             .buffer_len = 0,
             .buffer_nodes = buffer,
             .buffer_index = 0,
-            .dispatch_lock = 0,
-            .need_dispatch = 0,
+            .dispatch_lock = false,
+            .need_dispatch = false,
             .getters = std.atomic.Queue(GetNode).init(),
             .putters = std.atomic.Queue(PutNode).init(),
             .or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(),
@@ -165,15 +165,14 @@ pub fn Channel(comptime T: type) type {
         fn dispatch(self: *SelfChannel) void {
             // set the "need dispatch" flag
-            @atomicStore(u8, &self.need_dispatch, 1, .SeqCst);
+            @atomicStore(bool, &self.need_dispatch, true, .SeqCst);

             lock: while (true) {
                 // set the lock flag
-                const prev_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 1, .SeqCst);
-                if (prev_lock != 0) return;
+                if (@atomicRmw(bool, &self.dispatch_lock, .Xchg, true, .SeqCst)) return;

                 // clear the need_dispatch flag since we're about to do it
-                @atomicStore(u8, &self.need_dispatch, 0, .SeqCst);
+                @atomicStore(bool, &self.need_dispatch, false, .SeqCst);

                 while (true) {
                     one_dispatch: {
@@ -250,14 +249,12 @@ pub fn Channel(comptime T: type) type {
                     }

                     // clear need-dispatch flag
-                    const need_dispatch = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst);
-                    if (need_dispatch != 0) continue;
+                    if (@atomicRmw(bool, &self.need_dispatch, .Xchg, false, .SeqCst)) continue;

-                    const my_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 0, .SeqCst);
-                    assert(my_lock != 0);
+                    assert(@atomicRmw(bool, &self.dispatch_lock, .Xchg, false, .SeqCst));

                     // we have to check again now that we unlocked
-                    if (@atomicLoad(u8, &self.need_dispatch, .SeqCst) != 0) continue :lock;
+                    if (@atomicLoad(bool, &self.need_dispatch, .SeqCst)) continue :lock;
                     return;
                 }
diff --git a/lib/std/event/group.zig b/lib/std/event/group.zig
index ac1bf68245..5eebb7ffbc 100644
--- a/lib/std/event/group.zig
+++ b/lib/std/event/group.zig
@@ -120,9 +120,11 @@ test "std.event.Group" {
     // https://github.com/ziglang/zig/issues/1908
     if (builtin.single_threaded) return error.SkipZigTest;

-    // TODO provide a way to run tests in evented I/O mode
     if (!std.io.is_async) return error.SkipZigTest;

+    // TODO this file has bit-rotted. repair it
+    if (true) return error.SkipZigTest;
+
     const handle = async testGroup(std.heap.page_allocator);
 }
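The channel.zig change above is mechanical: now that the atomic builtins are applied directly to `bool` flags, the `u8` stand-ins and their two TODO comments go away, and the dispatch protocol reads as plain boolean logic. As a reference point, here is a minimal standalone sketch of that same try-lock/recheck protocol; the names `lock`, `dirty`, and `poke` are illustrative and do not appear in the diff:

```zig
const std = @import("std");

// Global flags, as in Channel: `lock` is a try-lock, `dirty` means
// "work arrived and still needs dispatching".
var lock: bool = false;
var dirty: bool = false;

// Sketch of the Channel.dispatch pattern: a caller that loses the lock
// race simply leaves `dirty` set and returns; the current holder is
// obligated to re-check it before finishing.
fn poke() void {
    @atomicStore(bool, &dirty, true, .SeqCst);
    outer: while (true) {
        // Try to become the dispatcher; a `true` result means someone
        // else holds the lock and will see our `dirty` flag.
        if (@atomicRmw(bool, &lock, .Xchg, true, .SeqCst)) return;
        @atomicStore(bool, &dirty, false, .SeqCst);

        // ... do the actual dispatch work here ...

        // Unlock, then re-check: a racing poke() may have set `dirty`
        // between our clear and our unlock, and nobody else will act on it.
        std.debug.assert(@atomicRmw(bool, &lock, .Xchg, false, .SeqCst));
        if (@atomicLoad(bool, &dirty, .SeqCst)) continue :outer;
        return;
    }
}
```

The final load after unlocking is what prevents a lost wakeup: without it, a `poke` that arrived during the unlock window would never be dispatched.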
diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig
index b9cbb5d95f..ff1f738c5e 100644
--- a/lib/std/event/lock.zig
+++ b/lib/std/event/lock.zig
@@ -11,9 +11,9 @@ const Loop = std.event.Loop;
 /// Allows only one actor to hold the lock.
 /// TODO: make this API also work in blocking I/O mode.
 pub const Lock = struct {
-    shared_bit: u8, // TODO make this a bool
+    shared: bool,
     queue: Queue,
-    queue_empty_bit: u8, // TODO make this a bool
+    queue_empty: bool,

     const Queue = std.atomic.Queue(anyframe);

@@ -31,20 +31,19 @@ pub const Lock = struct {
             }

             // We need to release the lock.
-            @atomicStore(u8, &self.lock.queue_empty_bit, 1, .SeqCst);
-            @atomicStore(u8, &self.lock.shared_bit, 0, .SeqCst);
+            @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
+            @atomicStore(bool, &self.lock.shared, false, .SeqCst);

             // There might be a queue item. If we know the queue is empty, we can be done,
             // because the other actor will try to obtain the lock.
             // But if there's a queue item, we are the actor which must loop and attempt
             // to grab the lock again.
-            if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+            if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
                 return;
             }

             while (true) {
-                const old_bit = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 1, .SeqCst);
-                if (old_bit != 0) {
+                if (@atomicRmw(bool, &self.lock.shared, .Xchg, true, .SeqCst)) {
                     // We did not obtain the lock. Great, the queue is someone else's problem.
                     return;
                 }
@@ -56,11 +55,11 @@ pub const Lock = struct {
                 }

                 // Release the lock again.
-                @atomicStore(u8, &self.lock.queue_empty_bit, 1, .SeqCst);
-                @atomicStore(u8, &self.lock.shared_bit, 0, .SeqCst);
+                @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
+                @atomicStore(bool, &self.lock.shared, false, .SeqCst);

                 // Find out if we can be done.
-                if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+                if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
                     return;
                 }
             }
@@ -69,24 +68,24 @@ pub const Lock = struct {

     pub fn init() Lock {
         return Lock{
-            .shared_bit = 0,
+            .shared = false,
             .queue = Queue.init(),
-            .queue_empty_bit = 1,
+            .queue_empty = true,
         };
     }

     pub fn initLocked() Lock {
         return Lock{
-            .shared_bit = 1,
+            .shared = true,
             .queue = Queue.init(),
-            .queue_empty_bit = 1,
+            .queue_empty = true,
         };
     }

     /// Must be called when not locked. Not thread safe.
     /// All calls to acquire() and release() must complete before calling deinit().
     pub fn deinit(self: *Lock) void {
-        assert(self.shared_bit == 0);
+        assert(!self.shared);
         while (self.queue.get()) |node| resume node.data;
     }
@@ -99,12 +98,11 @@ pub const Lock = struct {
         // At this point, we are in the queue, so we might have already been resumed.

-        // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor
+        // We set this bit so that later we can rely on the fact, that if queue_empty == true, some actor
         // will attempt to grab the lock.
-        @atomicStore(u8, &self.queue_empty_bit, 0, .SeqCst);
+        @atomicStore(bool, &self.queue_empty, false, .SeqCst);

-        const old_bit = @atomicRmw(u8, &self.shared_bit, .Xchg, 1, .SeqCst);
-        if (old_bit == 0) {
+        if (!@atomicRmw(bool, &self.shared, .Xchg, true, .SeqCst)) {
             if (self.queue.get()) |node| {
                 // Whether this node is us or someone else, we tail resume it.
                 resume node.data;
@@ -125,6 +123,9 @@ test "std.event.Lock" {
     // TODO https://github.com/ziglang/zig/issues/3251
     if (builtin.os.tag == .freebsd) return error.SkipZigTest;

+    // TODO this file has bit-rotted. repair it
+    if (true) return error.SkipZigTest;
+
     var lock = Lock.init();
     defer lock.deinit();
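lock.zig gets the same bit-to-bool treatment (`shared_bit` and `queue_empty_bit` become `shared` and `queue_empty`), and its test is now skipped outright because the file has bit-rotted. For orientation only, a call site for this era's API would look roughly like the sketch below; `acquire` returning a held value that must be released matches the `self.lock.*` accesses visible in the diff, but since the test no longer runs, treat this as unverified:

```zig
const std = @import("std");
const Lock = std.event.Lock;

var shared_total: usize = 0;

// Unverified sketch: acquire() suspends this async frame until the
// lock is free; release() resumes the next queued waiter, if any.
fn addOne(lock: *Lock) void {
    const held = lock.acquire();
    defer held.release();
    shared_total += 1;
}
```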
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index e62f15d59a..7db6fe98de 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -809,6 +809,28 @@ pub const Loop = struct {
         return req_node.data.msg.readv.result;
     }

+    /// Performs an async `os.pread` using a separate thread.
+    /// `fd` must block and not return EAGAIN.
+    pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64) os.PReadError!usize {
+        var req_node = Request.Node{
+            .data = .{
+                .msg = .{
+                    .pread = .{
+                        .fd = fd,
+                        .buf = buf,
+                        .offset = offset,
+                        .result = undefined,
+                    },
+                },
+                .finish = .{ .TickNode = .{ .data = @frame() } },
+            },
+        };
+        suspend {
+            self.posixFsRequest(&req_node);
+        }
+        return req_node.data.msg.pread.result;
+    }
+
     /// Performs an async `os.preadv` using a separate thread.
     /// `fd` must block and not return EAGAIN.
     pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize {
@@ -895,6 +917,35 @@ pub const Loop = struct {
         return req_node.data.msg.pwritev.result;
     }

+    /// Performs an async `os.faccessatZ` using a separate thread.
+    /// `fd` must block and not return EAGAIN.
+    pub fn faccessatZ(
+        self: *Loop,
+        dirfd: os.fd_t,
+        path_z: [*:0]const u8,
+        mode: u32,
+        flags: u32,
+    ) os.AccessError!void {
+        var req_node = Request.Node{
+            .data = .{
+                .msg = .{
+                    .faccessat = .{
+                        .dirfd = dirfd,
+                        .path = path_z,
+                        .mode = mode,
+                        .flags = flags,
+                        .result = undefined,
+                    },
+                },
+                .finish = .{ .TickNode = .{ .data = @frame() } },
+            },
+        };
+        suspend {
+            self.posixFsRequest(&req_node);
+        }
+        return req_node.data.msg.faccessat.result;
+    }
+
     fn workerRun(self: *Loop) void {
         while (true) {
             while (true) {
@@ -1038,6 +1089,9 @@ pub const Loop = struct {
                 .pwritev => |*msg| {
                     msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset);
                 },
+                .pread => |*msg| {
+                    msg.result = noasync os.pread(msg.fd, msg.buf, msg.offset);
+                },
                 .preadv => |*msg| {
                     msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset);
                 },
@@ -1047,6 +1101,9 @@ pub const Loop = struct {
                 .openat => |*msg| {
                     msg.result = noasync os.openatC(msg.fd, msg.path, msg.flags, msg.mode);
                 },
+                .faccessat => |*msg| {
+                    msg.result = noasync os.faccessatZ(msg.dirfd, msg.path, msg.mode, msg.flags);
+                },
                 .close => |*msg| noasync os.close(msg.fd),
             }
             switch (node.data.finish) {
@@ -1120,10 +1177,12 @@ pub const Loop = struct {
             write: Write,
             writev: WriteV,
             pwritev: PWriteV,
+            pread: PRead,
             preadv: PReadV,
             open: Open,
             openat: OpenAt,
             close: Close,
+            faccessat: FAccessAt,

             /// special - means the fs thread should exit
             end,
@@ -1161,6 +1220,15 @@ pub const Loop = struct {
                 pub const Error = os.PWriteError;
             };

+            pub const PRead = struct {
+                fd: os.fd_t,
+                buf: []u8,
+                offset: usize,
+                result: Error!usize,
+
+                pub const Error = os.PReadError;
+            };
+
             pub const PReadV = struct {
                 fd: os.fd_t,
                 iov: []const os.iovec,
@@ -1192,6 +1260,16 @@ pub const Loop = struct {
             pub const Close = struct {
                 fd: os.fd_t,
             };
+
+            pub const FAccessAt = struct {
+                dirfd: os.fd_t,
+                path: [*:0]const u8,
+                mode: u32,
+                flags: u32,
+                result: Error!void,
+
+                pub const Error = os.AccessError;
+            };
         };
     };
 };
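Both new `Loop` wrappers follow the file's existing pattern for blocking filesystem calls: pack the arguments into a `Request.Node`, record `@frame()` as the resume target, suspend while `posixFsRequest` hands the node to the dedicated fs thread, and read the result after `workerRun` has performed the blocking syscall (under `noasync`) and resumed the frame. A hypothetical call site, with the `loop` parameter and buffer size chosen purely for illustration:

```zig
const std = @import("std");

// Hypothetical caller: in evented I/O mode this frame suspends while
// the fs thread runs the blocking os.pread, then resumes with the result.
fn readHeader(loop: *std.event.Loop, fd: std.os.fd_t) !usize {
    var buf: [512]u8 = undefined;
    return try loop.pread(fd, &buf, 0);
}
```

Two small wrinkles worth noting in the diff: `pread` takes `offset: u64` while the queued `PRead` message declares `offset: usize` (these differ on 32-bit targets), and the "`fd` must block" doc comment on `faccessatZ` looks like copy-paste from the read/write wrappers.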
diff --git a/lib/std/event/rwlock.zig b/lib/std/event/rwlock.zig
index f4b13d008b..425088063f 100644
--- a/lib/std/event/rwlock.zig
+++ b/lib/std/event/rwlock.zig
@@ -16,8 +16,8 @@ pub const RwLock = struct {
     shared_state: State,
     writer_queue: Queue,
     reader_queue: Queue,
-    writer_queue_empty_bit: u8, // TODO make this a bool
-    reader_queue_empty_bit: u8, // TODO make this a bool
+    writer_queue_empty: bool,
+    reader_queue_empty: bool,
     reader_lock_count: usize,

     const State = enum(u8) {
@@ -40,7 +40,7 @@ pub const RwLock = struct {
                 return;
             }

-            @atomicStore(u8, &self.lock.reader_queue_empty_bit, 1, .SeqCst);
+            @atomicStore(bool, &self.lock.reader_queue_empty, true, .SeqCst);
             if (@cmpxchgStrong(State, &self.lock.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                 // Didn't unlock. Someone else's problem.
                 return;
@@ -62,7 +62,7 @@ pub const RwLock = struct {
             }

             // We need to release the write lock. Check if any readers are waiting to grab the lock.
-            if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) {
+            if (!@atomicLoad(bool, &self.lock.reader_queue_empty, .SeqCst)) {
                 // Switch to a read lock.
                 @atomicStore(State, &self.lock.shared_state, .ReadLock, .SeqCst);
                 while (self.lock.reader_queue.get()) |node| {
@@ -71,7 +71,7 @@ pub const RwLock = struct {
                 return;
             }

-            @atomicStore(u8, &self.lock.writer_queue_empty_bit, 1, .SeqCst);
+            @atomicStore(bool, &self.lock.writer_queue_empty, true, .SeqCst);
             @atomicStore(State, &self.lock.shared_state, .Unlocked, .SeqCst);

             self.lock.commonPostUnlock();
@@ -79,12 +79,12 @@ pub const RwLock = struct {
     };

     pub fn init() RwLock {
-        return RwLock{
+        return .{
             .shared_state = .Unlocked,
             .writer_queue = Queue.init(),
-            .writer_queue_empty_bit = 1,
+            .writer_queue_empty = true,
             .reader_queue = Queue.init(),
-            .reader_queue_empty_bit = 1,
+            .reader_queue_empty = true,
             .reader_lock_count = 0,
         };
     }
@@ -111,9 +111,9 @@ pub const RwLock = struct {
        // At this point, we are in the reader_queue, so we might have already been resumed.

-        // We set this bit so that later we can rely on the fact, that if reader_queue_empty_bit is 1,
+        // We set this bit so that later we can rely on the fact, that if reader_queue_empty == true,
         // some actor will attempt to grab the lock.
-        @atomicStore(u8, &self.reader_queue_empty_bit, 0, .SeqCst);
+        @atomicStore(bool, &self.reader_queue_empty, false, .SeqCst);

         // Here we don't care if we are the one to do the locking or if it was already locked for reading.
         const have_read_lock = if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == .ReadLock else true;
@@ -142,9 +142,9 @@ pub const RwLock = struct {
         // At this point, we are in the writer_queue, so we might have already been resumed.

-        // We set this bit so that later we can rely on the fact, that if writer_queue_empty_bit is 1,
+        // We set this bit so that later we can rely on the fact, that if writer_queue_empty == true,
         // some actor will attempt to grab the lock.
-        @atomicStore(u8, &self.writer_queue_empty_bit, 0, .SeqCst);
+        @atomicStore(bool, &self.writer_queue_empty, false, .SeqCst);

         // Here we must be the one to acquire the write lock. It cannot already be locked.
         if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) == null) {
@@ -165,7 +165,7 @@ pub const RwLock = struct {
             // obtain the lock.
             // But if there's a writer_queue item or a reader_queue item,
             // we are the actor which must loop and attempt to grab the lock again.
-            if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) {
+            if (!@atomicLoad(bool, &self.writer_queue_empty, .SeqCst)) {
                 if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) != null) {
                     // We did not obtain the lock. Great, the queues are someone else's problem.
                     return;
@@ -176,12 +176,12 @@ pub const RwLock = struct {
                     return;
                 }
                 // Release the lock again.
-                @atomicStore(u8, &self.writer_queue_empty_bit, 1, .SeqCst);
+                @atomicStore(bool, &self.writer_queue_empty, true, .SeqCst);
                 @atomicStore(State, &self.shared_state, .Unlocked, .SeqCst);
                 continue;
             }

-            if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) {
+            if (!@atomicLoad(bool, &self.reader_queue_empty, .SeqCst)) {
                 if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst) != null) {
                     // We did not obtain the lock. Great, the queues are someone else's problem.
                     return;
@@ -195,7 +195,7 @@ pub const RwLock = struct {
                     return;
                 }
                 // Release the lock again.
-                @atomicStore(u8, &self.reader_queue_empty_bit, 1, .SeqCst);
+                @atomicStore(bool, &self.reader_queue_empty, true, .SeqCst);
                 if (@cmpxchgStrong(State, &self.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                     // Didn't unlock. Someone else's problem.
                     return;
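rwlock.zig mirrors the plain Lock: both `*_queue_empty_bit: u8` fields become `bool`s, and `init` now returns an anonymous struct literal (`return .{ ... }`) since the result type is inferred from the return type. A hypothetical reader/writer pair for this era's API (unverified, like the Lock sketch above, since the std.event tests are skipped during this period):

```zig
const std = @import("std");
const RwLock = std.event.RwLock;

var cached: u32 = 0;

// Unverified sketch: many readers may hold the lock simultaneously.
fn lookup(rwl: *RwLock) u32 {
    const held = rwl.acquireRead();
    defer held.release();
    return cached;
}

// Unverified sketch: a writer suspends until it has exclusive access;
// on release, waiting readers are handed the lock first, per the
// write-release path visible in the diff.
fn update(rwl: *RwLock, value: u32) void {
    const held = rwl.acquireWrite();
    defer held.release();
    cached = value;
}
```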
