diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-07-14 18:27:51 -0400 |
|---|---|---|
| committer | Andrew Kelley <superjoe30@gmail.com> | 2018-07-14 18:27:51 -0400 |
| commit | 4d920cee6e8be2f2ae2cfd9067358c65b977568a (patch) | |
| tree | 2c04de6151b7448dec9958d0a91234ea0ba9a15d /std/event | |
| parent | da3acacc14331a6be33445c3bfd204e2cccabddd (diff) | |
| parent | 28c3d4809bc6d497ac81892bc7eb03b95d8c2b32 (diff) | |
| download | zig-4d920cee6e8be2f2ae2cfd9067358c65b977568a.tar.gz zig-4d920cee6e8be2f2ae2cfd9067358c65b977568a.zip | |
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/event')
| -rw-r--r-- | std/event/channel.zig | 12 | ||||
| -rw-r--r-- | std/event/future.zig | 97 | ||||
| -rw-r--r-- | std/event/group.zig | 158 | ||||
| -rw-r--r-- | std/event/lock.zig | 13 | ||||
| -rw-r--r-- | std/event/loop.zig | 22 |
5 files changed, 290 insertions, 12 deletions
diff --git a/std/event/channel.zig b/std/event/channel.zig index 4b3a7177a2..d4d713bdee 100644 --- a/std/event/channel.zig +++ b/std/event/channel.zig @@ -12,8 +12,8 @@ pub fn Channel(comptime T: type) type { return struct { loop: *Loop, - getters: std.atomic.QueueMpsc(GetNode), - putters: std.atomic.QueueMpsc(PutNode), + getters: std.atomic.Queue(GetNode), + putters: std.atomic.Queue(PutNode), get_count: usize, put_count: usize, dispatch_lock: u8, // TODO make this a bool @@ -46,8 +46,8 @@ pub fn Channel(comptime T: type) type { .buffer_index = 0, .dispatch_lock = 0, .need_dispatch = 0, - .getters = std.atomic.QueueMpsc(GetNode).init(), - .putters = std.atomic.QueueMpsc(PutNode).init(), + .getters = std.atomic.Queue(GetNode).init(), + .putters = std.atomic.Queue(PutNode).init(), .get_count = 0, .put_count = 0, }); @@ -81,7 +81,7 @@ pub fn Channel(comptime T: type) type { .next = undefined, .data = handle, }; - var queue_node = std.atomic.QueueMpsc(PutNode).Node{ + var queue_node = std.atomic.Queue(PutNode).Node{ .data = PutNode{ .tick_node = &my_tick_node, .data = data, @@ -111,7 +111,7 @@ pub fn Channel(comptime T: type) type { .next = undefined, .data = handle, }; - var queue_node = std.atomic.QueueMpsc(GetNode).Node{ + var queue_node = std.atomic.Queue(GetNode).Node{ .data = GetNode{ .ptr = &result, .tick_node = &my_tick_node, diff --git a/std/event/future.zig b/std/event/future.zig new file mode 100644 index 0000000000..23fa570c8f --- /dev/null +++ b/std/event/future.zig @@ -0,0 +1,97 @@ +const std = @import("../index.zig"); +const assert = std.debug.assert; +const builtin = @import("builtin"); +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const Lock = std.event.Lock; +const Loop = std.event.Loop; + +/// This is a value that starts out unavailable, until a value is put(). +/// While it is unavailable, coroutines suspend when they try to get() it, +/// and then are resumed when the value is put(). +/// At this point the value remains forever available, and another put() is not allowed. +pub fn Future(comptime T: type) type { + return struct { + lock: Lock, + data: T, + available: u8, // TODO make this a bool + + const Self = this; + const Queue = std.atomic.Queue(promise); + + pub fn init(loop: *Loop) Self { + return Self{ + .lock = Lock.initLocked(loop), + .available = 0, + .data = undefined, + }; + } + + /// Obtain the value. If it's not available, wait until it becomes + /// available. + /// Thread-safe. + pub async fn get(self: *Self) *T { + if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 1) { + return &self.data; + } + const held = await (async self.lock.acquire() catch unreachable); + held.release(); + + return &self.data; + } + + /// Make the data become available. May be called only once. + /// Before calling this, modify the `data` property. + pub fn resolve(self: *Self) void { + const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + assert(prev == 0); // put() called twice + Lock.Held.release(Lock.Held{ .lock = &self.lock }); + } + }; +} + +test "std.event.Future" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + const handle = try async<allocator> testFuture(&loop); + defer cancel handle; + + loop.run(); +} + +async fn testFuture(loop: *Loop) void { + suspend |p| { + resume p; + } + var future = Future(i32).init(loop); + + const a = async waitOnFuture(&future) catch @panic("memory"); + const b = async waitOnFuture(&future) catch @panic("memory"); + const c = async resolveFuture(&future) catch @panic("memory"); + + const result = (await a) + (await b); + cancel c; + assert(result == 12); +} + +async fn waitOnFuture(future: *Future(i32)) i32 { + suspend |p| { + resume p; + } + return (await (async future.get() catch @panic("memory"))).*; +} + +async fn resolveFuture(future: *Future(i32)) void { + suspend |p| { + resume p; + } + future.data = 6; + future.resolve(); +} diff --git a/std/event/group.zig b/std/event/group.zig new file mode 100644 index 0000000000..c286803b53 --- /dev/null +++ b/std/event/group.zig @@ -0,0 +1,158 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const Lock = std.event.Lock; +const Loop = std.event.Loop; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const assert = std.debug.assert; + +/// ReturnType should be `void` or `E!void` +pub fn Group(comptime ReturnType: type) type { + return struct { + coro_stack: Stack, + alloc_stack: Stack, + lock: Lock, + + const Self = this; + + const Error = switch (@typeInfo(ReturnType)) { + builtin.TypeId.ErrorUnion => |payload| payload.error_set, + else => void, + }; + const Stack = std.atomic.Stack(promise->ReturnType); + + pub fn init(loop: *Loop) Self { + return Self{ + .coro_stack = Stack.init(), + .alloc_stack = Stack.init(), + .lock = Lock.init(loop), + }; + } + + /// Add a promise to the group. Thread-safe. + pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) { + const node = try self.lock.loop.allocator.create(Stack.Node{ + .next = undefined, + .data = handle, + }); + self.alloc_stack.push(node); + } + + /// This is equivalent to an async call, but the async function is added to the group, instead + /// of returning a promise. func must be async and have return type void. + /// Thread-safe. + pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) { + const S = struct { + async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType { + // TODO this is a hack to make the memory following be inside the coro frame + suspend |p| { + var my_node: Stack.Node = undefined; + node.* = &my_node; + resume p; + } + + // TODO this allocation elision should be guaranteed because we await it in + // this coro frame + return await (async func(args2) catch unreachable); + } + }; + var node: *Stack.Node = undefined; + const handle = try async<self.lock.loop.allocator> S.asyncFunc(&node, args); + node.* = Stack.Node{ + .next = undefined, + .data = handle, + }; + self.coro_stack.push(node); + } + + /// Wait for all the calls and promises of the group to complete. + /// Thread-safe. + pub async fn wait(self: *Self) ReturnType { + // TODO catch unreachable because the allocation can be grouped with + // the coro frame allocation + const held = await (async self.lock.acquire() catch unreachable); + defer held.release(); + + while (self.coro_stack.pop()) |node| { + if (Error == void) { + await node.data; + } else { + (await node.data) catch |err| { + self.cancelAll(); + return err; + }; + } + } + while (self.alloc_stack.pop()) |node| { + const handle = node.data; + self.lock.loop.allocator.destroy(node); + if (Error == void) { + await handle; + } else { + (await handle) catch |err| { + self.cancelAll(); + return err; + }; + } + } + } + + /// Cancel all the outstanding promises. May only be called if wait was never called. + pub fn cancelAll(self: *Self) void { + while (self.coro_stack.pop()) |node| { + cancel node.data; + } + while (self.alloc_stack.pop()) |node| { + cancel node.data; + self.lock.loop.allocator.destroy(node); + } + } + }; +} + +test "std.event.Group" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + const handle = try async<allocator> testGroup(&loop); + defer cancel handle; + + loop.run(); +} + +async fn testGroup(loop: *Loop) void { + var count: usize = 0; + var group = Group(void).init(loop); + group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory"); + group.call(increaseByTen, &count) catch @panic("memory"); + await (async group.wait() catch @panic("memory")); + assert(count == 11); + + var another = Group(error!void).init(loop); + another.add(async somethingElse() catch @panic("memory")) catch @panic("memory"); + another.call(doSomethingThatFails) catch @panic("memory"); + std.debug.assertError(await (async another.wait() catch @panic("memory")), error.ItBroke); +} + +async fn sleepALittle(count: *usize) void { + std.os.time.sleep(0, 1000000); + _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); +} + +async fn increaseByTen(count: *usize) void { + var i: usize = 0; + while (i < 10) : (i += 1) { + _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + } +} + +async fn doSomethingThatFails() error!void {} +async fn somethingElse() error!void { + return error.ItBroke; +} diff --git a/std/event/lock.zig b/std/event/lock.zig index 2a8d5ada77..2013b5595f 100644 --- a/std/event/lock.zig +++ b/std/event/lock.zig @@ -15,7 +15,7 @@ pub const Lock = struct { queue: Queue, queue_empty_bit: u8, // TODO make this a bool - const Queue = std.atomic.QueueMpsc(promise); + const Queue = std.atomic.Queue(promise); pub const Held = struct { lock: *Lock, @@ -73,6 +73,15 @@ pub const Lock = struct { }; } + pub fn initLocked(loop: *Loop) Lock { + return Lock{ + .loop = loop, + .shared_bit = 1, + .queue = Queue.init(), + .queue_empty_bit = 1, + }; + } + /// Must be called when not locked. Not thread safe. /// All calls to acquire() and release() must complete before calling deinit(). pub fn deinit(self: *Lock) void { @@ -81,7 +90,7 @@ pub const Lock = struct { } pub async fn acquire(self: *Lock) Held { - s: suspend |handle| { + suspend |handle| { // TODO explicitly put this memory in the coroutine frame #1194 var my_tick_node = Loop.NextTickNode{ .data = handle, diff --git a/std/event/loop.zig b/std/event/loop.zig index 646f15875f..fc927592b9 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -9,7 +9,7 @@ const AtomicOrder = builtin.AtomicOrder; pub const Loop = struct { allocator: *mem.Allocator, - next_tick_queue: std.atomic.QueueMpsc(promise), + next_tick_queue: std.atomic.Queue(promise), os_data: OsData, final_resume_node: ResumeNode, dispatch_lock: u8, // TODO make this a bool @@ -21,7 +21,7 @@ pub const Loop = struct { available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, - pub const NextTickNode = std.atomic.QueueMpsc(promise).Node; + pub const NextTickNode = std.atomic.Queue(promise).Node; pub const ResumeNode = struct { id: Id, @@ -77,7 +77,7 @@ pub const Loop = struct { .pending_event_count = 0, .allocator = allocator, .os_data = undefined, - .next_tick_queue = std.atomic.QueueMpsc(promise).init(), + .next_tick_queue = std.atomic.Queue(promise).init(), .dispatch_lock = 1, // start locked so threads go directly into epoll wait .extra_threads = undefined, .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), @@ -101,7 +101,6 @@ pub const Loop = struct { errdefer self.deinitOsData(); } - /// must call stop before deinit pub fn deinit(self: *Loop) void { self.deinitOsData(); self.allocator.free(self.extra_threads); @@ -382,6 +381,21 @@ pub const Loop = struct { return async<self.allocator> S.asyncFunc(self, &handle, args); } + /// Awaiting a yield lets the event loop run, starting any unstarted async operations. + /// Note that async operations automatically start when a function yields for any other reason, + /// for example, when async I/O is performed. This function is intended to be used only when + /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O + /// is performed. + pub async fn yield(self: *Loop) void { + suspend |p| { + var my_tick_node = Loop.NextTickNode{ + .next = undefined, + .data = p, + }; + loop.onNextTick(&my_tick_node); + } + } + fn workerRun(self: *Loop) void { start_over: while (true) { if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) { |
