path: root/std/event
author     Andrew Kelley <superjoe30@gmail.com>  2018-07-24 00:43:12 -0400
committer  Andrew Kelley <superjoe30@gmail.com>  2018-07-24 00:43:12 -0400
commit     dd9728c5a03844267bc378c326c353fd2b0e084e (patch)
tree       5786bd228312976ee482a58463a798bc426d64af /std/event
parent     558b0b87913dfb6e6b76f5dbe2c36b920302faab (diff)
parent     10bdf73a02c90dc375985e49b08b5020cfc20b93 (diff)
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/event')
-rw-r--r--  std/event/future.zig |  39
-rw-r--r--  std/event/group.zig  |  16
-rw-r--r--  std/event/loop.zig   | 209
-rw-r--r--  std/event/tcp.zig    |   3
4 files changed, 147 insertions(+), 120 deletions(-)
diff --git a/std/event/future.zig b/std/event/future.zig
index 0f27b4131b..f5d14d1ca6 100644
--- a/std/event/future.zig
+++ b/std/event/future.zig
@@ -6,15 +6,20 @@ const AtomicOrder = builtin.AtomicOrder;
const Lock = std.event.Lock;
const Loop = std.event.Loop;
-/// This is a value that starts out unavailable, until a value is put().
+/// This is a value that starts out unavailable, until resolve() is called.
/// While it is unavailable, coroutines suspend when they try to get() it,
-/// and then are resumed when the value is put().
-/// At this point the value remains forever available, and another put() is not allowed.
+/// and then are resumed when resolve() is called.
+/// At this point the value remains forever available, and another resolve() is not allowed.
pub fn Future(comptime T: type) type {
return struct {
lock: Lock,
data: T,
- available: u8, // TODO make this a bool
+
+ /// TODO make this an enum
+ /// 0 - not started
+ /// 1 - started
+ /// 2 - finished
+ available: u8,
const Self = this;
const Queue = std.atomic.Queue(promise);
@@ -31,7 +36,7 @@ pub fn Future(comptime T: type) type {
/// available.
/// Thread-safe.
pub async fn get(self: *Self) *T {
- if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 1) {
+ if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) {
return &self.data;
}
const held = await (async self.lock.acquire() catch unreachable);
@@ -43,18 +48,36 @@ pub fn Future(comptime T: type) type {
/// Gets the data without waiting for it. If it's available, a pointer is
/// returned. Otherwise, null is returned.
pub fn getOrNull(self: *Self) ?*T {
- if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 1) {
+ if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) {
return &self.data;
} else {
return null;
}
}
+ /// If someone else has started working on the data, wait for them to complete
+ /// and return a pointer to the data. Otherwise, return null, and the caller
+ /// should start working on the data.
+ /// It's not required to call start() before resolve(), but it can be useful since
+ /// this method is thread-safe.
+ pub async fn start(self: *Self) ?*T {
+ const state = @cmpxchgStrong(u8, &self.available, 0, 1, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return null;
+ switch (state) {
+ 1 => {
+ const held = await (async self.lock.acquire() catch unreachable);
+ held.release();
+ return &self.data;
+ },
+ 2 => return &self.data,
+ else => unreachable,
+ }
+ }
+
/// Make the data become available. May be called only once.
/// Before calling this, modify the `data` property.
pub fn resolve(self: *Self) void {
- const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- assert(prev == 0); // put() called twice
+ const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 2, AtomicOrder.SeqCst);
+ assert(prev == 0 or prev == 1); // resolve() called twice
Lock.Held.release(Lock.Held{ .lock = &self.lock });
}
};
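Taken together, start() and resolve() give the Future a three-state handoff: whichever caller wins the 0 -> 1 cmpxchg in start() produces the data and publishes it with resolve(); every other caller waits and receives a pointer. A minimal usage sketch in the same 2018-era async style the diff uses; fetchValue is an illustrative name and the Future's initialization is elided:

async fn fetchValue(future: *Future(i32)) i32 {
    // start() is thread-safe: exactly one caller gets null back.
    if (await (async future.start() catch unreachable)) |ptr| {
        // Someone else started (state 1) or finished (state 2) the
        // work; start() waited for resolve() and handed us the data.
        return ptr.*;
    }
    // We won the 0 -> 1 cmpxchg: fill in `data`, then publish it.
    future.data = 42;
    future.resolve(); // 1 -> 2; wakes everyone blocked in get()/start()
    return future.data;
}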
diff --git a/std/event/group.zig b/std/event/group.zig
index c286803b53..26c098399e 100644
--- a/std/event/group.zig
+++ b/std/event/group.zig
@@ -6,7 +6,7 @@ const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const assert = std.debug.assert;
-/// ReturnType should be `void` or `E!void`
+/// ReturnType must be `void` or `E!void`
pub fn Group(comptime ReturnType: type) type {
return struct {
coro_stack: Stack,
@@ -38,8 +38,17 @@ pub fn Group(comptime ReturnType: type) type {
self.alloc_stack.push(node);
}
+ /// Add a node to the group. Thread-safe. Cannot fail.
+ /// `node.data` should be the promise handle to add to the group.
+ /// The node's memory should be in the coroutine frame of
+ /// the handle that is in the node, or somewhere guaranteed to live
+ /// at least as long.
+ pub fn addNode(self: *Self, node: *Stack.Node) void {
+ self.coro_stack.push(node);
+ }
+
/// This is equivalent to an async call, but the async function is added to the group, instead
- /// of returning a promise. func must be async and have return type void.
+ /// of returning a promise. func must be async and have return type ReturnType.
/// Thread-safe.
pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) {
const S = struct {
@@ -67,6 +76,7 @@ pub fn Group(comptime ReturnType: type) type {
/// Wait for all the calls and promises of the group to complete.
/// Thread-safe.
+ /// Safe to call any number of times.
pub async fn wait(self: *Self) ReturnType {
// TODO catch unreachable because the allocation can be grouped with
// the coro frame allocation
@@ -98,6 +108,8 @@ pub fn Group(comptime ReturnType: type) type {
}
/// Cancel all the outstanding promises. May only be called if wait was never called.
+ /// TODO These should be `cancelasync` not `cancel`.
+ /// See https://github.com/ziglang/zig/issues/1261
pub fn cancelAll(self: *Self) void {
while (self.coro_stack.pop()) |node| {
cancel node.data;
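addNode() is the allocation-free counterpart of call(): the caller donates a node, so nothing can fail, at the price of guaranteeing the node's lifetime. A sketch of one way to satisfy that rule, assuming Group's Stack alias is std.atomic.Stack(promise) and the usual init(loop) constructor; runBoth and worker are illustrative names, and the node lives in runBoth's frame, which outlives the promise because wait() completes before the frame dies:

async fn worker() void {
    // ... some async work ...
}

async fn runBoth(loop: *Loop) !void {
    var group = Group(void).init(loop);
    // Bring-your-own-node registration: no allocation, cannot fail.
    var node = std.atomic.Stack(promise).Node{
        .next = undefined,
        .data = try async<loop.allocator> worker(),
    };
    group.addNode(&node);
    await (async group.wait() catch unreachable);
}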
diff --git a/std/event/loop.zig b/std/event/loop.zig
index fc927592b9..cd805f891f 100644
--- a/std/event/loop.zig
+++ b/std/event/loop.zig
@@ -12,7 +12,6 @@ pub const Loop = struct {
next_tick_queue: std.atomic.Queue(promise),
os_data: OsData,
final_resume_node: ResumeNode,
- dispatch_lock: u8, // TODO make this a bool
pending_event_count: usize,
extra_threads: []*std.os.Thread,
@@ -74,11 +73,10 @@ pub const Loop = struct {
/// max(thread_count - 1, 0)
fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
self.* = Loop{
- .pending_event_count = 0,
+ .pending_event_count = 1,
.allocator = allocator,
.os_data = undefined,
.next_tick_queue = std.atomic.Queue(promise).init(),
- .dispatch_lock = 1, // start locked so threads go directly into epoll wait
.extra_threads = undefined,
.available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
.eventfd_resume_nodes = undefined,
@@ -235,8 +233,6 @@ pub const Loop = struct {
}
},
builtin.Os.windows => {
- self.os_data.extra_thread_count = extra_thread_count;
-
self.os_data.io_port = try std.os.windowsCreateIoCompletionPort(
windows.INVALID_HANDLE_VALUE,
null,
@@ -306,7 +302,7 @@ pub const Loop = struct {
pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
errdefer {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ self.finishOneEvent();
}
try self.modFd(
fd,
@@ -326,7 +322,7 @@ pub const Loop = struct {
pub fn removeFd(self: *Loop, fd: i32) void {
self.removeFdNoCounter(fd);
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ self.finishOneEvent();
}
fn removeFdNoCounter(self: *Loop, fd: i32) void {
@@ -345,14 +341,70 @@ pub const Loop = struct {
}
}
+ fn dispatch(self: *Loop) void {
+ while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+ const next_tick_node = self.next_tick_queue.get() orelse {
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ const eventfd_node = &resume_stack_node.data;
+ eventfd_node.base.handle = next_tick_node.data;
+ switch (builtin.os) {
+ builtin.Os.macosx => {
+ const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
+ self.next_tick_queue.unget(next_tick_node);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ },
+ builtin.Os.linux => {
+ // the pending count is already accounted for
+ const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT |
+ std.os.linux.EPOLLET;
+ self.modFd(
+ eventfd_node.eventfd,
+ eventfd_node.epoll_op,
+ epoll_events,
+ &eventfd_node.base,
+ ) catch {
+ self.next_tick_queue.unget(next_tick_node);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ },
+ builtin.Os.windows => {
+ // this value is never dereferenced but we need it to be non-null so that
+ // the consumer code can decide whether to read the completion key.
+ // it has to do this for normal I/O, so we match that behavior here.
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(
+ self.os_data.io_port,
+ undefined,
+ eventfd_node.completion_key,
+ overlapped,
+ ) catch {
+ self.next_tick_queue.unget(next_tick_node);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ return;
+ };
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
+
/// Bring your own linked list node. This means it can't fail.
pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.next_tick_queue.put(node);
+ self.dispatch();
}
pub fn run(self: *Loop) void {
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.finishOneEvent(); // the reference we start with
+
self.workerRun();
for (self.extra_threads) |extra_thread| {
extra_thread.wait();
@@ -392,110 +444,49 @@ pub const Loop = struct {
.next = undefined,
.data = p,
};
- loop.onNextTick(&my_tick_node);
+ self.onNextTick(&my_tick_node);
}
}
- fn workerRun(self: *Loop) void {
- start_over: while (true) {
- if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
- while (self.next_tick_queue.get()) |next_tick_node| {
- const handle = next_tick_node.data;
- if (self.next_tick_queue.isEmpty()) {
- // last node, just resume it
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- }
-
- // non-last node, stick it in the epoll/kqueue set so that
- // other threads can get to it
- if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
- const eventfd_node = &resume_stack_node.data;
- eventfd_node.base.handle = handle;
- switch (builtin.os) {
- builtin.Os.macosx => {
- const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
- const eventlist = ([*]posix.Kevent)(undefined)[0..0];
- _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
- // fine, we didn't need it anyway
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- };
- },
- builtin.Os.linux => {
- // the pending count is already accounted for
- const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
- self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch {
- // fine, we didn't need it anyway
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- };
- },
- builtin.Os.windows => {
- // this value is never dereferenced but we need it to be non-null so that
- // the consumer code can decide whether to read the completion key.
- // it has to do this for normal I/O, so we match that behavior here.
- const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
- std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch {
- // fine, we didn't need it anyway
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- };
- },
- else => @compileError("unsupported OS"),
+ fn finishOneEvent(self: *Loop) void {
+ if (@atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst) == 1) {
+ // cause all the threads to stop
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ // writing 8 bytes to an eventfd cannot fail
+ std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ return;
+ },
+ builtin.Os.macosx => {
+ const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ // cannot fail because we already added it and this just enables it
+ _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
+ return;
+ },
+ builtin.Os.windows => {
+ var i: usize = 0;
+ while (i < self.extra_threads.len + 1) : (i += 1) {
+ while (true) {
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+ break;
}
- } else {
- // threads are too busy, can't add another eventfd to wake one up
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
}
- }
-
- const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
- if (pending_event_count == 0) {
- // cause all the threads to stop
- switch (builtin.os) {
- builtin.Os.linux => {
- // writing 8 bytes to an eventfd cannot fail
- std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
- return;
- },
- builtin.Os.macosx => {
- const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
- const eventlist = ([*]posix.Kevent)(undefined)[0..0];
- // cannot fail because we already added it and this just enables it
- _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
- return;
- },
- builtin.Os.windows => {
- var i: usize = 0;
- while (i < self.os_data.extra_thread_count) : (i += 1) {
- while (true) {
- const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
- std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
- break;
- }
- }
- return;
- },
- else => @compileError("unsupported OS"),
- }
- }
+ return;
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ fn workerRun(self: *Loop) void {
+ while (true) {
+ while (true) {
+ const next_tick_node = self.next_tick_queue.get() orelse break;
+ self.dispatch();
+ resume next_tick_node.data;
+ self.finishOneEvent();
}
switch (builtin.os) {
@@ -519,7 +510,7 @@ pub const Loop = struct {
}
resume handle;
if (resume_node_id == ResumeNode.Id.EventFd) {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ self.finishOneEvent();
}
}
},
@@ -541,7 +532,7 @@ pub const Loop = struct {
}
resume handle;
if (resume_node_id == ResumeNode.Id.EventFd) {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ self.finishOneEvent();
}
}
},
@@ -570,7 +561,7 @@ pub const Loop = struct {
}
resume handle;
if (resume_node_id == ResumeNode.Id.EventFd) {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ self.finishOneEvent();
}
},
else => @compileError("unsupported OS"),
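The heart of the refactor: the dispatch_lock spinlock is gone, and pending_event_count now doubles as a reference count on the loop itself. The count starts at 1 (the loop's own reference, taken in initInternal), run() drops that reference via finishOneEvent(), and whichever decrement brings the count to zero signals every thread to shut down. The pattern reduced to its core, as a standalone sketch rather than loop code:

const builtin = @import("builtin");

var pending: usize = 1; // starts at 1: the loop's own reference

fn addEvent() void {
    _ = @atomicRmw(usize, &pending, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst);
}

fn finishOne() void {
    // @atomicRmw returns the previous value, so seeing 1 here means
    // our decrement is the one that brought the count to zero.
    if (@atomicRmw(usize, &pending, builtin.AtomicRmwOp.Sub, 1, builtin.AtomicOrder.SeqCst) == 1) {
        // count hit zero: wake every thread so it can exit
        // (eventfd on Linux, kevent on macOS, IOCP posts on Windows)
    }
}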
diff --git a/std/event/tcp.zig b/std/event/tcp.zig
index 5151ecf934..416a8c07dc 100644
--- a/std/event/tcp.zig
+++ b/std/event/tcp.zig
@@ -125,8 +125,9 @@ pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File
test "listen on a port, send bytes, receive bytes" {
if (builtin.os != builtin.Os.linux) {
// TODO build abstractions for other operating systems
- return;
+ return error.SkipZigTest;
}
+
const MyServer = struct {
tcp_server: Server,
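The tcp.zig change swaps a silent early return for the SkipZigTest convention, so the runner reports the test as skipped rather than passed on non-Linux hosts. The pattern in isolation:

const builtin = @import("builtin");

test "linux-only behavior" {
    if (builtin.os != builtin.Os.linux) {
        // Reported as skipped, not passed, by the test runner.
        return error.SkipZigTest;
    }
    // ... Linux-specific assertions ...
}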