aboutsummaryrefslogtreecommitdiff
path: root/lib/std/event
diff options
context:
space:
mode:
authorAndrew Kelley <andrew@ziglang.org>2020-03-13 15:17:53 -0400
committerAndrew Kelley <andrew@ziglang.org>2020-03-13 15:17:53 -0400
commit656ba530d80e67bc7bb9c40e5c2db26a40743a15 (patch)
tree767f4d57000922cf122ae965dc825f87c62ec64e /lib/std/event
parent96c07674fc2293fa040212ab797c05436dc515b1 (diff)
parent3eff77bfb52accbc16eb831753ff4917fc2b4873 (diff)
downloadzig-656ba530d80e67bc7bb9c40e5c2db26a40743a15.tar.gz
zig-656ba530d80e67bc7bb9c40e5c2db26a40743a15.zip
Merge remote-tracking branch 'origin/master' into llvm10
Diffstat (limited to 'lib/std/event')
-rw-r--r--lib/std/event/channel.zig23
-rw-r--r--lib/std/event/group.zig4
-rw-r--r--lib/std/event/lock.zig39
-rw-r--r--lib/std/event/loop.zig78
-rw-r--r--lib/std/event/rwlock.zig32
5 files changed, 127 insertions, 49 deletions
diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig
index 3c5b48d047..83c77bcac5 100644
--- a/lib/std/event/channel.zig
+++ b/lib/std/event/channel.zig
@@ -14,8 +14,8 @@ pub fn Channel(comptime T: type) type {
putters: std.atomic.Queue(PutNode),
get_count: usize,
put_count: usize,
- dispatch_lock: u8, // TODO make this a bool
- need_dispatch: u8, // TODO make this a bool
+ dispatch_lock: bool,
+ need_dispatch: bool,
// simple fixed size ring buffer
buffer_nodes: []T,
@@ -62,8 +62,8 @@ pub fn Channel(comptime T: type) type {
.buffer_len = 0,
.buffer_nodes = buffer,
.buffer_index = 0,
- .dispatch_lock = 0,
- .need_dispatch = 0,
+ .dispatch_lock = false,
+ .need_dispatch = false,
.getters = std.atomic.Queue(GetNode).init(),
.putters = std.atomic.Queue(PutNode).init(),
.or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(),
@@ -165,15 +165,14 @@ pub fn Channel(comptime T: type) type {
fn dispatch(self: *SelfChannel) void {
// set the "need dispatch" flag
- @atomicStore(u8, &self.need_dispatch, 1, .SeqCst);
+ @atomicStore(bool, &self.need_dispatch, true, .SeqCst);
lock: while (true) {
// set the lock flag
- const prev_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 1, .SeqCst);
- if (prev_lock != 0) return;
+ if (@atomicRmw(bool, &self.dispatch_lock, .Xchg, true, .SeqCst)) return;
// clear the need_dispatch flag since we're about to do it
- @atomicStore(u8, &self.need_dispatch, 0, .SeqCst);
+ @atomicStore(bool, &self.need_dispatch, false, .SeqCst);
while (true) {
one_dispatch: {
@@ -250,14 +249,12 @@ pub fn Channel(comptime T: type) type {
}
// clear need-dispatch flag
- const need_dispatch = @atomicRmw(u8, &self.need_dispatch, .Xchg, 0, .SeqCst);
- if (need_dispatch != 0) continue;
+ if (@atomicRmw(bool, &self.need_dispatch, .Xchg, false, .SeqCst)) continue;
- const my_lock = @atomicRmw(u8, &self.dispatch_lock, .Xchg, 0, .SeqCst);
- assert(my_lock != 0);
+ assert(@atomicRmw(bool, &self.dispatch_lock, .Xchg, false, .SeqCst));
// we have to check again now that we unlocked
- if (@atomicLoad(u8, &self.need_dispatch, .SeqCst) != 0) continue :lock;
+ if (@atomicLoad(bool, &self.need_dispatch, .SeqCst)) continue :lock;
return;
}
diff --git a/lib/std/event/group.zig b/lib/std/event/group.zig
index ac1bf68245..5eebb7ffbc 100644
--- a/lib/std/event/group.zig
+++ b/lib/std/event/group.zig
@@ -120,9 +120,11 @@ test "std.event.Group" {
// https://github.com/ziglang/zig/issues/1908
if (builtin.single_threaded) return error.SkipZigTest;
- // TODO provide a way to run tests in evented I/O mode
if (!std.io.is_async) return error.SkipZigTest;
+ // TODO this file has bit-rotted. repair it
+ if (true) return error.SkipZigTest;
+
const handle = async testGroup(std.heap.page_allocator);
}
diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig
index b9cbb5d95f..ff1f738c5e 100644
--- a/lib/std/event/lock.zig
+++ b/lib/std/event/lock.zig
@@ -11,9 +11,9 @@ const Loop = std.event.Loop;
/// Allows only one actor to hold the lock.
/// TODO: make this API also work in blocking I/O mode.
pub const Lock = struct {
- shared_bit: u8, // TODO make this a bool
+ shared: bool,
queue: Queue,
- queue_empty_bit: u8, // TODO make this a bool
+ queue_empty: bool,
const Queue = std.atomic.Queue(anyframe);
@@ -31,20 +31,19 @@ pub const Lock = struct {
}
// We need to release the lock.
- @atomicStore(u8, &self.lock.queue_empty_bit, 1, .SeqCst);
- @atomicStore(u8, &self.lock.shared_bit, 0, .SeqCst);
+ @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
+ @atomicStore(bool, &self.lock.shared, false, .SeqCst);
// There might be a queue item. If we know the queue is empty, we can be done,
// because the other actor will try to obtain the lock.
// But if there's a queue item, we are the actor which must loop and attempt
// to grab the lock again.
- if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+ if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
return;
}
while (true) {
- const old_bit = @atomicRmw(u8, &self.lock.shared_bit, .Xchg, 1, .SeqCst);
- if (old_bit != 0) {
+ if (@atomicRmw(bool, &self.lock.shared, .Xchg, true, .SeqCst)) {
// We did not obtain the lock. Great, the queue is someone else's problem.
return;
}
@@ -56,11 +55,11 @@ pub const Lock = struct {
}
// Release the lock again.
- @atomicStore(u8, &self.lock.queue_empty_bit, 1, .SeqCst);
- @atomicStore(u8, &self.lock.shared_bit, 0, .SeqCst);
+ @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
+ @atomicStore(bool, &self.lock.shared, false, .SeqCst);
// Find out if we can be done.
- if (@atomicLoad(u8, &self.lock.queue_empty_bit, .SeqCst) == 1) {
+ if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
return;
}
}
@@ -69,24 +68,24 @@ pub const Lock = struct {
pub fn init() Lock {
return Lock{
- .shared_bit = 0,
+ .shared = false,
.queue = Queue.init(),
- .queue_empty_bit = 1,
+ .queue_empty = true,
};
}
pub fn initLocked() Lock {
return Lock{
- .shared_bit = 1,
+ .shared = true,
.queue = Queue.init(),
- .queue_empty_bit = 1,
+ .queue_empty = true,
};
}
/// Must be called when not locked. Not thread safe.
/// All calls to acquire() and release() must complete before calling deinit().
pub fn deinit(self: *Lock) void {
- assert(self.shared_bit == 0);
+ assert(!self.shared);
while (self.queue.get()) |node| resume node.data;
}
@@ -99,12 +98,11 @@ pub const Lock = struct {
// At this point, we are in the queue, so we might have already been resumed.
- // We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor
+ // We set this bit so that later we can rely on the fact, that if queue_empty == true, some actor
// will attempt to grab the lock.
- @atomicStore(u8, &self.queue_empty_bit, 0, .SeqCst);
+ @atomicStore(bool, &self.queue_empty, false, .SeqCst);
- const old_bit = @atomicRmw(u8, &self.shared_bit, .Xchg, 1, .SeqCst);
- if (old_bit == 0) {
+ if (!@atomicRmw(bool, &self.shared, .Xchg, true, .SeqCst)) {
if (self.queue.get()) |node| {
// Whether this node is us or someone else, we tail resume it.
resume node.data;
@@ -125,6 +123,9 @@ test "std.event.Lock" {
// TODO https://github.com/ziglang/zig/issues/3251
if (builtin.os.tag == .freebsd) return error.SkipZigTest;
+ // TODO this file has bit-rotted. repair it
+ if (true) return error.SkipZigTest;
+
var lock = Lock.init();
defer lock.deinit();
diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig
index e62f15d59a..7db6fe98de 100644
--- a/lib/std/event/loop.zig
+++ b/lib/std/event/loop.zig
@@ -809,6 +809,28 @@ pub const Loop = struct {
return req_node.data.msg.readv.result;
}
+ /// Performs an async `os.pread` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn pread(self: *Loop, fd: os.fd_t, buf: []u8, offset: u64) os.PReadError!usize {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .pread = .{
+ .fd = fd,
+ .buf = buf,
+ .offset = offset,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.pread.result;
+ }
+
/// Performs an async `os.preadv` using a separate thread.
/// `fd` must block and not return EAGAIN.
pub fn preadv(self: *Loop, fd: os.fd_t, iov: []const os.iovec, offset: u64) os.ReadError!usize {
@@ -895,6 +917,35 @@ pub const Loop = struct {
return req_node.data.msg.pwritev.result;
}
+ /// Performs an async `os.faccessatZ` using a separate thread.
+ /// `fd` must block and not return EAGAIN.
+ pub fn faccessatZ(
+ self: *Loop,
+ dirfd: os.fd_t,
+ path_z: [*:0]const u8,
+ mode: u32,
+ flags: u32,
+ ) os.AccessError!void {
+ var req_node = Request.Node{
+ .data = .{
+ .msg = .{
+ .faccessat = .{
+ .dirfd = dirfd,
+ .path = path_z,
+ .mode = mode,
+ .flags = flags,
+ .result = undefined,
+ },
+ },
+ .finish = .{ .TickNode = .{ .data = @frame() } },
+ },
+ };
+ suspend {
+ self.posixFsRequest(&req_node);
+ }
+ return req_node.data.msg.faccessat.result;
+ }
+
fn workerRun(self: *Loop) void {
while (true) {
while (true) {
@@ -1038,6 +1089,9 @@ pub const Loop = struct {
.pwritev => |*msg| {
msg.result = noasync os.pwritev(msg.fd, msg.iov, msg.offset);
},
+ .pread => |*msg| {
+ msg.result = noasync os.pread(msg.fd, msg.buf, msg.offset);
+ },
.preadv => |*msg| {
msg.result = noasync os.preadv(msg.fd, msg.iov, msg.offset);
},
@@ -1047,6 +1101,9 @@ pub const Loop = struct {
.openat => |*msg| {
msg.result = noasync os.openatC(msg.fd, msg.path, msg.flags, msg.mode);
},
+ .faccessat => |*msg| {
+ msg.result = noasync os.faccessatZ(msg.dirfd, msg.path, msg.mode, msg.flags);
+ },
.close => |*msg| noasync os.close(msg.fd),
}
switch (node.data.finish) {
@@ -1120,10 +1177,12 @@ pub const Loop = struct {
write: Write,
writev: WriteV,
pwritev: PWriteV,
+ pread: PRead,
preadv: PReadV,
open: Open,
openat: OpenAt,
close: Close,
+ faccessat: FAccessAt,
/// special - means the fs thread should exit
end,
@@ -1161,6 +1220,15 @@ pub const Loop = struct {
pub const Error = os.PWriteError;
};
+ pub const PRead = struct {
+ fd: os.fd_t,
+ buf: []u8,
+ offset: usize,
+ result: Error!usize,
+
+ pub const Error = os.PReadError;
+ };
+
pub const PReadV = struct {
fd: os.fd_t,
iov: []const os.iovec,
@@ -1192,6 +1260,16 @@ pub const Loop = struct {
pub const Close = struct {
fd: os.fd_t,
};
+
+ pub const FAccessAt = struct {
+ dirfd: os.fd_t,
+ path: [*:0]const u8,
+ mode: u32,
+ flags: u32,
+ result: Error!void,
+
+ pub const Error = os.AccessError;
+ };
};
};
};
diff --git a/lib/std/event/rwlock.zig b/lib/std/event/rwlock.zig
index f4b13d008b..425088063f 100644
--- a/lib/std/event/rwlock.zig
+++ b/lib/std/event/rwlock.zig
@@ -16,8 +16,8 @@ pub const RwLock = struct {
shared_state: State,
writer_queue: Queue,
reader_queue: Queue,
- writer_queue_empty_bit: u8, // TODO make this a bool
- reader_queue_empty_bit: u8, // TODO make this a bool
+ writer_queue_empty: bool,
+ reader_queue_empty: bool,
reader_lock_count: usize,
const State = enum(u8) {
@@ -40,7 +40,7 @@ pub const RwLock = struct {
return;
}
- @atomicStore(u8, &self.lock.reader_queue_empty_bit, 1, .SeqCst);
+ @atomicStore(bool, &self.lock.reader_queue_empty, true, .SeqCst);
if (@cmpxchgStrong(State, &self.lock.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
// Didn't unlock. Someone else's problem.
return;
@@ -62,7 +62,7 @@ pub const RwLock = struct {
}
// We need to release the write lock. Check if any readers are waiting to grab the lock.
- if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) {
+ if (!@atomicLoad(bool, &self.lock.reader_queue_empty, .SeqCst)) {
// Switch to a read lock.
@atomicStore(State, &self.lock.shared_state, .ReadLock, .SeqCst);
while (self.lock.reader_queue.get()) |node| {
@@ -71,7 +71,7 @@ pub const RwLock = struct {
return;
}
- @atomicStore(u8, &self.lock.writer_queue_empty_bit, 1, .SeqCst);
+ @atomicStore(bool, &self.lock.writer_queue_empty, true, .SeqCst);
@atomicStore(State, &self.lock.shared_state, .Unlocked, .SeqCst);
self.lock.commonPostUnlock();
@@ -79,12 +79,12 @@ pub const RwLock = struct {
};
pub fn init() RwLock {
- return RwLock{
+ return .{
.shared_state = .Unlocked,
.writer_queue = Queue.init(),
- .writer_queue_empty_bit = 1,
+ .writer_queue_empty = true,
.reader_queue = Queue.init(),
- .reader_queue_empty_bit = 1,
+ .reader_queue_empty = true,
.reader_lock_count = 0,
};
}
@@ -111,9 +111,9 @@ pub const RwLock = struct {
// At this point, we are in the reader_queue, so we might have already been resumed.
- // We set this bit so that later we can rely on the fact, that if reader_queue_empty_bit is 1,
+ // We set this bit so that later we can rely on the fact, that if reader_queue_empty == true,
// some actor will attempt to grab the lock.
- @atomicStore(u8, &self.reader_queue_empty_bit, 0, .SeqCst);
+ @atomicStore(bool, &self.reader_queue_empty, false, .SeqCst);
// Here we don't care if we are the one to do the locking or if it was already locked for reading.
const have_read_lock = if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == .ReadLock else true;
@@ -142,9 +142,9 @@ pub const RwLock = struct {
// At this point, we are in the writer_queue, so we might have already been resumed.
- // We set this bit so that later we can rely on the fact, that if writer_queue_empty_bit is 1,
+ // We set this bit so that later we can rely on the fact, that if writer_queue_empty == true,
// some actor will attempt to grab the lock.
- @atomicStore(u8, &self.writer_queue_empty_bit, 0, .SeqCst);
+ @atomicStore(bool, &self.writer_queue_empty, false, .SeqCst);
// Here we must be the one to acquire the write lock. It cannot already be locked.
if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) == null) {
@@ -165,7 +165,7 @@ pub const RwLock = struct {
// obtain the lock.
// But if there's a writer_queue item or a reader_queue item,
// we are the actor which must loop and attempt to grab the lock again.
- if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) {
+ if (!@atomicLoad(bool, &self.writer_queue_empty, .SeqCst)) {
if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) != null) {
// We did not obtain the lock. Great, the queues are someone else's problem.
return;
@@ -176,12 +176,12 @@ pub const RwLock = struct {
return;
}
// Release the lock again.
- @atomicStore(u8, &self.writer_queue_empty_bit, 1, .SeqCst);
+ @atomicStore(bool, &self.writer_queue_empty, true, .SeqCst);
@atomicStore(State, &self.shared_state, .Unlocked, .SeqCst);
continue;
}
- if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) {
+ if (!@atomicLoad(bool, &self.reader_queue_empty, .SeqCst)) {
if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst) != null) {
// We did not obtain the lock. Great, the queues are someone else's problem.
return;
@@ -195,7 +195,7 @@ pub const RwLock = struct {
return;
}
// Release the lock again.
- @atomicStore(u8, &self.reader_queue_empty_bit, 1, .SeqCst);
+ @atomicStore(bool, &self.reader_queue_empty, true, .SeqCst);
if (@cmpxchgStrong(State, &self.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
// Didn't unlock. Someone else's problem.
return;