diff options
| author | Andrew Kelley <superjoe30@gmail.com> | 2018-07-14 18:27:51 -0400 |
|---|---|---|
| committer | Andrew Kelley <superjoe30@gmail.com> | 2018-07-14 18:27:51 -0400 |
| commit | 4d920cee6e8be2f2ae2cfd9067358c65b977568a (patch) | |
| tree | 2c04de6151b7448dec9958d0a91234ea0ba9a15d /std/event/future.zig | |
| parent | da3acacc14331a6be33445c3bfd204e2cccabddd (diff) | |
| parent | 28c3d4809bc6d497ac81892bc7eb03b95d8c2b32 (diff) | |
| download | zig-4d920cee6e8be2f2ae2cfd9067358c65b977568a.tar.gz zig-4d920cee6e8be2f2ae2cfd9067358c65b977568a.zip | |
Merge remote-tracking branch 'origin/master' into llvm7
Diffstat (limited to 'std/event/future.zig')
| -rw-r--r-- | std/event/future.zig | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/std/event/future.zig b/std/event/future.zig new file mode 100644 index 0000000000..23fa570c8f --- /dev/null +++ b/std/event/future.zig @@ -0,0 +1,97 @@ +const std = @import("../index.zig"); +const assert = std.debug.assert; +const builtin = @import("builtin"); +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const Lock = std.event.Lock; +const Loop = std.event.Loop; + +/// This is a value that starts out unavailable, until a value is put(). +/// While it is unavailable, coroutines suspend when they try to get() it, +/// and then are resumed when the value is put(). +/// At this point the value remains forever available, and another put() is not allowed. +pub fn Future(comptime T: type) type { + return struct { + lock: Lock, + data: T, + available: u8, // TODO make this a bool + + const Self = this; + const Queue = std.atomic.Queue(promise); + + pub fn init(loop: *Loop) Self { + return Self{ + .lock = Lock.initLocked(loop), + .available = 0, + .data = undefined, + }; + } + + /// Obtain the value. If it's not available, wait until it becomes + /// available. + /// Thread-safe. + pub async fn get(self: *Self) *T { + if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 1) { + return &self.data; + } + const held = await (async self.lock.acquire() catch unreachable); + held.release(); + + return &self.data; + } + + /// Make the data become available. May be called only once. + /// Before calling this, modify the `data` property. + pub fn resolve(self: *Self) void { + const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); + assert(prev == 0); // put() called twice + Lock.Held.release(Lock.Held{ .lock = &self.lock }); + } + }; +} + +test "std.event.Future" { + var da = std.heap.DirectAllocator.init(); + defer da.deinit(); + + const allocator = &da.allocator; + + var loop: Loop = undefined; + try loop.initMultiThreaded(allocator); + defer loop.deinit(); + + const handle = try async<allocator> testFuture(&loop); + defer cancel handle; + + loop.run(); +} + +async fn testFuture(loop: *Loop) void { + suspend |p| { + resume p; + } + var future = Future(i32).init(loop); + + const a = async waitOnFuture(&future) catch @panic("memory"); + const b = async waitOnFuture(&future) catch @panic("memory"); + const c = async resolveFuture(&future) catch @panic("memory"); + + const result = (await a) + (await b); + cancel c; + assert(result == 12); +} + +async fn waitOnFuture(future: *Future(i32)) i32 { + suspend |p| { + resume p; + } + return (await (async future.get() catch @panic("memory"))).*; +} + +async fn resolveFuture(future: *Future(i32)) void { + suspend |p| { + resume p; + } + future.data = 6; + future.resolve(); +} |
