aboutsummaryrefslogtreecommitdiff
path: root/std
diff options
context:
space:
mode:
Diffstat (limited to 'std')
-rw-r--r--std/event.zig2
-rw-r--r--std/event/future.zig87
-rw-r--r--std/event/lock.zig11
3 files changed, 99 insertions, 1 deletions
diff --git a/std/event.zig b/std/event.zig
index 516defebf8..f3913a432b 100644
--- a/std/event.zig
+++ b/std/event.zig
@@ -4,6 +4,7 @@ pub const Lock = @import("event/lock.zig").Lock;
pub const tcp = @import("event/tcp.zig");
pub const Channel = @import("event/channel.zig").Channel;
pub const Group = @import("event/group.zig").Group;
+pub const Future = @import("event/future.zig").Group;
test "import event tests" {
_ = @import("event/locked.zig");
@@ -12,4 +13,5 @@ test "import event tests" {
_ = @import("event/tcp.zig");
_ = @import("event/channel.zig");
_ = @import("event/group.zig");
+ _ = @import("event/future.zig");
}
diff --git a/std/event/future.zig b/std/event/future.zig
new file mode 100644
index 0000000000..8001f675a2
--- /dev/null
+++ b/std/event/future.zig
@@ -0,0 +1,87 @@
+const std = @import("../index.zig");
+const assert = std.debug.assert;
+const builtin = @import("builtin");
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+
+/// This is a value that starts out unavailable, until a value is put().
+/// While it is unavailable, coroutines suspend when they try to get() it,
+/// and then are resumed when the value is put().
+/// At this point the value remains forever available, and another put() is not allowed.
+pub fn Future(comptime T: type) type {
+ return struct {
+ lock: Lock,
+ data: T,
+ available: u8, // TODO make this a bool
+
+ const Self = this;
+ const Queue = std.atomic.QueueMpsc(promise);
+
+ pub fn init(loop: *Loop) Self {
+ return Self{
+ .lock = Lock.initLocked(loop),
+ .available = 0,
+ .data = undefined,
+ };
+ }
+
+ /// Obtain the value. If it's not available, wait until it becomes
+ /// available.
+ /// Thread-safe.
+ pub async fn get(self: *Self) T {
+ if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 1) {
+ return self.data;
+ }
+ const held = await (async self.lock.acquire() catch unreachable);
+ defer held.release();
+
+ return self.data;
+ }
+
+ /// Make the data become available. May be called only once.
+ pub fn put(self: *Self, value: T) void {
+ self.data = value;
+ const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ assert(prev == 0); // put() called twice
+ Lock.Held.release(Lock.Held{ .lock = &self.lock });
+ }
+ };
+}
+
+test "std.event.Future" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ const handle = try async<allocator> testFuture(&loop);
+ defer cancel handle;
+
+ loop.run();
+}
+
+async fn testFuture(loop: *Loop) void {
+ var future = Future(i32).init(loop);
+
+ const a = async waitOnFuture(&future) catch @panic("memory");
+ const b = async waitOnFuture(&future) catch @panic("memory");
+ const c = async resolveFuture(&future) catch @panic("memory");
+
+ const result = (await a) + (await b);
+ cancel c;
+ assert(result == 12);
+}
+
+async fn waitOnFuture(future: *Future(i32)) i32 {
+ return await (async future.get() catch @panic("memory"));
+}
+
+async fn resolveFuture(future: *Future(i32)) void {
+ future.put(6);
+}
diff --git a/std/event/lock.zig b/std/event/lock.zig
index 2a8d5ada77..cba3594b50 100644
--- a/std/event/lock.zig
+++ b/std/event/lock.zig
@@ -73,6 +73,15 @@ pub const Lock = struct {
};
}
+ pub fn initLocked(loop: *Loop) Lock {
+ return Lock{
+ .loop = loop,
+ .shared_bit = 1,
+ .queue = Queue.init(),
+ .queue_empty_bit = 1,
+ };
+ }
+
/// Must be called when not locked. Not thread safe.
/// All calls to acquire() and release() must complete before calling deinit().
pub fn deinit(self: *Lock) void {
@@ -81,7 +90,7 @@ pub const Lock = struct {
}
pub async fn acquire(self: *Lock) Held {
- s: suspend |handle| {
+ suspend |handle| {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,