aboutsummaryrefslogtreecommitdiff
path: root/std
diff options
context:
space:
mode:
authorAndrew Kelley <superjoe30@gmail.com>2018-07-10 15:17:01 -0400
committerAndrew Kelley <superjoe30@gmail.com>2018-07-10 15:17:01 -0400
commit8fba0a6ae862993afa2aeca774347adc399b3605 (patch)
tree1f0edaca796666c73ab578b2bb3d1c1d4bc5f264 /std
parent0ce6934e2631eb3beca817d3bce12ecb13aafa13 (diff)
downloadzig-8fba0a6ae862993afa2aeca774347adc399b3605.tar.gz
zig-8fba0a6ae862993afa2aeca774347adc399b3605.zip
introduce std.event.Group for making parallel async calls
Diffstat (limited to 'std')
-rw-r--r--std/event.zig2
-rw-r--r--std/event/group.zig158
2 files changed, 160 insertions, 0 deletions
diff --git a/std/event.zig b/std/event.zig
index 7e9928b3d7..516defebf8 100644
--- a/std/event.zig
+++ b/std/event.zig
@@ -3,6 +3,7 @@ pub const Loop = @import("event/loop.zig").Loop;
pub const Lock = @import("event/lock.zig").Lock;
pub const tcp = @import("event/tcp.zig");
pub const Channel = @import("event/channel.zig").Channel;
+pub const Group = @import("event/group.zig").Group;
test "import event tests" {
_ = @import("event/locked.zig");
@@ -10,4 +11,5 @@ test "import event tests" {
_ = @import("event/lock.zig");
_ = @import("event/tcp.zig");
_ = @import("event/channel.zig");
+ _ = @import("event/group.zig");
}
diff --git a/std/event/group.zig b/std/event/group.zig
new file mode 100644
index 0000000000..c286803b53
--- /dev/null
+++ b/std/event/group.zig
@@ -0,0 +1,158 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const assert = std.debug.assert;
+
+/// ReturnType should be `void` or `E!void`
+pub fn Group(comptime ReturnType: type) type {
+ return struct {
+ coro_stack: Stack,
+ alloc_stack: Stack,
+ lock: Lock,
+
+ const Self = this;
+
+ const Error = switch (@typeInfo(ReturnType)) {
+ builtin.TypeId.ErrorUnion => |payload| payload.error_set,
+ else => void,
+ };
+ const Stack = std.atomic.Stack(promise->ReturnType);
+
+ pub fn init(loop: *Loop) Self {
+ return Self{
+ .coro_stack = Stack.init(),
+ .alloc_stack = Stack.init(),
+ .lock = Lock.init(loop),
+ };
+ }
+
+ /// Add a promise to the group. Thread-safe.
+ pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) {
+ const node = try self.lock.loop.allocator.create(Stack.Node{
+ .next = undefined,
+ .data = handle,
+ });
+ self.alloc_stack.push(node);
+ }
+
+ /// This is equivalent to an async call, but the async function is added to the group, instead
+ /// of returning a promise. func must be async and have return type void.
+ /// Thread-safe.
+ pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) {
+ const S = struct {
+ async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType {
+ // TODO this is a hack to make the memory following be inside the coro frame
+ suspend |p| {
+ var my_node: Stack.Node = undefined;
+ node.* = &my_node;
+ resume p;
+ }
+
+ // TODO this allocation elision should be guaranteed because we await it in
+ // this coro frame
+ return await (async func(args2) catch unreachable);
+ }
+ };
+ var node: *Stack.Node = undefined;
+ const handle = try async<self.lock.loop.allocator> S.asyncFunc(&node, args);
+ node.* = Stack.Node{
+ .next = undefined,
+ .data = handle,
+ };
+ self.coro_stack.push(node);
+ }
+
+ /// Wait for all the calls and promises of the group to complete.
+ /// Thread-safe.
+ pub async fn wait(self: *Self) ReturnType {
+ // TODO catch unreachable because the allocation can be grouped with
+ // the coro frame allocation
+ const held = await (async self.lock.acquire() catch unreachable);
+ defer held.release();
+
+ while (self.coro_stack.pop()) |node| {
+ if (Error == void) {
+ await node.data;
+ } else {
+ (await node.data) catch |err| {
+ self.cancelAll();
+ return err;
+ };
+ }
+ }
+ while (self.alloc_stack.pop()) |node| {
+ const handle = node.data;
+ self.lock.loop.allocator.destroy(node);
+ if (Error == void) {
+ await handle;
+ } else {
+ (await handle) catch |err| {
+ self.cancelAll();
+ return err;
+ };
+ }
+ }
+ }
+
+ /// Cancel all the outstanding promises. May only be called if wait was never called.
+ pub fn cancelAll(self: *Self) void {
+ while (self.coro_stack.pop()) |node| {
+ cancel node.data;
+ }
+ while (self.alloc_stack.pop()) |node| {
+ cancel node.data;
+ self.lock.loop.allocator.destroy(node);
+ }
+ }
+ };
+}
+
+test "std.event.Group" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ const handle = try async<allocator> testGroup(&loop);
+ defer cancel handle;
+
+ loop.run();
+}
+
+async fn testGroup(loop: *Loop) void {
+ var count: usize = 0;
+ var group = Group(void).init(loop);
+ group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory");
+ group.call(increaseByTen, &count) catch @panic("memory");
+ await (async group.wait() catch @panic("memory"));
+ assert(count == 11);
+
+ var another = Group(error!void).init(loop);
+ another.add(async somethingElse() catch @panic("memory")) catch @panic("memory");
+ another.call(doSomethingThatFails) catch @panic("memory");
+ std.debug.assertError(await (async another.wait() catch @panic("memory")), error.ItBroke);
+}
+
+async fn sleepALittle(count: *usize) void {
+ std.os.time.sleep(0, 1000000);
+ _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+}
+
+async fn increaseByTen(count: *usize) void {
+ var i: usize = 0;
+ while (i < 10) : (i += 1) {
+ _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ }
+}
+
+async fn doSomethingThatFails() error!void {}
+async fn somethingElse() error!void {
+ return error.ItBroke;
+}