author     Andrew Kelley <superjoe30@gmail.com>  2018-07-14 18:27:51 -0400
committer  Andrew Kelley <superjoe30@gmail.com>  2018-07-14 18:27:51 -0400
commit     4d920cee6e8be2f2ae2cfd9067358c65b977568a (patch)
tree       2c04de6151b7448dec9958d0a91234ea0ba9a15d /std/event
parent     da3acacc14331a6be33445c3bfd204e2cccabddd (diff)
parent     28c3d4809bc6d497ac81892bc7eb03b95d8c2b32 (diff)
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/event')
-rw-r--r--  std/event/channel.zig   12
-rw-r--r--  std/event/future.zig    97
-rw-r--r--  std/event/group.zig    158
-rw-r--r--  std/event/lock.zig      13
-rw-r--r--  std/event/loop.zig      22
5 files changed, 290 insertions(+), 12 deletions(-)
diff --git a/std/event/channel.zig b/std/event/channel.zig
index 4b3a7177a2..d4d713bdee 100644
--- a/std/event/channel.zig
+++ b/std/event/channel.zig
@@ -12,8 +12,8 @@ pub fn Channel(comptime T: type) type {
return struct {
loop: *Loop,
- getters: std.atomic.QueueMpsc(GetNode),
- putters: std.atomic.QueueMpsc(PutNode),
+ getters: std.atomic.Queue(GetNode),
+ putters: std.atomic.Queue(PutNode),
get_count: usize,
put_count: usize,
dispatch_lock: u8, // TODO make this a bool
@@ -46,8 +46,8 @@ pub fn Channel(comptime T: type) type {
.buffer_index = 0,
.dispatch_lock = 0,
.need_dispatch = 0,
- .getters = std.atomic.QueueMpsc(GetNode).init(),
- .putters = std.atomic.QueueMpsc(PutNode).init(),
+ .getters = std.atomic.Queue(GetNode).init(),
+ .putters = std.atomic.Queue(PutNode).init(),
.get_count = 0,
.put_count = 0,
});
@@ -81,7 +81,7 @@ pub fn Channel(comptime T: type) type {
.next = undefined,
.data = handle,
};
- var queue_node = std.atomic.QueueMpsc(PutNode).Node{
+ var queue_node = std.atomic.Queue(PutNode).Node{
.data = PutNode{
.tick_node = &my_tick_node,
.data = data,
@@ -111,7 +111,7 @@ pub fn Channel(comptime T: type) type {
.next = undefined,
.data = handle,
};
- var queue_node = std.atomic.QueueMpsc(GetNode).Node{
+ var queue_node = std.atomic.Queue(GetNode).Node{
.data = GetNode{
.ptr = &result,
.tick_node = &my_tick_node,
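
The channel changes above are purely mechanical: std.atomic.QueueMpsc was renamed to std.atomic.Queue. A minimal sketch of the queue's intrusive API, assuming the put()/get() methods this era's std.atomic.Queue exposed (the diff itself only shows init() and Node construction):

const std = @import("std");
const assert = std.debug.assert;

test "std.atomic.Queue sketch" {
    // The queue is intrusive: the caller owns the nodes, so no allocator is needed.
    var queue = std.atomic.Queue(i32).init();
    var node = std.atomic.Queue(i32).Node{
        .next = undefined,
        .data = 42,
    };
    queue.put(&node);
    if (queue.get()) |popped| {
        assert(popped.data == 42);
    } else {
        unreachable;
    }
}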
diff --git a/std/event/future.zig b/std/event/future.zig
new file mode 100644
index 0000000000..23fa570c8f
--- /dev/null
+++ b/std/event/future.zig
@@ -0,0 +1,97 @@
+const std = @import("../index.zig");
+const assert = std.debug.assert;
+const builtin = @import("builtin");
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+
+/// This is a value that starts out unavailable, until resolve() is called.
+/// While it is unavailable, coroutines suspend when they try to get() it,
+/// and then are resumed when resolve() is called.
+/// At that point the value remains forever available, and another resolve() is not allowed.
+pub fn Future(comptime T: type) type {
+ return struct {
+ lock: Lock,
+ data: T,
+ available: u8, // TODO make this a bool
+
+ const Self = this;
+ const Queue = std.atomic.Queue(promise);
+
+ pub fn init(loop: *Loop) Self {
+ return Self{
+ .lock = Lock.initLocked(loop),
+ .available = 0,
+ .data = undefined,
+ };
+ }
+
+ /// Obtain the value. If it's not available, wait until it becomes
+ /// available.
+ /// Thread-safe.
+ pub async fn get(self: *Self) *T {
+ if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 1) {
+ return &self.data;
+ }
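+ // init() left the lock in the held state (Lock.initLocked), so this
+ // acquire() suspends until resolve() performs the matching release.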
+ const held = await (async self.lock.acquire() catch unreachable);
+ held.release();
+
+ return &self.data;
+ }
+
+ /// Make the data become available. May be called only once.
+ /// Before calling this, set the `data` field.
+ pub fn resolve(self: *Self) void {
+ const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ assert(prev == 0); // resolve() called twice
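+ // Release the hold created by init(), resuming every coroutine suspended in get().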
+ Lock.Held.release(Lock.Held{ .lock = &self.lock });
+ }
+ };
+}
+
+test "std.event.Future" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ const handle = try async<allocator> testFuture(&loop);
+ defer cancel handle;
+
+ loop.run();
+}
+
+async fn testFuture(loop: *Loop) void {
+ suspend |p| {
+ resume p;
+ }
+ var future = Future(i32).init(loop);
+
+ const a = async waitOnFuture(&future) catch @panic("memory");
+ const b = async waitOnFuture(&future) catch @panic("memory");
+ const c = async resolveFuture(&future) catch @panic("memory");
+
+ const result = (await a) + (await b);
+ cancel c;
+ assert(result == 12);
+}
+
+async fn waitOnFuture(future: *Future(i32)) i32 {
+ suspend |p| {
+ resume p;
+ }
+ return (await (async future.get() catch @panic("memory"))).*;
+}
+
+async fn resolveFuture(future: *Future(i32)) void {
+ suspend |p| {
+ resume p;
+ }
+ future.data = 6;
+ future.resolve();
+}
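
For the common case the pattern is simpler than the contended test above. A hedged sketch, assuming the Future, Loop, and assert declarations from the top of this file (hypothetical coroutine, era syntax):

async fn produceAndConsume(loop: *Loop) void {
    var future = Future(u32).init(loop);

    // Producer side: write the payload first, then publish it exactly once.
    future.data = 123;
    future.resolve();

    // Consumer side: once resolved, get() returns the pointer without suspending.
    const value_ptr = await (async future.get() catch @panic("memory"));
    assert(value_ptr.* == 123);
}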
diff --git a/std/event/group.zig b/std/event/group.zig
new file mode 100644
index 0000000000..c286803b53
--- /dev/null
+++ b/std/event/group.zig
@@ -0,0 +1,158 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const assert = std.debug.assert;
+
+/// ReturnType should be `void` or `E!void`
+pub fn Group(comptime ReturnType: type) type {
+ return struct {
+ coro_stack: Stack,
+ alloc_stack: Stack,
+ lock: Lock,
+
+ const Self = this;
+
+ const Error = switch (@typeInfo(ReturnType)) {
+ builtin.TypeId.ErrorUnion => |payload| payload.error_set,
+ else => void,
+ };
+ const Stack = std.atomic.Stack(promise->ReturnType);
+
+ pub fn init(loop: *Loop) Self {
+ return Self{
+ .coro_stack = Stack.init(),
+ .alloc_stack = Stack.init(),
+ .lock = Lock.init(loop),
+ };
+ }
+
+ /// Add a promise to the group. Thread-safe.
+ pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) {
+ const node = try self.lock.loop.allocator.create(Stack.Node{
+ .next = undefined,
+ .data = handle,
+ });
+ self.alloc_stack.push(node);
+ }
+
+ /// This is equivalent to an async call, but the async function is added to the group, instead
+ /// of returning a promise. func must be async and have return type ReturnType.
+ /// Thread-safe.
+ pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) {
+ const S = struct {
+ async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType {
+ // TODO this is a hack to make the memory that follows live inside the coro frame
+ suspend |p| {
+ var my_node: Stack.Node = undefined;
+ node.* = &my_node;
+ resume p;
+ }
+
+ // TODO this allocation elision should be guaranteed because we await it in
+ // this coro frame
+ return await (async func(args2) catch unreachable);
+ }
+ };
+ var node: *Stack.Node = undefined;
+ const handle = try async<self.lock.loop.allocator> S.asyncFunc(&node, args);
+ node.* = Stack.Node{
+ .next = undefined,
+ .data = handle,
+ };
+ self.coro_stack.push(node);
+ }
+
+ /// Wait for all the calls and promises of the group to complete.
+ /// Thread-safe.
+ pub async fn wait(self: *Self) ReturnType {
+ // TODO catch unreachable because the allocation can be grouped with
+ // the coro frame allocation
+ const held = await (async self.lock.acquire() catch unreachable);
+ defer held.release();
+
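+ // Drain the handles whose nodes live inside coroutine frames (from call())
+ // first, then the heap-allocated nodes (from add()), freeing those as we go.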
+ while (self.coro_stack.pop()) |node| {
+ if (Error == void) {
+ await node.data;
+ } else {
+ (await node.data) catch |err| {
+ self.cancelAll();
+ return err;
+ };
+ }
+ }
+ while (self.alloc_stack.pop()) |node| {
+ const handle = node.data;
+ self.lock.loop.allocator.destroy(node);
+ if (Error == void) {
+ await handle;
+ } else {
+ (await handle) catch |err| {
+ self.cancelAll();
+ return err;
+ };
+ }
+ }
+ }
+
+ /// Cancel all the outstanding promises. May only be called if wait was never called.
+ pub fn cancelAll(self: *Self) void {
+ while (self.coro_stack.pop()) |node| {
+ cancel node.data;
+ }
+ while (self.alloc_stack.pop()) |node| {
+ cancel node.data;
+ self.lock.loop.allocator.destroy(node);
+ }
+ }
+ };
+}
+
+test "std.event.Group" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ const handle = try async<allocator> testGroup(&loop);
+ defer cancel handle;
+
+ loop.run();
+}
+
+async fn testGroup(loop: *Loop) void {
+ var count: usize = 0;
+ var group = Group(void).init(loop);
+ group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory");
+ group.call(increaseByTen, &count) catch @panic("memory");
+ await (async group.wait() catch @panic("memory"));
+ assert(count == 11);
+
+ var another = Group(error!void).init(loop);
+ another.add(async somethingElse() catch @panic("memory")) catch @panic("memory");
+ another.call(doSomethingThatFails) catch @panic("memory");
+ std.debug.assertError(await (async another.wait() catch @panic("memory")), error.ItBroke);
+}
+
+async fn sleepALittle(count: *usize) void {
+ std.os.time.sleep(0, 1000000);
+ _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+}
+
+async fn increaseByTen(count: *usize) void {
+ var i: usize = 0;
+ while (i < 10) : (i += 1) {
+ _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ }
+}
+
+async fn doSomethingThatFails() error!void {}
+async fn somethingElse() error!void {
+ return error.ItBroke;
+}
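
The Error switch near the top of Group is what lets ReturnType range over `void` and `E!void`. The same comptime pattern in isolation, with a hypothetical helper name (era syntax, mirroring the switch in the diff):

const builtin = @import("builtin");

fn ErrorSetOf(comptime ReturnType: type) type {
    // For `E!void` this yields E; plain `void` has no error set to report.
    return switch (@typeInfo(ReturnType)) {
        builtin.TypeId.ErrorUnion => |payload| payload.error_set,
        else => void,
    };
}

This is why wait() on a Group(error!void) can surface error.ItBroke in the test, while wait() on a Group(void) has nothing to propagate.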
diff --git a/std/event/lock.zig b/std/event/lock.zig
index 2a8d5ada77..2013b5595f 100644
--- a/std/event/lock.zig
+++ b/std/event/lock.zig
@@ -15,7 +15,7 @@ pub const Lock = struct {
queue: Queue,
queue_empty_bit: u8, // TODO make this a bool
- const Queue = std.atomic.QueueMpsc(promise);
+ const Queue = std.atomic.Queue(promise);
pub const Held = struct {
lock: *Lock,
@@ -73,6 +73,15 @@ pub const Lock = struct {
};
}
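+ /// Returns a Lock that starts out in the held state, so the first release()
+ /// resumes any queued waiters. std.event.Future uses this to gate get() on
+ /// resolve().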
+ pub fn initLocked(loop: *Loop) Lock {
+ return Lock{
+ .loop = loop,
+ .shared_bit = 1,
+ .queue = Queue.init(),
+ .queue_empty_bit = 1,
+ };
+ }
+
/// Must be called when not locked. Not thread safe.
/// All calls to acquire() and release() must complete before calling deinit().
pub fn deinit(self: *Lock) void {
@@ -81,7 +90,7 @@ pub const Lock = struct {
}
pub async fn acquire(self: *Lock) Held {
- s: suspend |handle| {
+ suspend |handle| {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,
diff --git a/std/event/loop.zig b/std/event/loop.zig
index 646f15875f..fc927592b9 100644
--- a/std/event/loop.zig
+++ b/std/event/loop.zig
@@ -9,7 +9,7 @@ const AtomicOrder = builtin.AtomicOrder;
pub const Loop = struct {
allocator: *mem.Allocator,
- next_tick_queue: std.atomic.QueueMpsc(promise),
+ next_tick_queue: std.atomic.Queue(promise),
os_data: OsData,
final_resume_node: ResumeNode,
dispatch_lock: u8, // TODO make this a bool
@@ -21,7 +21,7 @@ pub const Loop = struct {
available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
- pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
+ pub const NextTickNode = std.atomic.Queue(promise).Node;
pub const ResumeNode = struct {
id: Id,
@@ -77,7 +77,7 @@ pub const Loop = struct {
.pending_event_count = 0,
.allocator = allocator,
.os_data = undefined,
- .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
+ .next_tick_queue = std.atomic.Queue(promise).init(),
.dispatch_lock = 1, // start locked so threads go directly into epoll wait
.extra_threads = undefined,
.available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
@@ -101,7 +101,6 @@ pub const Loop = struct {
errdefer self.deinitOsData();
}
- /// must call stop before deinit
pub fn deinit(self: *Loop) void {
self.deinitOsData();
self.allocator.free(self.extra_threads);
@@ -382,6 +381,21 @@ pub const Loop = struct {
return async<self.allocator> S.asyncFunc(self, &handle, args);
}
+ /// Awaiting a yield lets the event loop run, starting any unstarted async operations.
+ /// Note that async operations automatically start when a function yields for any other reason,
+ /// for example, when async I/O is performed. This function is intended to be used only when
+ /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O
+ /// is performed.
+ pub async fn yield(self: *Loop) void {
+ suspend |p| {
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = p,
+ };
+ self.onNextTick(&my_tick_node);
+ }
+ }
+
fn workerRun(self: *Loop) void {
start_over: while (true) {
if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
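
A hedged example of where the new yield() earns its keep: a CPU-bound coroutine that performs no async I/O, so nothing else would ever get scheduled without it (hypothetical function, era syntax, assuming the Loop declarations above):

async fn crunchNumbers(loop: *Loop) void {
    var i: usize = 0;
    while (i < 1000) : (i += 1) {
        // ... CPU-bound work here ...

        // Requeue ourselves so queued-but-unstarted coroutines get a turn.
        await (async loop.yield() catch @panic("memory"));
    }
}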